Collective communication #
Point-to-point messages are sufficient to write all the parallel algorithms we might want. However, they are not necessarily the most efficient way to do so.
As motivation, let’s think about the time we would expect a reduction (combination of values) to take if we send messages in a 1-D ring of processes.
Recall from the ping-pong exercise that our model for the length of time it takes to send a message with $B$ bytes is
$$ T(B) = \alpha + \beta B $$
To rotate each item all the way around a ring of $P$ processes requires each process to send $P-1$ messages. Again supposing we reduce $B$ bytes:
$$ T_\text{ring}(B) = (P-1)(\alpha + \beta B) $$
So as we add more processes, the time scales linearly.
An alternative is to combine the partial reductions pairwise in a tree, as shown in the figure below
This approach can be generalised to problems where the number of processes is not a power of two.
To get the result back to all processes, we can just send the combined final value back up the tree in the same manner. For $P$ processes, the tree has depth $\log_2 P$ (we divide the number of processes by two each time). So now we only send $2 \log_2 P$ messages (up and down), for a total time of
$$ T_{\text{tree}}(B) = 2\log_2 P (\alpha + \beta B) $$
As long as $2\log_2 P < (P-1)$ this takes less overall time. We can solve this numerically to find it should be preferable as long as $P > 6$.
import numpy
import scipy.optimize
# Do Newton to find P such that 2log_2 P - (P-1) = 0
# We know that P = 1 is a solution, so we deflate that away
def f(p):
    return 1/abs(p - 1) * (2*numpy.log2(p) - (p - 1))
root = scipy.optimize.newton(f, 2)
print(root)
# => 6.319722355838366
We conclude that some kind of tree-based reduction is superior in almost all circumstances.
This reduction is an example of a collective operation. Rather than sending messages pairwise, all processes in a communicator work together (or collectively) to compute some result.
What about implementation? #
Writing code for a tree-reduction in MPI by hand is actually not too hard, and you can have a go if you want.
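For illustration, here is a minimal sketch of a hand-rolled tree reduction built only from point-to-point messages. It assumes the usual `comm = MPI.COMM_WORLD` setup and that the number of processes is a power of two, and it sums one value per process onto rank 0; this is just one possible approach, not what MPI libraries actually do internally.

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size   # assumed to be a power of two in this sketch

partial = numpy.array([float(rank)])   # each process contributes one value
recv = numpy.empty_like(partial)

# Combine pairwise up a binary tree: at each level, half of the remaining
# processes send their partial result to a partner and drop out.
step = 1
while step < size:
    if rank % (2 * step) == step:
        comm.Send(partial, dest=rank - step)
        break              # this process has handed off its result
    else:
        comm.Recv(recv, source=rank + step)
        partial += recv
    step *= 2

if rank == 0:
    print("Tree-reduced sum:", partial[0])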
However, writing it generically to handle all datatypes is slightly fiddly, and moreover, depending on the topology of the underlying network, other approaches might be more efficient. For example, hypercubes often offer a superior communication pattern.
Our conclusion then, is that we probably don’t want to implement this stuff ourselves.
Fortunately, MPI offers a broad range of builtin collective operations that cover most of the communication patterns we might want to use in a parallel program. It now falls to the implementor of the MPI library (i.e. not us) to come up with an efficient implementation.
This is perhaps a general pattern that we observe when writing MPI programs. It is best to see if our communication pattern fits with something the MPI library implements before rolling our own.
Types of collectives #
One thing to note with all of these collectives is that none of them require a tag. MPI therefore just matches collectives in the order that they are called. So we need to be careful that we call collectives in the same order on all processes.
All of the collective operations we introduce in this section are blocking: they do not return until the calling process's part of the operation is complete, at which point its input and output buffers are safe to reuse.
MPI-3 (standardised in 2012) introduced non-blocking versions of all the collectives (with names starting with `MPI_I`, like the non-blocking point-to-point functions). These have the same semantics, but we get an `MPI_Request` object that we must wait on before we can look at the results. The rationale for the introduction of non-blocking collectives was similar to that for non-blocking point-to-point: they allow some overlap of communication latency with computation, potentially enabling more scalable code.
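As a sketch (assuming an MPI-3-capable library and the usual `comm`, `numpy`, and `MPI` setup), a non-blocking allreduce looks something like:

send = numpy.array([comm.rank], dtype=numpy.int32)
recv = numpy.empty(1, dtype=numpy.int32)

# Start the reduction, then do unrelated work while it progresses
request = comm.Iallreduce([send, 1, MPI.INT32_T], [recv, 1, MPI.INT32_T],
                          op=MPI.SUM)
# ... computation that touches neither send nor recv ...
request.Wait()   # only now is recv guaranteed to hold the result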
Barrier: MPI_Barrier #
We already saw a barrier in OpenMP; MPI similarly has one, named `MPI_Barrier`.
comm.Barrier()
In a barrier, no process may exit the barrier until all have entered. So we can use this to provide a synchronisation point in a program. Note that there are no guarantees that all processes leave at the same time.
There are actually very few reasons that a correct MPI code needs a barrier.
The usual reason to add them is if we want to measure the parallel performance of some part of the code, and don’t want bad load balance from another part to pollute our measurements.
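For example, a common timing idiom looks like the sketch below, where `do_interesting_work` is a stand-in for whatever region we want to measure:

comm.Barrier()                 # make sure everyone starts together
start = MPI.Wtime()
do_interesting_work()          # hypothetical region being timed
comm.Barrier()                 # wait for the slowest process to finish
elapsed = MPI.Wtime() - start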
Reductions #
These are very useful, and there are a few related variants. `MPI_Reduce` combines messages with a provided operation and accumulates the final result on a specified root rank.
comm.Reduce([sendbuf, count, datatype],
[recvbuf, count, datatype],
op, root)
Every process provides a value in `sendbuf`; the root process (often rank 0, but not necessarily) provides an output buffer in `recvbuf`. The count and datatype arguments are the same as for point-to-point messaging: they describe how the send and receive buffers are to be interpreted by MPI. The `op` argument tells MPI how to combine values from different processes.
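For example, a minimal sketch that sums one integer per process onto rank 0 (assuming the usual `comm`, `numpy`, and `MPI` setup):

send = numpy.array([comm.rank], dtype=numpy.int32)
recv = numpy.empty(1, dtype=numpy.int32)   # only inspected on the root

comm.Reduce([send, 1, MPI.INT32_T], [recv, 1, MPI.INT32_T],
            op=MPI.SUM, root=0)
if comm.rank == 0:
    print("Sum of ranks:", recv[0])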
Closely related is `MPI_Allreduce`, which does the same thing as `MPI_Reduce` but puts the final result on all processes (so there is no need for a `root` argument).
comm.Allreduce([sendbuf, count, datatype],
[recvbuf, count, datatype],
op)
We can use the magic value `MPI.IN_PLACE` in place of the specification of the send buffer to state that the initial value in the receive buffer should be sent.

recvbuf[:] = stuff_to_send
comm.Allreduce(MPI.IN_PLACE, [recvbuf, count, datatype], op)
# Is equivalent to
comm.Allreduce([stuff_to_send, count, datatype],
               [recvbuf, count, datatype], op)
`MPI_Allreduce` is generally more useful than `MPI_Reduce`, since often we need to make a collective decision based on the result of the reduction (for example when checking for convergence of a numerical algorithm). Although it is possible to implement an allreduce using an `MPI_Reduce` followed by an `MPI_Bcast`, more efficient algorithms exist.
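For example, a convergence check might look like this sketch, where `local_residual_squared` and `tolerance` stand in for values the application already has:

local = numpy.array([local_residual_squared], dtype=numpy.float64)
global_ = numpy.empty(1, dtype=numpy.float64)
comm.Allreduce([local, 1, MPI.DOUBLE], [global_, 1, MPI.DOUBLE], op=MPI.SUM)
# Every process sees the same global residual, so every process takes the
# same branch.
converged = numpy.sqrt(global_[0]) < tolerance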
There are also “prefix reduction” versions, `MPI_Scan` and `MPI_Exscan` (for inclusive and exclusive “scans” of the input respectively).
comm.Scan([sendbuf, count, datatype],
[recvbuf, count, datatype],
op)
comm.Exscan([sendbuf, count, datatype],
[recvbuf, count, datatype],
op)
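A common use of a scan is computing each process's offset into a globally ordered array. In this sketch, `n_local_items` stands in for a per-process count:

count = numpy.array([n_local_items], dtype=numpy.int64)
inclusive = numpy.empty(1, dtype=numpy.int64)
comm.Scan([count, 1, MPI.INT64_T], [inclusive, 1, MPI.INT64_T], op=MPI.SUM)
# Exclusive prefix: total number of items on ranks 0, ..., rank-1
offset = inclusive[0] - count[0]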
The final missing piece is what this magic `op` argument is. For combining simple types, we can usually use one of a number of builtin options given to us by MPI.
| Description | Operator | Identity element |
|---|---|---|
| Addition | MPI.SUM | 0 |
| Multiplication | MPI.PROD | 1 |
| Minimum | MPI.MIN | Most positive number of given type |
| Maximum | MPI.MAX | Most negative number of given type |
| Logical and | MPI.LAND | True |
| Logical or | MPI.LOR | False |
| Logical xor | MPI.LXOR | False |
| Bitwise and | MPI.BAND | All bits 1 |
| Bitwise or | MPI.BOR | All bits 0 |
| Bitwise xor | MPI.BXOR | All bits 0 |
If we pass a `count` of more than 1, then the operation is applied to each entry in turn. So
send = numpy.empty(2, dtype=numpy.int32)
recv = numpy.empty(2, dtype=numpy.int32)
send[:] = ...
comm.Allreduce([send, 2, MPI.INT32_T],
               [recv, 2, MPI.INT32_T], op=MPI.SUM)
produces the sum over all processes of `send[0]` in `recv[0]` and that of `send[1]` in `recv[1]`.
When computing a minimum or maximum value, we might also want to know on which process the value was found. This can be done using the combining operations `MPI.MINLOC` and `MPI.MAXLOC`. These also show an example of using more complicated datatypes.
Suppose we have a pair of values
struct ValAndLoc {
double x;
int loc;
};
We put the value we care about in the first slot, and the rank of the current process (say) in the second. Now combining with `MPI_MAXLOC` produces the `MAX` over the first slot, and carries along the second slot from whichever process held the maximum.
# This is how you make a structured datatype in numpy
dtype = numpy.dtype([('x', numpy.float64), ('loc', numpy.int32)])
local = numpy.empty(1, dtype=dtype)
local[0]["x"] = ...;
local[0]["loc"] = rank;
global_ = numpy.empty_like(dtype)
comm.Allreduce([local, 1, MPI.DOUBLE_INT],
[global_, 1, MPI.DOUBLE_INT],
MPI.MAXLOC)
So we can determine the maximum value, and also the rank on which it was found.
One to all: scatters and broadcasts #
When starting up a simulation, we might want to read some configuration file to provide parameters. This typically should only be done by a single process1; however, all processes will need to know the values so that they can configure the simulation appropriately.
To do this we can broadcast data from a single rank using `MPI_Bcast`.
comm.Bcast([buffer, count, datatype], root)
Note that the value in the buffer only matters (as input) on the root process, but all processes must provide enough space. For example, to send 10 integers from rank 0
buffer = numpy.empty(10, dtype=numpy.int32)
if comm.rank == 0:
# Initialise values from somewhere (e.g. read from file)
...
comm.Bcast([buffer, 10, MPI.INT32_T], root=0)
A more general broadcast mechanism, where each rank receives different data, is provided by `MPI_Scatter`, which takes some data on a single rank and splits it out to all processes.
comm.Scatter([sendbuf, sendcount, sendtype],
[recvbuf, recvcount, recvtype],
root)
Note that the `sendcount` argument is the number of values to send to each process (not the total number of values in the send buffer). On ranks other than the root rank, the send parameters are ignored (so the `sendbuf` argument can be `None`).
For example, consider again a situation where rank zero reads a configuration file and then determines some allocation of work (perhaps loop iteration counts) for each process. We need to communicate a pair of “start” and “end” values to each process.
rank = comm.rank
recvbuf = numpy.empty(2, dtype=numpy.int32)
if rank == 0:
size = comm.size
sendbuf = numpy.empty(2*size, dtype=numpy.int32)
# Populate sendbuf somehow
...
# Note how sendbuf is 2*size, but the count is 2, because we send
# 2 entries to each process
comm.Scatter([sendbuf, 2, MPI.INT32_T], [recvbuf, 2, MPI.INT32_T],
root=0)
else:
comm.Scatter(None, [recvbuf, 2, MPI.INT32_T], root=0)
This sends entries 0 and 1 in sendbuf to rank 0; entries 2 and 3 to rank 1; entries 4 and 5 to rank 2; and so on.
There is also a more general `MPI_Scatterv`, in which we can specify how many values we send to each process.
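As an illustrative sketch (the counts here are invented: rank r receives r+1 values):

nlocal = comm.rank + 1          # every rank knows how much it will receive
recvbuf = numpy.empty(nlocal, dtype=numpy.float64)
if comm.rank == 0:
    counts = numpy.arange(1, comm.size + 1, dtype=numpy.int32)
    displs = numpy.insert(numpy.cumsum(counts)[:-1], 0, 0)
    sendbuf = numpy.arange(counts.sum(), dtype=numpy.float64)
    comm.Scatterv([sendbuf, counts, displs, MPI.DOUBLE], recvbuf, root=0)
else:
    comm.Scatterv(None, recvbuf, root=0)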
All to one: gathers #
The dual to `MPI_Scatter`, in which we gather all values to a single process, is (perhaps unsurprisingly) called `MPI_Gather`.
comm.Gather([sendbuf, sendcount, sendtype],
[recvbuf, recvcount, recvtype],
root)
Note again that the `recvcount` argument is the number of entries to receive from each process. The receive buffer must be big enough to hold `comm.size*recvcount` total entries.
We might use this, for example, to collect output data before writing it to a file2.
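A minimal sketch of this (where `compute_local_result` and the output filename are placeholders):

send = numpy.array([compute_local_result()], dtype=numpy.float64)
if comm.rank == 0:
    recv = numpy.empty(comm.size, dtype=numpy.float64)
    comm.Gather([send, 1, MPI.DOUBLE], [recv, 1, MPI.DOUBLE], root=0)
    numpy.savetxt("results.txt", recv)   # placeholder output step
else:
    comm.Gather([send, 1, MPI.DOUBLE], None, root=0)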
Just like `MPI_Scatter`, there is a more general `MPI_Gatherv` in which we specify how many values to gather from each process.
All to all: everyone talks to everyone #
Generally, we would like to avoid parallelisation that requires every rank to hold data whose size scales with the total number of processes, or that requires every process to communicate with every other. Sometimes, however, this is unavoidable.
For example, multi-dimensional fast Fourier transforms are among the fastest approaches for computing the inverse of the Laplacian in periodic domains. They form, amongst other things, a core computational component of numerical simulation of turbulent flow (see for example the codes at https://github.com/spectralDNS).
To compute Fourier transforms in parallel, we need to do one-dimensional transforms along each cartesian direction, interspersed by a global transpose of the data. Depending on the data distribution, this is a generalised “all to all” communication pattern. See mpi4py-fft for a high-performance Python library that does this.
MPI provides a number of routines that help with these, including `MPI_Allgather` (and its “vector” sibling `MPI_Allgatherv`) and `MPI_Alltoall`.
comm.Allgather([sendbuf, sendcount, sendtype],
[recvbuf, recvcount, recvtype])
comm.Alltoall([sendbuf, sendcount, sendtype],
[recvbuf, recvcount, recvtype])
Note how neither of these calls has a `root` argument: there is no “special” process in these collectives.
The pictorial version of the routines is shown below
Note how the `MPI_Alltoall` call ends up transposing the data.
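A small sketch makes the transpose concrete: each process fills a send buffer with one entry per destination, and after the call, entry j of the receive buffer on rank i holds entry i of the send buffer from rank j.

size = comm.size
send = comm.rank * size + numpy.arange(size, dtype=numpy.int32)
recv = numpy.empty(size, dtype=numpy.int32)
# A count of 1 means one entry goes to, and comes from, each process
comm.Alltoall([send, 1, MPI.INT32_T], [recv, 1, MPI.INT32_T])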
Summary #
MPI has a rich array of collective operations which can be used to implement common communication patterns much more efficiently than we are likely to write by hand using only point-to-point messaging.
Unlike point-to-point messaging, there are no tags in collective operations, so collectives are matched “in order” on the communicator they are used with. We briefly look at how to distinguish collective operations, and how to restrict the operation to a subset of all the processes in `MPI_COMM_WORLD`, when considering communicator manipulation.
All processes simultaneously accessing the same file can be very slow on some systems. ↩︎
In real-world applications you should use a more scalable, parallel, approach. For example, directly using MPI-IO (see Bill Gropp’s slides for a nice intro and motivation), or a higher-level alternative like HDF5. These additional libraries are beyond the scope of this course. ↩︎