Applications of Parallel Computers
MPI Programming
Prof David Bindel
Welcome to another exciting slide deck for CS 5220! The topic
for today is MPI programming.
OK, let's set some expectations here. Everyone here supposedly
knows how to program, and has at least some familiarity with a C
family language. So you all know already that you aren't going
to learn to be a proficient MPI programmer from one slide deck,
or even two. You learn by diving into an actual problem or two with
a book at your side, or maybe with a reference web page open.
The point of this lecture is to give you a conceptual road map
so that you know what you are looking for when you flip through
your (online) book or web page.
We'll give you some MPI coding problems to swear at, too, but
that's not really part of the slide deck.
And, with that prelude out of the way, let's get started.
Message passing programming
Basic operations:
Pairwise messaging: send/receive
Collective messaging: broadcast, scatter/gather
Collective computation: parallel prefix (sum, max)
Barriers (no need for locks!)
Environmental inquiries (who am I? do I have mail?)
(Much of what follows is adapted from Bill Gropp.)
The Message Passing Interface (or MPI) is a big interface with a
number of different types of operations. Today, we'll talk
about five main ones. First, there's pairwise messaging:
point-to-point data sends and receives. Then there's collective
messaging operations that involve several senders and receivers
simultaneously. There is also some support for simple types of
collective computation operations, like computing a sum with
contributions from all processors (called a reduction operation)
or computing a list of partial sums (called a scan or parallel
prefix operation). Then there are synchronization mechanisms
like barriers, though their role in MPI programming is a little
subtler than in shared memory programming - and if you have
taken a course where you've seen synchronization with locks,
you'll be happy to know that there is no need for locks here.
And, last but not least, there are environmental inquiries that
tell us things about the run-time setup, like how many
processors are involved in a given MPI run, or the process
number (or rank, in MPI lingo) of a given processor.
This slide deck has evolved some over time, but it was
originally modeled pretty closely on a similar presentation
given by Bill Gropp of UIUC.
MPI
Message Passing Interface
An interface spec — many implementations
(OpenMPI, MPICH, MVAPICH, Intel, ...)
Single Program Multiple Data (SPMD) style
Bindings to C, C++, Fortran
MPI is short for Message Passing Interface. The "interface"
part is important. There are many different implementations out
there, with OpenMPI, MPICH, and the Intel MPI being maybe the
most popular. These different implementations are not
guaranteed to do things the same way behind the scenes, nor to
inter-operate with each other. They just implement the same
programming interface.
There are several language bindings for the MPI programming
interface, including C, C++, and Fortran. Other languages (like
Python) often have unofficial MPI interfaces, too. We can usually use
MPI across language boundaries, even if we can't mix MPI
implementations. For example, it's completely legitimate for a
C routine to send MPI messages to a Fortran routine, though it
certainly doesn't happen all that often, partly because of the
nature of MPI codes. MPI programs are SPMD: Single Program,
Multiple Data. It's possible that different MPI ranks can do
very different things during program execution, but they all
start from the same main routine.
MPI
Version 1.0 in 1994, 2.2 in 2009, 3.0 in 2012
MPI 3 goodies:
Nonblocking collectives
Neighborhood collectives
Improved RMA and one-sided comm
Will stick to MPI-2 today
MPI has been around for a long time. Version 1.0 is over a
quarter century old, though MPI-3 is much more recent than that -
less than a decade. We'll mostly talk about MPI-2 constructs
today, but MPI-3 provides some nice extensions, including
non-blocking collective operations, neighborhood collectives,
and improved support for one-sided communications.
If none of that last sentence made sense, don't worry. It
should after this week.
Hello world
#include <mpi.h>
#include <stdio.h>
int main(int argc, char** argv) {
int rank, size;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
printf("Hello from %d of %d\n", rank, size);
MPI_Finalize();
return 0;
}
Let's start by walking through a "hello world" example in MPI.
This example, or something very similar, is in the
demos/graphite-hello directory.
We start by including the MPI header file at the very beginning.
The MPI header is where we have the declarations for the MPI
functions. This code uses only four of those functions, but they
are calls that almost every MPI program makes.
The first call is
to MPI_Init, and this call always has to come first. Notice that
we pass in pointers to the argc and argv inputs to main, which are
used to pass in command-line parameters. That's because some MPI
implementations can use argc and argv to pass in data about the
MPI setup when the program gets started. The MPI_Init is supposed
to take any of that extra stuff out, so you should ideally call it
before you do any argument processing (for example).
The last call is to MPI_Finalize. This always has to come at the
end of your MPI programs, after you've finished any communication.
The two calls in between are not required in the same way that you
require the MPI_Init and MPI_Finalize calls, but they show up in
most MPI codes nonetheless. MPI indexes processes by "ranks,"
and so MPI_Comm_rank is used to get the rank of the current
process (zero-indexed) and MPI_Comm_size gets the total number of
processes.
The first argument to both of these functions, MPI_COMM_WORLD, is
the name of the default communicator. A communicator in MPI is a
collection of processes that can communicate with each other within
a particular context. It's not uncommon for small MPI codes to just
use MPI_COMM_WORLD for communication, and we'll use that as our
default communicator argument today.
Building, queueing, running
Several steps to actually run
mpicc -o foo.x foo.c # Compile the program
sbatch foo.sub # Submit to queue (SLURM)
# mpirun -n 2 ./foo.x # (in foo.sub) Run on 2 procs
This is all platform-specific.
Writing the MPI program is only the first step. Once we have
the code, we have to compile it and run it!
We usually compile with mpicc or mpif90 or some similarly-named
command. This builds on your usual GCC or Clang or Intel
compiler behind the scenes, but does the work for you of
figuring out where the MPI support libraries and header files
are.
Once we have compiled the code, we use a command like mpirun or
mpiexec - depends on the implementation - to actually launch the
MPI job on some number of nodes. The mpirun command gets a
bunch of information from command-line arguments (e.g. we
use -n 2 in the example above to say that we're running on two
processors), from shell environment variables, and from
files.
On my laptop, I would just use mpirun directly. But if I want
to run on a cluster, like the Graphite cluster or the SDSC Comet
system that we will use in the class, I would typically submit a
runner job to a batch queue system. The batch queue system is
responsible for sharing the resources in a big cluster or
supercomputer: the idea is that rather than always running tasks
interactively, you write a script that says what you want to do
and what resources you will need, and a scheduler figures out
when the system will be able to give you those resources, and
runs the code for you asynchronously. We've used the syntax
here for the SLURM batch system, but there are others, like PBS
and SGE. They all use different commands and work in slightly
different ways, but they all have the same basic functionality:
you tell the scheduler the resources, and it finds time on those
resources for you so that you can run your job. The scheduler
will also frequently set up the environment variables and files
that the MPI implementation needs in order to know where it
should run things for you. So there's a lot that goes on behind
the scenes here.
Communicators
Processes form groups
Messages sent in contexts
Separate communication for libraries
Group + context = communicator
Identify process by rank in group
Default is MPI_COMM_WORLD
As we said two slides ago, messages in MPI are sent within
communicators. A communicator is a group of processes together
with a messaging context - for example, the context of a library
or framework whose messages we would like to keep from interfering
with those sent on MPI_COMM_WORLD. Again, MPI_COMM_WORLD is the
name of the default communicator.
Sending and receiving
Need to specify:
What’s the data?
Different machines use different encodings (e.g. endian-ness)
$\implies$ “bag o’ bytes” model is inadequate
How do we identify processes?
How does receiver identify messages?
What does it mean to “complete” a send/recv?
We use communicators to communicate, naturally enough, but what
does communication in MPI look like? We need to specify at
least three things: who is sending, who is receiving, and what
is in the message. There is actually some subtlety to each of
these, starting with the content of the messages. For example,
suppose that we wanted to send an integer across the wire.
Different systems can encode integers in different ways: for
example, most modern Intel machines are "little-endian," meaning
that the least significant bytes of an integer appear first in
memory; but there used to be common machines that were
"big-endian," meaning that the most significant bytes appear
first. So if we want to send messages from a big-endian machine
to a little-endian machine, we'd better re-order the bytes in
our integers first. That means that we need to know not only a
region in memory where the message resides, but also a bit about
the meaning of the bytes in that region.
By the way, the terms "big-endian" and "little-endian" come from
Gulliver's Travels. These were the names of the opposing
parties in Lilliput. The Big-Endians broke their eggs at the
big end; the Little-Endians, at the other end. Yes, the whole
thing is obscure and a little bit silly - but that describes a
lot of CS, doesn't it?
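To make the byte-order issue concrete, here is a small sketch in plain C (no MPI involved) of the re-ordering a 32-bit integer would need when moving between big-endian and little-endian machines. The function names are made up for illustration. The point of giving MPI a datatype with each message is that, on a mixed system, the library can do this kind of conversion for us.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Reverse the byte order of a 32-bit integer
   (big-endian <-> little-endian). */
uint32_t swap_bytes32(uint32_t x)
{
    return ((x & 0x000000FFu) << 24) |
           ((x & 0x0000FF00u) <<  8) |
           ((x & 0x00FF0000u) >>  8) |
           ((x & 0xFF000000u) >> 24);
}

/* Swap every entry of an array in place.  Note that we need to
   know the array holds 32-bit integers: a "bag of bytes" would not
   tell us where one value ends and the next begins. */
void swap_array32(uint32_t* a, size_t n)
{
    assert(a != NULL || n == 0);
    for (size_t i = 0; i < n; ++i)
        a[i] = swap_bytes32(a[i]);
}

/* Return 1 if this machine stores the least significant byte first. */
int is_little_endian(void)
{
    uint32_t one = 1;
    return *(unsigned char*) &one == 1;
}
```

For example, swap_bytes32(0x01020304u) gives 0x04030201u, and applying it twice gives back the original value.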
MPI datatypes
Message is (address, count, datatype). Allow:
Basic types (MPI_INT, MPI_DOUBLE)
Contiguous arrays
Strided arrays
Indexed arrays
Arbitrary structures
Complex data types may hurt performance?
So, MPI messages are made up of different data types. Most
messages involve basic data types like ints or doubles, or arrays
of those basic data types. But MPI also provides support for more
complicated data types. It's often worth keeping things simple if
you can, though. MPI is big, and implementors tend to spend the
most time tuning the parts of the standard that get the most use.
And complicated MPI data types don't necessarily get used all that
often.
MPI tags
Use an integer tag to label messages
Help distinguish different message types
Can screen messages with wrong tag
MPI_ANY_TAG is a wildcard
Along with the data, MPI messages come with an integer tag that
identifies the type or sequence number of a message. Tags
aren't needed to get the basic functionality in MPI, but they
are really convenient for helping debug when things go wrong,
or when messages get re-ordered in some way. Message receivers
can indicate that they are expecting a particular tag, and
screen out messages that don't match; or they can specify
MPI_ANY_TAG to say that they will take any message.
MPI Send/Recv
Basic blocking point-to-point communication:
int
MPI_Send(void *buf, int count,
MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm);
int
MPI_Recv(void *buf, int count,
MPI_Datatype datatype,
int source, int tag, MPI_Comm comm,
MPI_Status *status);
All right. Let's introduce the next pair of MPI commands:
blocking send and receive.
MPI_Send sends a message, blocking until the message is "in the
system" - whatever that might mean. We have to tell the routine
where the message is in memory, how many items are in the message,
the type of those items, the destination rank, the message tag,
and the communicator.
MPI_Recv receives a message. The arguments look a lot like those
for the send, but there are a couple differences to know about.
First, we don't need to know the exact size of the data to be
received in advance; the receive count just tells us the size of the
buffer we are providing. Second, the source and tag can be either
a processor rank and tag that we are supposed to match, or they
can be wildcard values. Finally, we have an additional output
argument, called status, appearing at the end of the argument
list. We use this to communicate additional information about
what we received.
MPI send/recv semantics
Send returns when data gets to system
... might not yet arrive at destination!
Recv ignores messages that mismatch source/tag
MPI_ANY_SOURCE and MPI_ANY_TAG wildcards
Recv status contains more info (tag, source, size)
OK. So the semantics are: we send data, and the function
returns when the data is on its way, whether or not it has
actually arrived. The receiver then picks up the data,
screening on the source and tag parameters, unless the wildcards
MPI_ANY_SOURCE and MPI_ANY_TAG are provided. And the receive
function also writes to a status object, telling us the actual
tag, source, and message size of the thing we received.
Ping-pong pseudocode
Process 0:
for i = 1:ntrials
send b bytes to 1
recv b bytes from 1
end
Process 1:
for i = 1:ntrials
recv b bytes from 0
send b bytes to 0
end
With send and receive, we now have enough information to write a
ping-pong test. Recall that we discussed this in the last slide
deck; the idea is that we send a message back and forth between
process 0 and process 1, varying the message size, and we do some timing.
Ping-pong MPI
void ping(char* buf, int n, int ntrials, int p)
{
for (int i = 0; i < ntrials; ++i) {
MPI_Send(buf, n, MPI_CHAR, p, 0,
MPI_COMM_WORLD);
MPI_Recv(buf, n, MPI_CHAR, p, 0,
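MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
}
(Pong is similar)
Here is the ping code in MPI. Pong looks similar. Note that MPI_CHAR
is a single byte, so we are sending n-byte messages back and
forth (n plays the role of b in the pseudocode). Passing
MPI_STATUS_IGNORE as the last argument of receive says that we
don't actually care about the results that would normally be
passed back in the status object. (A plain NULL pointer is not a
portable substitute; MPI_STATUS_IGNORE is the constant the
standard provides for this.)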
Ping-pong MPI
for (int sz = 1; sz <= MAX_SZ; sz += 1000) {
if (rank == 0) {
double t1 = MPI_Wtime();
ping(buf, sz, NTRIALS, 1);
double t2 = MPI_Wtime();
printf("%d %g\n", sz, t2-t1);
} else if (rank == 1) {
pong(buf, sz, NTRIALS, 0);
}
}
At the start of the ping-pong test, both processors enter the same
main routine. We start pinging at rank 0, and ponging at rank
1. We'll measure the times at rank 0 using the MPI_Wtime routine,
which gives the current wall clock time measured in seconds.
Blocking and buffering
Block until data “in system” — maybe in a buffer?
I've said a couple times now that MPI_Send blocks until data is
in the system. But what does that mean? Often, it means that
the data has been copied from the user-space memory provided in
the Send call into a buffer maintained by the operating system.
Blocking and buffering
Alternative: don’t copy, block until done.
An alternative is to do a "zero-copy" send, where the network
card directly accesses the user buffer. That saves some time on
the copy, though it means that we are not going to allow the MPI
program to do anything while the message is in transit.
Problem 1: Potential deadlock
Both processors wait to send before they receive!
May not happen if lots of buffering on both sides.
We might run into a problem in systems that use the zero-copy
approach, or use buffered sends but don't have ample buffer
space. If two processors both want to send to each other and
are blocked on the send, they can never reach the subsequent
receive! This is a deadlock scenario.
Solution 1: Alternating order
Could alternate who sends and who receives.
One way around the deadlock is to alternate who sends and who
receives. This is what happens in our ping-pong tester, for
example. Of course, this gets trickier when we are building
more complicated exchange patterns out of sends and receives.
Solution 2: Combined send/recv
Common operations deserve explicit support!
Fortunately, there are a lot of situations where we want to send
a message and receive a message at about the same time, so MPI
has a primitive for it: MPI_Sendrecv.
Combined sendrecv
MPI_Sendrecv(sendbuf, sendcount, sendtype,
dest, sendtag,
recvbuf, recvcount, recvtype,
source, recvtag,
comm, status);
Blocking operation, combines send and recv to avoid deadlock.
MPI_Sendrecv takes all the arguments of a send, then all of the
arguments of a receive, and it does the two operations in one big
lump. It will manage things in order to make sure there are no
deadlocks for scenarios like the crossing sends that we discussed earlier.
Problem 2: Communication overhead
Partial solution: nonblocking communication
Of course, even if we avoid deadlock, we might not be happy with
the cost of blocking operations like MPI_Send and MPI_Recv just
because we may leave the processor idle. But we can overlap
communication and computation a little better with the so-called
nonblocking operations.
Blocking vs non-blocking
MPI_Send and MPI_Recv are blocking
Send does not return until data is in system
Recv does not return until data is ready
Cons: possible deadlock, time wasted waiting
Why blocking?
Overwrite buffer during send $\implies$ evil!
Read buffer before data ready $\implies$ evil!
OK, we just talked about the problems with blocking sends and
receives. We might deadlock if we sequence our operations in an
ill-advised way, and we might waste time while we wait for the
operations to complete. So with these problems, why block? The
answer is that we can compromise correctness if we try to read
or write to buffers before the send and receive complete. The
blocking semantics are there to protect us.
Blocking vs non-blocking
Alternative: nonblocking communication
Split into distinct initiation/completion phases
Initiate send/recv and promise not to touch buffer
Check later for operation completion
The alternative to blocking communication is non-blocking
communication. In these primitives, we split starting a send or
receive and finishing the operation into distinct phases. In
between the start of a send and the end of a send, we are
allowed to do any work we want - as long as we promise not to
touch the send buffer. The situation with receives is similar.
The key to correctness here is that you follow your promise not
to touch the buffers while messages are in progress. If you
break your promise, you get undefined behavior.
Overlap communication and computation
Here's the behavior in pictures. Two processors want to
exchange data, so we start a nonblocking send-receive
operation. Then we do some work that doesn't depend on the
message, and only block on exchange completion after we've done
that work. If we have enough work, we can completely mask the
communication cost.
Nonblocking operations
Initiate message:
MPI_Isend(start, count, datatype, dest,
tag, comm, request);
MPI_Irecv(start, count, datatype, source,
tag, comm, request);
Wait for message completion:
MPI_Wait(request, status);
Test for message completion:
MPI_Test(request, flag, status);
The relevant MPI calls to start a non-blocking send or receive are
MPI_Isend and MPI_Irecv. Each of these calls has an output
argument called "request" that receives a handle that we can use
later to refer to the operation. The MPI_Wait and MPI_Test
functions take the request object as an input; MPI_Wait blocks
until message completion, and MPI_Test returns immediately,
reporting through a flag output whether the message is done.
These calls also have an output argument called status
that tells us the status of the message - this is the same type of
status object that we saw in the call to MPI_Recv.
Multiple outstanding requests
Sometimes useful to have multiple outstanding messages:
MPI_Waitall(count, requests, statuses);
MPI_Waitany(count, requests, index, status);
MPI_Waitsome(incount, requests, outcount, indices, statuses);
Multiple versions of test as well.
Sometimes we have more than one non-blocking send or receive in
progress, and we want to wait on some or all of them. There are
versions of MPI_Test that look at multiple pending requests,
too.
Other send/recv variants
Other variants of MPI_Send
MPI_Ssend (synchronous) – do not complete until receive has begun
MPI_Bsend (buffered) – user provides buffer (via MPI_Buffer_attach)
MPI_Rsend (ready) – user guarantees receive has already been posted
Can combine modes (e.g. MPI_Issend)
MPI_Recv receives anything.
There are other variants of MPI_Send, too. The synchronous
version does not complete until the matching receive has actually
started; the buffered version copies the message into a user-space
buffer that we attach in advance; and the ready version may only
be used when we can guarantee that the receiver has already posted
a receive operation before we begin. We can
combine modes, too. There are a lot of combinations.
Receive is simpler: we basically just use the blocking version
or the nonblocking version. No ready, buffered, or synchronous
modes on this end.
Another approach
Send/recv is one-to-one communication
An alternative is one-to-many (and vice-versa):
Broadcast to distribute data from one process
Reduce to combine data from all processors
Operations are called by all processes in communicator
Point-to-point message exchanges are only one way to organize
communications in a message-passing code. We can also use
all-to-one and one-to-all patterns, or even all-to-all
patterns. Examples here include broadcast, reduction, and
scatter-gather, among others. Let's go through a few instances now.
Broadcast and reduce
MPI_Bcast(buffer, count, datatype,
root, comm);
MPI_Reduce(sendbuf, recvbuf, count, datatype,
op, root, comm);
buffer is copied from root to others
recvbuf receives result only at root
op $\in \{$ MPI_MAX, MPI_SUM, … $\}$
Two of the most fundamental collective operations are broadcasting
and reduction. MPI_Bcast sends a message from a root node that is
received at every other node in the communicator. The arguments
look much like the arguments to MPI_Send and MPI_Recv, except for
the presence of the root rank argument and the absence of a status
object.
MPI_Reduce goes the other way: the root node receives
contributions from every processor, and combines them via some
associative operation like max or sum. Note that we have to
specify a buffer for data to be sent and another buffer for the
result to be retrieved, but the receive buffer is only used at the
root process.
Example: basic Monte Carlo
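#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
void run_mc(int myid, int nproc, int ntrials);
int main(int argc, char** argv) {
int nproc, myid, ntrials = 0;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
/* Parse arguments only after MPI_Init; rank 0's value is broadcast */
if (argc > 1) ntrials = atoi(argv[1]);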
MPI_Bcast(&ntrials, 1, MPI_INT,
0, MPI_COMM_WORLD);
run_mc(myid, nproc, ntrials);
MPI_Finalize();
return 0;
}
Let's give an example of using broadcast and reduce in a Monte
Carlo computation. We start by deciding how many trials each
processor should compute; in this case, we just get it from the
command line, but maybe we would do something more elaborate.
From rank 0, we are going to broadcast the number of trials per
processor to all the other ranks; then we will actually run the
Monte Carlo computation.
Example: basic Monte Carlo
Let sums[0] = $\sum_i X_i$ and sums[1] = $\sum_i X_i^2$.
void run_mc(int myid, int nproc, int ntrials) {
double sums[2] = {0,0};
double my_sums[2] = {0,0};
/* ... run ntrials local experiments ... */
MPI_Reduce(my_sums, sums, 2, MPI_DOUBLE,
MPI_SUM, 0, MPI_COMM_WORLD);
if (myid == 0) {
int N = nproc*ntrials;
double EX = sums[0]/N;
double EX2 = sums[1]/N;
printf("Mean: %g; err: %g\n",
EX, sqrt((EX2-EX*EX)/N));
}
}
The idea of Monte Carlo is that we'll run lots of independent
computational experiments, and then compute a sample mean and
variance over those experiments. To do this, we need to get the
sum of experiment outputs and the sum of the squares of the
experiment outputs. We'll keep the local contribution to these
sums in the my_sums array at each processor. Then, after we have
run the trials on each processor, we will reduce from these
my_sums arrays into a sums array that lives at rank 0. Finally, at
rank 0 we will use the overall sums to compute a sample mean and
an estimate of the standard error of that mean.
Yes, I know that this is a biased estimator for the variance,
since we used an N instead of an N-1. In Monte Carlo
computations, if you notice the difference between division by N
and division by N-1, you probably should have used more trials.
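For reference, here are the quantities that the rank 0 code is meant to compute, written out under the assumption that $S_1 = \sum_i X_i$ and $S_2 = \sum_i X_i^2$ are the reduced sums over all $N$ trials (nproc times ntrials of them):

```latex
\bar{X} = \frac{S_1}{N}, \qquad
\hat{\sigma}^2 = \frac{S_2}{N} - \bar{X}^2, \qquad
\mathrm{err} \approx \sqrt{\frac{\hat{\sigma}^2}{N}}
```

The variance estimate is the mean of the squares minus the square of the mean, in that order, so the quantity under the square root is nonnegative up to rounding. The error estimate shrinks like $1/\sqrt{N}$, which is why adding processors (and so trials) helps, if slowly.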
Collective operations
Involve all processes in communicator
Basic classes:
Synchronization (e.g. barrier)
Data movement (e.g. broadcast)
Computation (e.g. reduce)
Collective operations like broadcast and reduce have to be
called by all ranks in a communicator (in the same order, though
not necessarily at the same instant), and
involve communications among all the processes. We have
synchronization collectives like barrier; communication
collectives like broadcast; and computation collectives like
reduce.
We aren't going to talk about most of these in any detail,
but we can illustrate the basic ideas in pictures.
Barrier
MPI_Barrier(comm);
Not much more to say. Not needed that often.
All right, maybe no picture for the MPI barrier. It's a
barrier; everyone blocks at it until all of the ranks have
reached it. This is not that useful with the communication
operations we've discussed so far; it's more relevant when we
are using one-sided communication operations.
Broadcast
Here is MPI_Bcast in pictures. We start with A only on the root
(rank 0); and after the broadcast, everyone has a copy.
Scatter/gather
Broadcast sends the same data item to everyone. Scatter and
gather send different data items between the root and every
other rank in the communicator. For example, if the root starts
with ABCD and scatters to the other processors, then at the end
we have A at rank 0, B at rank 1, C at rank 2, and D at rank 3.
Gather goes the other way, collecting contributions from all the
ranks and putting them together in an array at the root.
Allgather
Allgather is like a gather operation, but instead of just
gathering data to the root, every processor gets a copy.
Alltoall
And all-to-all is like a simultaneous scatter operation from every
rank at once. All-to-all is also sometimes called a transpose
operation, for reasons that I hope are clear from the diagram!
Reduce
Broadcast, scatter/gather, allgather, and all-to-all are all
pure communication operations. Reduce, as we've already
mentioned, does some computation as well. Here we do a sum
reduction, but we can also do max or min reductions - any
associative operation will work.
Scan
Another powerful collective computation operation is scan, or
parallel prefix. The idea is that each rank p will get a
partial sum (or partial result) from the data at ranks 0 through
p. This is fantastically useful for things like computing
shared global indices, for example.
The kitchen sink
In addition to above, have vector variants (v suffix), more All variants (Allreduce), Reduce_scatter, ...
MPI-3 overhauls one-sided communication (put/get), first added in MPI-2
MPI is not a small library!
But a small number of calls goes a long way
Init/Finalize
Comm_rank, Comm_size
Send/Recv variants and Wait
Allreduce, Allgather, Bcast
We have just scratched the surface of what one can do with even
MPI-2. In addition to the basic operations sketched so far,
there are more All variants (like Allreduce), vector variants
(like Scatterv and Gatherv) that let each rank contribute a
different amount of data, combined operations
like reduce-scatter (a little like send-receive), and more.
It's not a small library!
Fortunately, we can go a long way with a few operations like
send/receive pairs and collectives like scatter, gather,
broadcast, and reduction.
If I am ambitious, there may be another slide deck on Thursday
about the MPI-3 operations. But the real next step for all of
you is to try some things out! I will provide some demo codes
for this purpose, and we'll talk through some of them in
meetings this week. Until then!