In this problem set, you will be implementing a simplified version of Google's MapReduce. Prior to starting this problem set, read the short paper on MapReduce to familiarize yourself with the basic architecture. This writeup will assume that you have read sections 1–3.1 of that paper.
The map and reduce functions have the following OCaml types:
```ocaml
val map : 'a -> 'b -> ('c * 'd) list
val reduce : 'c -> 'd list -> 'e list
```
These functions will be computed in a distributed fashion by several agents running concurrently. The agents communicate with each other using a messaging protocol defined in shared/protocol.ml. However, that protocol only allows for the transmission of strings, so you must use OCaml's built-in marshalling and unmarshalling facilities to transmit values of different types. We will explain this more thoroughly below.
For a given MapReduce application foo, the application-specific code is stored in the directory apps/foo/. The code for the mappers is in apps/foo/mapper.ml and the code for the reducers is in apps/foo/reducer.ml. There is also a controller in the file apps/foo/foo.ml that handles initialization of and communication with the mappers and reducers, reads the inputs, and prints the results.
The mappers and reducers receive their input by calling Program.get_input and communicate their results by calling Program.set_output. The specific mechanisms for these operations are described below.
Execution is divided into five phases:

1. Initialization: controller.exe is invoked with the name of a MapReduce application on the command line, along with other application-specific information (e.g., the input file name and any other parameters required by the application). This information is passed to the main method of the controller in the application directory. This method reads the data from the input file and parses it into a list of (key, value) pairs to provide as input to the mappers. It then connects to worker servers and initializes mappers on those servers.

2. Map: The controller sends each (key, value) pair to an available mapper. The mapper applies the map function to the pair, producing a list of (key, value) pairs (the types of the input and output do not have to be the same). The resulting list of (key, value) pairs is sent back to the controller. The controller continues to send inputs to the next available mapper until all pairs have been mapped.

3. Combine: For each key produced by the mappers, all of its corresponding values are collected into a single list, resulting in a (key, value list) pair for that key.

4. Reduce: The controller connects to worker servers and initializes reducers on these servers. The (key, value list) pairs produced in the Combine phase are then sent to available reducers. The reducers perform the reduce operation and send the results back to the controller. This continues until all input has been reduced.

5. Output: The controller collects the results of the Reduce phase and outputs them in some manner.
Consider the canonical MapReduce example of counting the number of occurrences of each word in a set of documents. The input is a set of (document id, body) pairs. In the Map phase, each word that occurs in the body of a file is mapped to the pair (word, "1"), indicating that the word has been seen once. In the Combine phase, all pairs with the same first component are collected to form the pair (word, ["1"; "1"; ...; "1"]), one such pair for each word. Then in the Reduce phase, for each word, the list of ones is summed to determine the number of occurrences of that word.
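To make the data flow concrete, here is a minimal sketch of word_count's map and reduce steps at the types shown above. split_words is a simple stand-in tokenizer, not necessarily what the provided mapper uses:

```ocaml
(* Hypothetical tokenizer: split a body on spaces, dropping empty tokens. *)
let split_words (s : string) : string list =
  List.filter (fun w -> w <> "") (String.split_on_char ' ' s)

(* Map: emit (word, "1") once per occurrence of each word. *)
let map (_id : string) (body : string) : (string * string) list =
  List.map (fun w -> (w, "1")) (split_words body)

(* Reduce: the number of ones collected for a word is its count. *)
let reduce (_word : string) (ones : string list) : string list =
  [string_of_int (List.length ones)]
```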
For simplicity, our framework accepts only a single data file containing all the input documents. Each document is represented in the input file as an (id, document name, body) triple. See data/reuters.txt, a collection of Reuters articles from the 1980s, or the shorter files data/test1.txt and data/test2.txt for examples of the formatting.
Map_reduce.map_reduce prepares a document file by first calling Util.load_documents and then formatting the documents into (key, value) pairs for the mapper.
We have included a full implementation of this application in apps/word_count. Here is a detailed explanation of the sequence of events.
Execution begins when controller.exe word_count filename is invoked. It immediately calls the main method of apps/word_count/word_count.ml, passing it the argument list. The main method calls controller/Map_reduce.map_reduce, which is common controller code for performing a simple one-phase MapReduce on documents. Other, more involved applications have their own controller code.
The documents in filename are read in and parsed using Util.load_documents, which splits the collection of documents into {id; title; body} records. These records are converted into (id, body) pairs.
These pairs are then passed to Map_reduce.map kv_pairs "" "apps/word_count/mapper.ml". The second argument represents shared data that is accessible to all mappers in addition to their input (key, value) pair. For this application there is no shared data, so this argument is the empty string. In general, the shared data can be accessed by a mapper using Program.get_shared_data().
Map_reduce.map initializes a mapper worker manager using Worker_manager.initialize_mappers "apps/word_count/mapper.ml" "". The Worker_manager loads the list of available workers from the file named addresses. Each line of this file contains a worker address of the form ip_address:port_number. Each of these workers is sent the mapper code; the worker creates a mapper with that code and sends back the id of the resulting mapper. This id is combined with the address of the worker to uniquely identify that mapper. If certain workers are unavailable, this function will report that fact, but will continue to run successfully.
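For example, a hypothetical addresses file listing two workers might look like this:

```
127.0.0.1:12345
10.0.0.7:9000
```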
Map_reduce.map then sends individual unmapped (id, body) pairs to available mappers until it has received results for all pairs. Free mappers are obtained using Worker_manager.pop_worker(). Mappers should be released once their results have been received, using Worker_manager.push_worker. Read controller/worker_manager.mli for more complete documentation. Once all (id, body) pairs have been mapped, the new list of (word, "1") pairs is returned.
Map_reduce.map_reduce receives the results of the mapping. If desired, Util.print_map_results can be called here to display the results of the Map phase for debugging purposes.
The map results are grouped by key using Map_reduce.combine. The results can be displayed using Util.print_combine_results.
Map_reduce.reduce is then called with the results of Map_reduce.combine, along with the (empty) shared data string and the name of the reducer file, apps/word_count/reducer.ml.
Map_reduce.reduce initializes the reducer worker manager by calling the appropriate Worker_manager function, which retrieves worker addresses in the same manner as for mappers.
Map_reduce.reduce then sends the unreduced (word, count list) pairs to available reducers until it has received results for all input. This proceeds in essentially the same manner as the Map phase. When all pairs have been reduced, the new list of (word, count) pairs is returned. In this application the key doesn't change, so Worker_manager.reduce and the reduce workers calculate and return only the new value (in this case, count), instead of returning both the key (in this case, word) and the count.
The results are returned to the main method of word_count.ml, which displays them using Util.print_reduce_results.
Worker_server receives a connection and spawns a thread, which calls Worker.handle_request to handle it.

Worker.handle_request determines the request type. If it is an initialization request, then the new mapper or reducer is built using Program.build, which returns either the new worker id or the compilation error. See worker_server/program.ml for more complete documentation. If it is a map or reduce request, the worker id is verified as referencing a valid mapper or reducer, respectively. If the id is valid, then Program.run is called with that id and the provided input. This runs the relevant worker, which receives its input by calling Program.get_input(). Once the worker terminates, having set its output using Program.set_output, these results are returned by Program.run. If the request is invalid, then the appropriate error message, as defined in shared/protocol.ml, is prepared.

Finally, Worker.send_response is called, which is responsible for sending the result back to the client. If the response is sent successfully, then Worker.handle_request simply recurses; otherwise, it returns unit.
controller/
main.ml
This is the main module that is called to start up an application. It does nothing except parse the command-line arguments to determine which application to start, then call the main method of that application, passing it the command-line arguments.
map_reduce.ml
This module provides functionality for performing the actual map, combine, and reduce operations. These methods can be called by the individual applications. See map_reduce.mli for additional documentation.

The method map_reduce in this module contains controller code common to a number of applications that manipulate documents. It can be used with different mappers and reducers to achieve different results. It loads and parses the documents from the input file, then calls Map_reduce.map to run the mappers. Next, the list of (key, value) pairs is combined using Map_reduce.combine. The combined values are then reduced using Map_reduce.reduce. These results are passed back to the individual application controllers for output.
worker_manager.ml
This module is the interface for communicating with workers. It includes functions to initialize or kill mappers and reducers, select inactive workers, assign a map/reduce task to a worker, and return workers that have completed their task to the pool of inactive workers. Detailed descriptions of each function in the module are in worker_manager.mli.

If a request fails due to some sort of network or file descriptor error, then the Worker_manager will automatically retry a fixed number of times. It is therefore acceptable for workers to intermittently output errors as long as they are still able to service requests.
worker_server/
program.ml
This module provides the ability to build and run mappers/reducers. It includes the functions get_input and set_output, which mappers and reducers must use.
worker_server.ml
This module is responsible for initializing the server and spawning threads to handle client connection requests. These threads simply invoke Worker.handle_request.
worker.ml
This module is responsible for managing communication between the clients and the mappers/reducers that they build and then run. A more thorough description is contained in the Your Tasks section.
The protocol that the controller application and the workers use to communicate is stored in shared/protocol.ml.
Requests:

InitMapper (code, shared_data)
Initialize a mapper. Each element in the code list is a line of mapper.ml, and the lines are executed sequentially. Note that if you wish to open a module, you must include double semicolons before accessing its contents; the semicolons indicate the end of a statement and force evaluation of the expression. For example, open Util;; must appear in the code list before a call to load_documents. shared_data is accessible to the mapper, as described above.
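As an illustration, an identity mapper might be initialized with a code list like the following (hypothetical contents; the exact Program signatures are documented in worker_server/program.ml):

```ocaml
(* Each string is one line of the mapper, executed in order. *)
["open Util;;";
 "let (key, value) = Program.get_input () in";
 "Program.set_output [(key, value)]"]
```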
InitReducer (code)
Initialize a reducer. Same as above.
MapRequest (worker_id, key, value)
Execute the mapper with id worker_id, with (key, value) as input.
ReduceRequest (worker_id, key, value list)
Execute the reducer with id worker_id, with (key, value list) as input.
Responses:

Mapper (worker_id option, error)
Mapper (Some id, _) indicates that the requested mapper was successfully built and has the returned id. Mapper (None, error) indicates that compilation of the mapper failed with the returned error.
Reducer (worker_id option, error)
Same as above, except for a reducer.
InvalidWorker (worker_id)
Either a map request was made and worker_id does not correspond to a mapper, or a reduce request was made and worker_id does not correspond to a reducer.
RuntimeError (worker_id, error)
The worker with id worker_id output the runtime error(s) stored in error. Also used to indicate that the worker didn't return any output, meaning Program.run returned None.
MapResults (worker_id, (key, value) list)
The map request sent to worker_id produced the output (key, value) list.
ReduceResults (worker_id, value list)
The reduce request sent to worker_id produced the output value list.
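Putting the messages together, the variant types in shared/protocol.ml have roughly the following shape. This is a reconstruction from the list above; consult the file itself for the authoritative definitions, and note that worker_id is shown here as int only as an assumption:

```ocaml
type worker_request =
  | InitMapper of string list * string          (* code, shared_data *)
  | InitReducer of string list                  (* code *)
  | MapRequest of int * string * string         (* worker_id, key, value *)
  | ReduceRequest of int * string * string list (* worker_id, key, values *)

type worker_response =
  | Mapper of int option * string               (* worker_id option, error *)
  | Reducer of int option * string
  | InvalidWorker of int
  | RuntimeError of int * string
  | MapResults of int * (string * string) list
  | ReduceResults of int * string list
```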
Only string data can be communicated between agents. Values can be converted to and from strings explicitly using Util.marshal and Util.unmarshal, which call the built-in OCaml functions Marshal.to_string and Marshal.from_string, respectively.
You can send strings over the communication channels without marshalling, but all other values must be marshalled. Your mapper or reducer must likewise convert the input it receives from strings back to the appropriate types before operating on it, and once it has finished, it must convert its output back into strings for transmission.
Note that marshalling is not type safe. OCaml cannot detect misuse of marshalled data during compilation. If you unmarshal a string and treat it as a value of any type other than the type it was marshalled as, your program will compile, run, and crash. You should therefore take particular care that the types match when marshalling/unmarshalling. Make sure that the type of marshalled input sent to your mapper matches the type that the mapper expects, and that the type of the marshalled results that the mapper sends back matches the type expected. The same is true for reducers using the reducer messages. Recall the syntax for type annotations:
```ocaml
let x : int = unmarshal my_data
```
Annotated variable declarations will help pinpoint unmarshalling errors.
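For instance, a (word, counts) pair might make a round trip like this (a sketch using the Util.marshal and Util.unmarshal functions described above):

```ocaml
(* Sender: convert the typed value to a string for the protocol. *)
let encoded : string = Util.marshal ("ocaml", ["1"; "1"; "1"])

(* Receiver: the annotation must match the marshalled type exactly,
   or the program will compile and then crash at run time. *)
let (word, ones) : string * string list = Util.unmarshal encoded
```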
All code you submit must adhere to the specifications defined in the respective .mli
files.
As always, do not change the .mli files.
You must implement functions in the following files:
shared/hashtable.ml
You are required to implement a hashtable according to the specification in shared/hashtable.mli. This data structure will be useful in your MapReduce implementation. Note: you may not use the OCaml Hashtbl library functions in your hashtable implementation; the only exception is that you may use Hashtbl.hash as your hashing function. You are encouraged, but not required, to use your hashtable in your implementation of MapReduce. If you decide to use a hashtable in other sections of this problem set, you may use the Hashtbl module instead of your own.
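As a starting point, one common design is an array of association-list buckets. A minimal sketch follows; the authoritative interface is shared/hashtable.mli, so the type and function names here are illustrative only:

```ocaml
(* Bucket-array hashtable with chaining. Hashtbl.hash is the one
   library function you are permitted to use. *)
type ('k, 'v) hashtable = { buckets : ('k * 'v) list array }

let create (n : int) : ('k, 'v) hashtable =
  { buckets = Array.make (max 1 n) [] }

let index (tbl : ('k, 'v) hashtable) (k : 'k) : int =
  Hashtbl.hash k mod Array.length tbl.buckets

(* Add a binding, shadowing (not removing) any previous binding for k. *)
let add (tbl : ('k, 'v) hashtable) (k : 'k) (v : 'v) : unit =
  let i = index tbl k in
  tbl.buckets.(i) <- (k, v) :: tbl.buckets.(i)

(* Raises Not_found if k has no binding. *)
let find (tbl : ('k, 'v) hashtable) (k : 'k) : 'v =
  List.assoc k tbl.buckets.(index tbl k)
```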
controller/map_reduce.ml
The code in this module is responsible for performing the actual map, combine, and reduce operations by initializing the mappers/reducers, sending them input that hasn't been mapped/reduced yet, and then returning the final list of results.
The map and reduce functions must be implemented according to the specifications in controller/map_reduce.mli. The functionality should mirror the example execution described above. Both functions must make use of available workers simultaneously and be able to handle worker failure. However, you do not need to handle the case in which all workers fail despite there being no error in your worker code (i.e., complete network failure). Additionally, there should be only one active request per worker at a time, and if a worker fails, it should not be returned to the Worker_manager using the push_worker function.
Both map and reduce share the same basic structure. First, the workers are initialized through a call to the appropriate Worker_manager function. Once the workers have been initialized, the input list should be iterated over, sending each available element to a free worker, which can be obtained using the pop_worker function of Worker_manager. A mapper is invoked using Worker_manager.map, providing the mapper's id and the (key, value) pair. Each mapper (and therefore each invocation of Worker_manager.map) receives an individual (key, value) pair and outputs a new (key, value) list, which is simply appended to the list of all previous map results. A reducer is invoked similarly, using Worker_manager.reduce. These functions block until a response is received, so it is important that they are invoked from a thread in the included Thread_pool module. This, in turn, requires that the spawned threads store their results in a shared data structure, which must be accessed in a thread-safe manner.

Once all of the input has been iterated over, any input that remains unfinished, whether due to a slow or failed worker, must be re-submitted to available workers until all input has been processed. Finally, close the connections to the workers with a call to Worker_manager.clean_up_workers and return the results.
Note: it is acceptable to use Thread.delay with a short sleep period (0.1 seconds or less) when looping over unfinished data to send to available workers, in order to avoid flooding the workers with unnecessary work.
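The overall shape of the map loop might then look like the sketch below. Every Worker_manager and Thread_pool signature shown here (pop_worker taking the manager, Thread_pool.create/add_work/destroy, Worker_manager.map returning an option) is an assumption based on the description above; check the corresponding .mli files for the real interfaces:

```ocaml
(* Sketch: dispatch unmapped pairs to pooled threads until all are done. *)
let map kv_pairs shared_data mapper_file =
  let manager = Worker_manager.initialize_mappers mapper_file shared_data in
  let pool = Thread_pool.create 20 in
  let results = Hashtbl.create 16 in   (* input index -> mapped pairs *)
  let lock = Mutex.create () in
  let indexed = List.mapi (fun i kv -> (i, kv)) kv_pairs in
  let attempt (i, (k, v)) =
    let w = Worker_manager.pop_worker manager in
    match Worker_manager.map manager w k v with
    | Some pairs ->
        Worker_manager.push_worker manager w;  (* only healthy workers return *)
        Mutex.lock lock;
        if not (Hashtbl.mem results i) then Hashtbl.add results i pairs;
        Mutex.unlock lock
    | None -> ()  (* failed worker: drop it; the pair is retried below *)
  in
  let unfinished () =
    Mutex.lock lock;
    let todo = List.filter (fun (i, _) -> not (Hashtbl.mem results i)) indexed in
    Mutex.unlock lock;
    todo
  in
  let rec loop () =
    match unfinished () with
    | [] -> ()
    | todo ->
        List.iter (fun job -> Thread_pool.add_work (fun () -> attempt job) pool) todo;
        Thread.delay 0.1;   (* per the note above, avoid flooding the workers *)
        loop ()
  in
  loop ();
  Thread_pool.destroy pool;
  Worker_manager.clean_up_workers manager;
  Hashtbl.fold (fun _ pairs acc -> pairs @ acc) results []
```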
The combine function must be implemented according to its specification, but does not need to make use of any workers. It should combine the provided (key, value) pair list into a list of (key, value list) pairs, in linear time, such that each key in the provided list occurs exactly once in the returned list, and the value list for a given key in the returned list contains all the values that key was mapped to in the provided list.
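A linear-time combine can be built on a hashtable. The sketch below uses the standard Hashtbl, which the hashtable note above permits outside shared/hashtable.ml, and shows the element types as strings for concreteness; the real signature is in map_reduce.mli:

```ocaml
(* Group values by key in O(n): one pass to bucket, one fold to emit. *)
let combine (pairs : (string * string) list) : (string * string list) list =
  let tbl = Hashtbl.create (List.length pairs + 1) in
  List.iter
    (fun (k, v) ->
      let vs = try Hashtbl.find tbl k with Not_found -> [] in
      Hashtbl.replace tbl k (v :: vs))
    pairs;
  (* Reverse each bucket so values keep their original order. *)
  Hashtbl.fold (fun k vs acc -> (k, List.rev vs) :: acc) tbl []
```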
worker_server/worker.ml
The code in this module is responsible for handling communication between the clients that request work and the mapper/reducers that perform the work.
The handle_request function must be implemented according to the .mli specification and the description above, and it must be thread-safe. This function receives a client connection as input and must retrieve the worker_request from that connection.

If the request is for a mapper or reducer initialization, then the code must be built using Program.build, which returns (Some id, "") where id is the worker id if the compilation was successful, or (None, error) if the compilation was not successful. If the build succeeds, the worker id should be stored in the appropriate mapper or reducer set (depending on the initialization request), and the id should be sent back to the client using send_response. If the build fails, then the error message should be sent back to the client using send_response.

If the request is to perform a map or reduce, then the worker id must be verified to be of the correct type by looking it up in either the mapper or reducer set before the mapper or reducer is invoked using Program.run. The return value is Some v, where v is the output of the program, or None if the program failed to provide any output or generated an error.

Note that the mapper and reducer sets are shared between all of the request-handler threads spawned by the Worker_server, and therefore access to them must be thread-safe.
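One simple way to make the shared sets thread-safe is to guard every lookup and update with a mutex. A sketch follows; the set representation and the id type are assumptions:

```ocaml
(* Mapper-id set shared by all handler threads, guarded by one mutex. *)
let mapper_ids : (int, unit) Hashtbl.t = Hashtbl.create 16
let mapper_mutex = Mutex.create ()

let add_mapper (id : int) : unit =
  Mutex.lock mapper_mutex;
  Hashtbl.replace mapper_ids id ();
  Mutex.unlock mapper_mutex

let is_mapper (id : int) : bool =
  Mutex.lock mapper_mutex;
  let present = Hashtbl.mem mapper_ids id in
  Mutex.unlock mapper_mutex;
  present
```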
Note: Code in shared/util.ml is accessible to all workers by default.
We have provided a Makefile and a build script build.bat that you can use to build your project. The worker server executable is started with a port_number on which it listens for requests. The controller executable is invoked as described above; for example, controller.exe word_count filename runs word_count with datafile filename.
If you would like to define additional modules, please add them to
the appropriate build.bat file or Makefile
.
You have a great deal of freedom in this assignment. Enjoy it.
You will use the simplified MapReduce that you created in Part 1 to implement three applications: inverted_index, page_rank, and nbody. Each application requires a mapper apps/xxx/mapper.ml and a reducer apps/xxx/reducer.ml, where xxx is the name of the application, as well as a controller apps/xxx/xxx.ml. We have supplied a full implementation of the word_count application as an example.
An inverted index is a mapping from words to the documents in which they appear. For example, if these were your documents:

Document 1:
OCaml map reduce

Document 2:
fold filter ocaml

the inverted index would look like this:

| word | document |
|---|---|
| ocaml | 1 2 |
| map | 1 |
| reduce | 1 |
| fold | 2 |
| filter | 2 |
To implement this application, you should take a dataset of documents (such as data/reuters.txt) as input and use MapReduce to produce an inverted index. This can be done in a way very similar to word_count; in particular, you can use Map_reduce.map_reduce. Print your final results using Util.print_reduce_results.
Your mapper and reducer code should go in apps/inverted_index/mapper.ml and apps/inverted_index/reducer.ml, respectively, and your controller code should go in apps/inverted_index/inverted_index.ml.
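Logically, the phases mirror word_count with the document id as the emitted value. A sketch, reusing the hypothetical split_words tokenizer from the word_count sketch above:

```ocaml
(* Map: for each word in a document, emit (word, document id). *)
let map (id : string) (body : string) : (string * string) list =
  List.map (fun w -> (w, id)) (split_words body)

(* Reduce: deduplicate the document ids collected for each word. *)
let reduce (_word : string) (ids : string list) : string list =
  List.sort_uniq compare ids
```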
PageRank is a link analysis algorithm used by Google to weight the importance of different websites on the Internet. You will be implementing a very simplified, iterative version of PageRank. To begin, we initialize each website with a PageRank of 1/n, where n is the total number of websites. Each step of the iteration then has two parts. First, each website divides its current PageRank by the number of links out of it and sends this amount to each website it links to. Second, each website sums up all the values sent to it, and this sum becomes its new PageRank. The number of iterations is supplied on the command line.
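Concretely, one iteration can be written as follows. This sketch just states the arithmetic over simple association lists, not the MapReduce decomposition you will implement:

```ocaml
(* links : (pageid, outgoing pageids) list; ranks : (pageid, rank) list.
   Each page sends rank/out-degree along every outgoing link; a page's new
   rank is the sum of what it receives (zero if nothing links to it). *)
let step (links : (string * string list) list)
         (ranks : (string * float) list) : (string * float) list =
  let contribs =
    List.flatten
      (List.map
         (fun (p, outs) ->
           let share = List.assoc p ranks /. float_of_int (List.length outs) in
           List.map (fun q -> (q, share)) outs)
         links)
  in
  List.map
    (fun (p, _) ->
      let total =
        List.fold_left
          (fun acc (q, c) -> if q = p then acc +. c else acc)
          0.0 contribs
      in
      (p, total))
    links
```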
For more details on simplified PageRank, see Section 14.3 in Networks, Crowds, and Markets: Reasoning About a Highly Connected World by Cornell Professors David Easley and Jon Kleinberg. Note: There is an error in the example on page 408 of the networks book. On the second iteration, page A should have a PageRank of 5/16, not 3/16.
Your mapper and reducer code should go in apps/page_rank/mapper.ml and apps/page_rank/reducer.ml, respectively, and your controller code should go in apps/page_rank/page_rank.ml.
In order to simulate websites, we have defined a new type, website. Each line of the input file has the format pageid@pagetitle@links, where the links field has the form out1,out2,...,outm and each outi is the pageid of a link out of the page. The file data/websites.txt contains the same example as on pages 407 and 408 of the Networks book draft linked above.
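For instance, a hypothetical three-page file in this format would be:

```
1@PageA@2,3
2@PageB@3
3@PageC@1
```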
After performing PageRank for the specified number of iterations, you should print the final PageRank values. We have provided you with a function Util.print_page_ranks, which takes in a list of tuples of pageids and PageRanks, and prints them to the screen.
Behavior is undefined if any two pages share the same key. Behavior is also undefined if any page has zero outgoing links. However, behavior is defined if a page has zero incoming links: it naturally ends up with zero PageRank. When printing the PageRanks for each page, you must explicitly list that page with a PageRank of zero.
No page will link to another page twice, nor link to a nonexistent page; the outgoing links list for each page will contain unique valid IDs.
An n-body simulation models the movement of objects in space due to the gravitational forces acting between them over time. Given a collection of n bodies possessing a mass, location, and velocity, we compute a new position and velocity for each body based on the gravitational forces acting on it. These vectors are applied to the bodies for a small period of time, and then the process repeats, producing a new frame. Tracking the positions of the bodies over time yields a series of frames which, shown in succession, model the bodies' movements across a plane.
The module shared/plane.ml defines representations for scalar values, two-dimensional points, and vectors, as well as common functions such as Euclidean distance. Using Plane, we can define types that represent the mass, position, and velocity of a body:

```ocaml
type mass = Plane.scalar
type location = Plane.point
type velocity = Plane.vector
type body = mass * location * velocity
```

and a function acceleration that calculates the acceleration exerted by one body on another:

```ocaml
val acceleration : body -> body -> Plane.vector
```
To understand how the acceleration function works, we need to review a few basic facts from physics. Recall that force equals mass times acceleration (F = m × a), and the gravitational force between objects with masses m and n separated by distance d is given by (G × m × n) / d², where G is the gravitational constant. Putting these two equations together and solving for a, we find that the magnitude of the acceleration vector due to gravity for the object with mass n is G × m / d². The direction of the acceleration vector is the same as the direction of the unit vector between the objects. Note that this calculation assumes that the objects do not collide.
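In code, the computation might look like the following sketch. Plane.distance, Plane.scale, and Plane.unit_vector are assumed names standing in for whatever shared/plane.mli actually provides:

```ocaml
(* Gravitational constant; appropriate units depend on the simulation. *)
let g = 6.674e-11

(* Acceleration induced on body 1 by body 2: magnitude G * m2 / d^2,
   directed from body 1 toward body 2. Plane names are assumptions. *)
let acceleration ((_m1, p1, _v1) : body) ((m2, p2, _v2) : body) : Plane.vector =
  let d = Plane.distance p1 p2 in
  let magnitude = g *. m2 /. (d *. d) in
  Plane.scale magnitude (Plane.unit_vector p1 p2)
```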
If two bodies do collide (i.e., occupy the same point), represent the acceleration between them with the zero Plane.vector in the sequence returned by accelerations.
This algorithm fits nicely into the MapReduce framework. Accelerations for each body can be computed and applied in parallel; map across the bodies to get the accelerations on each due to every other body, then apply each acceleration vector to get a new position and velocity for the body.
We have provided implementations of Nbody.main and the IO helper Util.string_of_bodies. Your task is to implement Nbody.make_transcript, which runs a simulation for a given number of iterations and generates a textual representation of the bodies over time. This output can be opened using the supplied viewer bouncy.jar, which displays the simulation. Recall the command to open jar files:

```
java -jar bouncy.jar
```
Specifically, make_transcript should take a list of (string * body) pairs, where each string uniquely identifies a dynamic body, and an integer steps, and update the bodies for steps iterations using the acceleration function described above.
Create apps/nbody/mapper.ml and apps/nbody/reducer.ml to modify the bodies at each step while maintaining the identifier strings. You should record the bodies' positions after each update using Util.string_of_bodies and return a complete string once steps updates have occurred.
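At the top level, make_transcript is a simple loop. A hedged outline follows; update_bodies is a hypothetical helper standing for the MapReduce round driven by your mapper and reducer, and whether the initial frame is included is determined by the spec in the .mli:

```ocaml
let make_transcript (bodies : (string * body) list) (steps : int) : string =
  let rec loop bodies i transcript =
    if i = steps then transcript
    else
      let bodies' = update_bodies bodies in   (* hypothetical helper *)
      loop bodies' (i + 1) (transcript ^ Util.string_of_bodies bodies')
  in
  loop bodies 0 (Util.string_of_bodies bodies)
```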
We have provided sample bodies in shared/simulations.ml. Use these as models to write your own simulations, which you may optionally submit as Simulations.zardoz. Particularly creative submissions may receive karma.
This portion of the assignment is based on materials developed by Dan Licata (Carnegie Mellon University) and Professor David Bindel.
Note that for the above applications, your MapReduce code will probably run a lot slower than a non-MapReduce implementation of the same algorithm. This slowdown is due to the fact that there is a lot of overhead and the simplified MapReduce framework is not very well optimized. Performance gains are only realized when doing this on a massive scale.
Simply zip up and submit your entire ps5 folder. Be sure to run make clean
or manually delete all compiled files (.cm*) and executables first.
You are required to use a source control system like Git or SVN. Submit the log file that describes your activity. We will provide you with an SVN repository, hosted by CSUG, or you may use a private repository from a provider like xp-dev or bitbucket.
For information on how to get started with your provided SVN account, read Using Subversion in the CSUGLab.
Note: your repository name is cs3110_<netid(s)>. For example, cs3110_njl36, or if you have a partner, cs3110_dck10_njl36. Notice that the netids of a group appear in alphabetical order. This repository name is what you put in place of project1 in the directions in the provided link.
If you use Windows, and are unfamiliar with the command line or Cygwin, there is a GUI based SVN client called Tortoise SVN that provides convenient context menu access to your repo.
For Debian/Ubuntu, a GUI-based solution to explore would be Rapid SVN. To install:
apt-get install rapidsvn
Mac users looking into a front-end can try SC Plugin, which claims to provide similar functionality to Tortoise. Rapid SVN might work as well.
There is also a plug-in for Eclipse called Subclipse that integrates everything for you nicely.
Note that all of these options are simply graphical mimics of the extremely powerful terminal commands, and are by no means necessary to successfully utilize version control.
We will be holding 15-minute design review meetings. Your group is expected to meet with one of the course staff during their office hours to discuss this assignment. A sign-up schedule is available on CMS.
Be prepared to give a short presentation on how you and your partner plan to implement MapReduce, including how you will divide the work, and bring any questions you may have concerning the assignment. Staff members will ask questions to gauge your understanding of varying aspects of the assignment, ranging from the high-level outline to specific design pitfalls. This is not a quiz; you are not expected to be familiar with the intricacies of the project during your design review. Rather, you are expected to have outlined your design and prepared thoughtful questions. Your design review should focus on your approach to Part 1, but you may spend a few minutes at the end on Part 2.
Design reviews are for your benefit. Meet with your partner prior to the review and spend time outlining and implementing MapReduce. This is a large and difficult assignment. The review is an opportunity to ensure your group understands the assignment and has a feasible design early on.