Collectives

Collective communication #

Point-to-point messages are sufficient to write all the parallel algorithms we might want. However, they are not necessarily the most efficient approach.

As motivation, let’s think about the time we would expect a reduction (combination of values) to take if we send messages in a 1-D ring of processes.

Recall from the ping-pong exercise that our model for the length of time it takes to send a message with $B$ bytes is

$$ T(B) = \alpha + \beta B $$

To rotate each item all the way around a ring of $P$ processes requires each process to send $P-1$ messages. Again supposing we reduce $B$ bytes:

$$ T_\text{ring}(B) = (P-1)(\alpha + \beta B) $$

So as we add more processes, the time scales linearly.

An alternative is to combine the partial reductions pairwise in a tree, as shown in the figure below.

Tree reduction across 16 processes. At each level processes combine pairwise.

This approach can be generalised to problems where the number of processes is not a power of two.

To get the result back to all processes, we can just send the combined final value back up the tree in the same manner. For $P$ processes, the tree has depth $\log_2 P$ (we halve the number of processes at each level). So now we only send $2 \log_2 P$ messages (up and down), for a total time of

$$ T_{\text{tree}}(B) = 2\log_2 P (\alpha + \beta B) $$

As long as $2\log_2 P < (P-1)$ this takes less overall time. We can solve this numerically to find it should be preferable as long as $P > 6$.

import numpy
import scipy.optimize


# Do Newton to find P such that 2log_2 P - (P-1) = 0
# We know that P = 1 is a solution, so we deflate that away
def f(p):
    return 1/abs(p-1) * (2*numpy.log2(p) - (p - 1))


root = scipy.optimize.newton(f, 2)

print(root)
# => 6.319722355838366

Modelled time to compute a reduction using a 1D ring and a tree: as soon as $P > 6$, the tree is faster.

We conclude that some kind of tree-based reduction is superior in almost all circumstances.

This reduction is an example of a collective operation. Rather than sending messages pairwise, all processes in a communicator work together (or collectively) to compute some result.

What about implementation? #

Writing code for a tree-reduction in MPI by hand is actually not too hard, and you can have a go if you want.
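
For instance, here is a minimal sketch of a hand-rolled tree reduction using point-to-point messages with mpi4py. The buffer contents and the choice of rank 0 as the destination are illustrative assumptions, not part of MPI.

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

result = numpy.array([float(rank)])  # each process's local contribution

# Combine pairwise up the tree: at distance d, every rank that is a
# multiple of 2*d receives from rank + d and accumulates; the partner
# sends its partial result and then drops out of the reduction.
d = 1
while d < size:
    if rank % (2 * d) == 0:
        partner = rank + d
        if partner < size:
            recv = numpy.empty_like(result)
            comm.Recv(recv, source=partner)
            result += recv
    elif rank % d == 0:
        comm.Send(result, dest=rank - d)
    d *= 2

# rank 0 now holds the sum over all processes (like MPI_Reduce with root=0)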

However, writing it generically to handle all datatypes is slightly fiddly, and moreover, depending on the topology of the underlying network, other approaches might be more efficient. For example, hypercubes often offer a superior communication pattern.

Our conclusion, then, is that we probably don't want to implement this stuff ourselves.

Fortunately, MPI offers a broad range of builtin collective operations that cover most of the communication patterns we might want to use in a parallel program. It now falls to the implementor of the MPI library (i.e. not us) to come up with an efficient implementation.

This is perhaps a general pattern that we observe when writing MPI programs. It is best to see if our communication pattern fits with something the MPI library implements before rolling our own.

Types of collectives #

One thing to note with all of these collectives is that none of them require a tag. MPI therefore just matches collectives in the order that they are called. So we need to be careful that we call collectives in the same order on all processes.

All of the collective operations we introduce in this section are blocking: they only return once all processes in the communicator have called them, at which point the input and output buffers are safe to reuse.

MPI-3 (standardised in 2012) introduced non-blocking versions of all the collectives (with names starting with MPI_I, like the non-blocking point-to-point functions). These have the same semantics, but we get an MPI_Request object that we must wait on before we can look at the results.

The rationale for the introduction of non-blocking collectives was similar to that for non-blocking point-to-point: they allow some overlap of communication latency with computation, potentially enabling more scalable code.
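
As a sketch (assuming an MPI-3 implementation and mpi4py), a non-blocking allreduce might look like this; the work between the call and the wait is whatever unrelated computation we happen to have available.

sendbuf = numpy.array([comm.rank], dtype=numpy.float64)
recvbuf = numpy.empty_like(sendbuf)
request = comm.Iallreduce(sendbuf, recvbuf, op=MPI.SUM)
# ... do some computation that does not need the result ...
request.Wait()   # only now is recvbuf safe to read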

Barrier: MPI_Barrier #

We already saw a barrier in OpenMP; MPI similarly has one, named MPI_Barrier.

comm.Barrier()

In a barrier, no process may exit the barrier until all have entered. So we can use this to provide a synchronisation point in a program. Note that there are no guarantees that all processes leave at the same time.

There are actually very few reasons that a correct MPI code needs a barrier.

The usual reason to add them is if we want to measure the parallel performance of some part of the code, and don’t want bad load balance from another part to pollute our measurements.
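
For example, a common timing pattern looks like the following sketch, using MPI_Wtime for the clock (the timed region is just a placeholder).

comm.Barrier()                # synchronise before starting the clock
start = MPI.Wtime()
# ... the region of code we want to time ...
comm.Barrier()                # wait for the slowest process
elapsed = MPI.Wtime() - start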

Reductions #

Reductions are very useful, and there are a few related variants.

MPI_Reduce combines messages with a provided operation and accumulates the final result on a specified root rank.

comm.Reduce([sendbuf, count, datatype],
            [recvbuf, count, datatype],
            op, root)

Every process provides a value in sendbuf; the root process (often rank 0, but not necessarily) provides an output buffer in recvbuf. The count and datatype arguments are the same as for point-to-point messaging: they describe how the send and receive buffers are to be interpreted by MPI. The op argument tells MPI how to combine values from different processes.

MPI_Reduce combines values with a specified operation and places the result on the root process.
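
As a concrete sketch (the buffers and operation are illustrative), summing one value per process onto rank 0 might look like:

sendbuf = numpy.array([comm.rank], dtype=numpy.float64)
recvbuf = numpy.empty(1, dtype=numpy.float64)  # only inspected on the root
comm.Reduce([sendbuf, 1, MPI.DOUBLE], [recvbuf, 1, MPI.DOUBLE],
            op=MPI.SUM, root=0)
if comm.rank == 0:
    print("sum of ranks:", recvbuf[0])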

Closely related is MPI_Allreduce, which does the same thing as MPI_Reduce but puts the final result on all processes (so there is no need for a root argument).

comm.Allreduce([sendbuf, count, datatype],
               [recvbuf, count, datatype],
               op)

We can use the special value MPI.IN_PLACE in place of the send buffer specification to indicate that the initial value in the receive buffer should be used as the input.

recvbuf[:] = stuff_to_send
comm.Allreduce(MPI.IN_PLACE, [recvbuf, count, datatype], op)
# Is equivalent to
comm.Allreduce([stuff_to_send, count, datatype],
               [recvbuf, count, datatype], op)

MPI_Allreduce combines values with a specified operation and places the result on all processes.

MPI_Allreduce is generally more useful than MPI_Reduce, since often we need to make a collective decision based on the result of the reduction (for example when checking for convergence of a numerical algorithm). Although it is possible to implement an allreduce using an MPI_Reduce followed by an MPI_Bcast, more efficient algorithms exist.
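
For example, a convergence check might look something like the following sketch, where local_residual is assumed to be some locally computed quantity.

local = numpy.array([local_residual], dtype=numpy.float64)
global_ = numpy.empty_like(local)
comm.Allreduce([local, 1, MPI.DOUBLE], [global_, 1, MPI.DOUBLE], op=MPI.MAX)
# Every process sees the same global maximum, so all reach the same decision
converged = global_[0] < 1e-10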

There are also “prefix reduction” versions, MPI_Scan and MPI_Exscan (for inclusive and exclusive “scans” of the input respectively).

comm.Scan([sendbuf, count, datatype],
          [recvbuf, count, datatype],
          op)
comm.Exscan([sendbuf, count, datatype],
            [recvbuf, count, datatype],
            op)

MPI_Scan computes an inclusive prefix reduction.

MPI_Exscan computes an exclusive prefix reduction; on rank 0 the value in the output buffer is undefined.
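
A common use of the exclusive scan is to compute each process's starting offset into a globally numbered array (for example when writing output). A sketch, assuming each process has some local count nlocal:

local_count = numpy.array([nlocal], dtype=numpy.int64)
offset = numpy.zeros(1, dtype=numpy.int64)  # output is undefined on rank 0, so initialise it
comm.Exscan([local_count, 1, MPI.INT64_T], [offset, 1, MPI.INT64_T], op=MPI.SUM)
# offset[0] is now the sum of the counts on all lower-numbered ranks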

The final missing piece is what this magic op argument is. For combining simple types, we can usually use one of a number of builtin operations provided by MPI.

Description      Operator   Identity element
Addition         MPI.SUM    0
Multiplication   MPI.PROD   1
Minimum          MPI.MIN    Most positive number of given type
Maximum          MPI.MAX    Most negative number of given type
Logical and      MPI.LAND   True
Logical or       MPI.LOR    False
Logical xor      MPI.LXOR   False
Bitwise and      MPI.BAND   All bits 1
Bitwise or       MPI.BOR    All bits 0
Bitwise xor      MPI.BXOR   All bits 0

If we pass a count of more than 1, then the operation is applied to each entry in turn. So the following

send = numpy.empty(2, dtype=numpy.int32)
recv = numpy.empty(2, dtype=numpy.int32)
send[:] = ...
comm.Allreduce([send, 2, MPI.INT32_T],
               [recv, 2, MPI.INT32_T], op=MPI.SUM)

produces the sum over all processes of send[0] in recv[0], and that of send[1] in recv[1].

When computing a minimum or maximum value, we might also want to know on which process the value was found. This can be done using the combining operations MPI.MINLOC and MPI.MAXLOC. These also show an example of using more complicated datatypes.

Suppose we have a pair of values

struct ValAndLoc {
  double x;
  int loc;
};

We put the value we care about in the first slot, and the rank of the current process (say) in the second. Now combining with MPI_MAXLOC produces the maximum over the first slot, and carries along the corresponding location from whichever process held that maximum.

# This is how you make a structured datatype in numpy
dtype = numpy.dtype([('x', numpy.float64), ('loc', numpy.int32)])
local = numpy.empty(1, dtype=dtype)
local[0]["x"] = ...
local[0]["loc"] = rank

global_ = numpy.empty_like(local)
comm.Allreduce([local, 1, MPI.DOUBLE_INT],
               [global_, 1, MPI.DOUBLE_INT],
               MPI.MAXLOC)

So we can determine the maximum value, and also the rank on which it was found.

One to all: scatters and broadcasts #

When starting up a simulation, we might want to read some configuration file to provide parameters. This typically should only be done by a single process1, however, all processes will need to know the values so that they can configure the simulation appropriately.

To do this we can broadcast data from a single rank using MPI_Bcast.

comm.Bcast([buffer, count, datatype], root)

MPI_Bcast sends data from a root rank to all ranks in the communicator.

Note that the input value of the buffer only matters on the root process, but all processes must provide enough space. For example, to send 10 integers from rank 0:

buffer = numpy.empty(10, dtype=numpy.int32)
if comm.rank == 0:
   # Initialise values from somewhere (e.g. read from file)
   ...
comm.Bcast([buffer, 10, MPI.INT32_T], root=0)

A more general broadcast mechanism, where each rank receives different data, is provided by MPI_Scatter, which takes some data on a single rank, and splits it out to all processes.

comm.Scatter([sendbuf, sendcount, sendtype],
             [recvbuf, recvcount, recvtype],
             root)

MPI_Scatter splits and sends data on the root rank to all ranks in the communicator.

Note that the sendcount argument is the number of values to send to each process (not the total number of values in the send buffer). On ranks other than the root rank, the send parameters are ignored (so the sendbuf argument can be None).

For example, consider again a situation where rank zero reads a configuration file and then determines some allocation of work (perhaps loop iteration counts) for each process. We need to communicate a pair of “start” and “end” values to each process.

rank = comm.rank
recvbuf = numpy.empty(2, dtype=numpy.int32)

if rank == 0:
    size = comm.size
    sendbuf = numpy.empty(2*size, dtype=numpy.int32)
    # Populate sendbuf somehow
    ...
    # Note how sendbuf is 2*size, but the count is 2, because we send
    # 2 entries to each process
    comm.Scatter([sendbuf, 2, MPI.INT32_T], [recvbuf, 2, MPI.INT32_T],
                 root=0)
else:
    comm.Scatter(None, [recvbuf, 2, MPI.INT32_T], root=0)

This sends entries 0 and 1 in sendbuf to rank 0; entries 2 and 3 to rank 1; entries 4 and 5 to rank 2; and so on.

There is also a more general MPI_Scatterv, in which we can specify how many values we send to each process.
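
A sketch of how MPI_Scatterv might be used (the counts here are made up for illustration): rank 0 sends counts[r] values to rank r, with displs giving the offset of each chunk in the send buffer.

counts = numpy.arange(1, comm.size + 1)                  # rank r receives r+1 values
displs = numpy.insert(numpy.cumsum(counts), 0, 0)[:-1]   # offset of each chunk
recvbuf = numpy.empty(counts[comm.rank], dtype=numpy.float64)
if comm.rank == 0:
    sendbuf = numpy.arange(counts.sum(), dtype=numpy.float64)
    comm.Scatterv([sendbuf, counts, displs, MPI.DOUBLE], recvbuf, root=0)
else:
    comm.Scatterv(None, recvbuf, root=0)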

All to one: gathers #

The dual to MPI_Scatter, in which we gather all values to a single process, is (perhaps unsurprisingly) called MPI_Gather.

comm.Gather([sendbuf, sendcount, sendtype],
            [recvbuf, recvcount, recvtype],
            root)

Note again that the recvcount argument is the number of entries to receive from each process. The buffer must be big enough to receive comm.size*recvcount total entries.

MPI_Gather gathers data from all ranks to a root rank.

We might use this, for example, to collect output data before writing it to a file2.
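
For instance, a minimal sketch gathering one value from every rank onto rank 0 might look like:

send = numpy.array([comm.rank], dtype=numpy.float64)
recv = numpy.empty(comm.size, dtype=numpy.float64)  # only needs to be valid on the root
comm.Gather([send, 1, MPI.DOUBLE], [recv, 1, MPI.DOUBLE], root=0)
if comm.rank == 0:
    ...  # e.g. write recv to a file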

Just like MPI_Scatter, there is a more general MPI_Gatherv in which we specify how many values to gather from each process.

All to all: everyone talks to everyone #

Generally, we would like to avoid parallelisation that requires data whose size scales with the total number of processes on every rank, or where every process must communicate with every other. Sometimes, however, this is unavoidable.

For example, multi-dimensional fast Fourier transforms are among the fastest approaches for computing the inverse of the Laplacian in periodic domains. They form, amongst other things, a core computational component of numerical simulation of turbulent flow (see for example the codes at https://github.com/spectralDNS).

To compute Fourier transforms in parallel, we need to do one-dimensional transforms along each Cartesian direction, interspersed by a global transpose of the data. Depending on the data distribution, this is a generalised “all to all” communication pattern. See mpi4py-fft for a high-performance Python library that does this.

MPI provides a number of routines that help with these, including

comm.Allgather([sendbuf, sendcount, sendtype],
               [recvbuf, recvcount, recvtype])
comm.Alltoall([sendbuf, sendcount, sendtype],
              [recvbuf, recvcount, recvtype])

Note how neither of these calls has a root argument: there is no “special” process in these collectives.
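
As a small sketch, an allgather in which every process contributes a single value and ends up with one value from every rank:

send = numpy.array([comm.rank], dtype=numpy.int32)
recv = numpy.empty(comm.size, dtype=numpy.int32)
comm.Allgather([send, 1, MPI.INT32_T], [recv, 1, MPI.INT32_T])
# recv now holds [0, 1, ..., size-1] on every rank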

The pictorial version of the routines is shown below

MPI_Allgather acts like MPI_Gather but no process is special.

Note how the MPI_Alltoall call ends up transposing the data.

MPI_Alltoall sends data from all processes to all processes, transposing along the way.

Summary #

MPI has a rich array of collective operations which can be used to implement common communication patterns much more efficiently than we are likely to write by hand using only point-to-point messaging.

Unlike point-to-point messaging, there are no tags in collective operations, so collectives are matched “in-order” on the communicator they are used with. We will look briefly at how to distinguish collective operations, and at how to restrict them to a subset of the processes in MPI_COMM_WORLD, when we consider communicator manipulation.


  1. All processes simultaneously accessing the same file can be very slow on some systems. ↩︎

  2. In real-world applications you should use a more scalable, parallel, approach. For example, directly using MPI-IO (see Bill Gropp’s slides for a nice intro and motivation), or a higher-level alternative like HDF5. These additional libraries are beyond the scope of this course. ↩︎