Introduction à MPI4Py

Table of Contents

Salut à toi ! Bienvenue dans la bidouille !

Utilise « C-c C-o » (control+c puis control+o) pour suivre les liens (les trucs soulignés en bleu ci-dessous).

Ce fichier est disponible à http://sed.bordeaux.inria.fr/la-bidouille

1 Présentation de MPI4py

1.1 Exécuter un code

Les programmes doivent être exécutés avec mpirun et l'interpreteur python.

mpirun -np nb_procs python my_script

1.2 Convention d'appels des fonctions

Le tutoriel documentation MPI4py explique une difference importante avec le C ou Fortran. Si on appelle des commandes avec des minuscules (comm.send par exemple), on travail avec des objets génériques et Python reconnaît les types mais du coup c'est plus lent. Dans la suite nous allons travailler avec des message MPI commençant par des majuscules(e.g. comm.Send) qui transfert des tableaux Numpy, la syntaxe sera de la forme

MPI_Send(sendbuf=[buffer,buff_size,type_de_donnee], ...)

type_de_donnee sera de la forme MPI.INT ou MPI.DOUBLE par exemple;

2 Fonctionnalités de base

2.1 Détermination du rang

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print 'Hello from process', rank + 1, 'of' , size

2.2 Communications bloquantes

On peut programmer un «ping-pong» entre 2 processus de la façon suivante

from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

buffsize = int(sys.argv[1])
nb_ping = int(sys.argv[2])

buffer_in = np.zeros(buffsize,dtype='i')
buffer_out = np.ones(buffsize,dtype='i')

if (rank == 0):
    print('let us launch ', nb_ping , 'ping-pongs')
    deb = MPI.Wtime()
    for i in range(nb_ping):
       comm.Send([buffer_out, buffsize, MPI.INT], dest = 1, tag=1)
       print("0 send %d-th message to 1" % i)
       comm.Recv([buffer_in, buffsize, MPI.INT], source = 1, tag=1)
       print("0 receive %d-th message from 1" % i)
    fin = MPI.Wtime()
    print('That took %.5f seconds\n' % (fin-deb))

else:
    for i in range(nb_ping):
       comm.Recv([buffer_in, buffsize, MPI.INT], source = 0, tag=1)
       print("1 receive %d-th message from 0" % i)
       comm.Send([buffer_out, buffsize, MPI.INT], dest = 0, tag=1)
       print("1 send %d-th message to 0" % i)

Vous pouvez intervertir le Send et le Recv pour le processus 1. Que se passe-t-il ?

2.3 Communications non-bloquantes

Nous souhaitons écrire un programme impliquant 2 processus qui doivent s'échanger leur hostname avant de pouvoir effectuer un calcul en parallèle. Exécutez le programme suivant utilisant des communications bloquantes. Qu'observez-vous ?

from mpi4py import MPI
import numpy as np
import socket
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

buffsize = 16

buffer_in = np.zeros(buffsize,dtype='c')
buffer_out = np.ones(buffsize,dtype='c')

if (rank == 0):
    buffer_in[:]=socket.gethostname()
    comm.Ssend([buffer_out, buffsize, MPI.CHAR], dest = 1, tag = 1)
    comm.Recv([buffer_in, buffsize, MPI.CHAR], source = 1, tag = 1)
    print 0, 'is sleeping'
    time.sleep(2)
else:

    buffer_in[:]=socket.gethostname()
    comm.Ssend([buffer_out, buffsize, MPI.CHAR], dest = 0, tag = 1)
    comm.Recv([buffer_in, buffsize, MPI.CHAR], source = 0, tag = 1)
    print 1, 'is sleeping'
    time.sleep(2)

vous pouvez permutez un des Recv et Ssend pour débloquer la communication ou tester le programme suivant avec des communications non-bloquantes comme suit

from mpi4py import MPI
import numpy as np
import socket
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

buffsize = 16

requests = [MPI.Request() for i in [1,2]]
statuses = [MPI.Status() for i in [1,2]]
buffer_in = np.zeros(buffsize,dtype='c')
buffer_out = np.ones(buffsize,dtype='c')

if (rank == 0):
    buffer_in[:]=socket.gethostname()
    requests[0] = comm.Isend([buffer_out, buffsize, MPI.CHAR], dest = 1, tag = 1)
    requests[1] = comm.Irecv([buffer_in, buffsize, MPI.CHAR], source = 1, tag = 1)
    MPI.Request.Waitall(requests)
    print 0, 'is sleeping'
    time.sleep(2)
else:

    buffer_in[:]=socket.gethostname()
    requests[0] = comm.Isend([buffer_out, buffsize, MPI.CHAR], dest = 0, tag = 1)
    requests[1] = comm.Irecv([buffer_in, buffsize, MPI.CHAR], source = 0, tag = 1)
    MPI.Request.Waitall(requests)
    print 1, 'is sleeping'
    time.sleep(2)

L'autre solution pour eviter les interblocages est d'utiliser un appel conjoint à SendRecv

from mpi4py import MPI
import numpy as np
import socket
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

buffsize = 16

buffer_in = np.zeros(buffsize,dtype='c')
buffer_out = np.ones(buffsize,dtype='c')

if (rank == 0):
    buffer_in[:]=socket.gethostname()
    comm.Sendrecv([buffer_out, buffsize, MPI.CHAR], dest = 1, sendtag = 1,  recvbuf= [buffer_in, buffsize, MPI.CHAR], source = 1, recvtag = 1)
    print 0, 'is sleeping'
    time.sleep(2)
else:

    buffer_in[:]=socket.gethostname()
    comm.Sendrecv([buffer_out, buffsize, MPI.CHAR], dest = 0, sendtag = 1,  recvbuf= [buffer_in, buffsize, MPI.CHAR], source = 0, recvtag = 1)
    print 1, 'is sleeping'
    time.sleep(2)

2.4 Communications collectives

2.4.1 Barrière

Dans le programme suivant, nous lançons des processus qui ont pour but de dormir N+1 secondes (N étant le rang de chacun) et de se synchroniser grâce à une barrière lorsqu'ils ont fini.

from mpi4py import MPI
import numpy as np
import socket
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
time.sleep(rank)
print 'Process %d has finished to sleep\n' % rank
comm.Barrier()
print 'Process %d has left the barriere\n' % rank

2.4.2 Diffusion

Dans le programme suivant, le processus de rang 0 envoie son hostname à tous les processus.

from mpi4py import MPI
import numpy as np
import socket
import time

buffsize = 16
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
buffer_out= np.zeros(buffsize, dtype='c')
if (rank == 0):
    buffer_out[:]=socket.gethostname()
comm.Bcast([buffer_out, buffsize, MPI.CHAR])
print 'Process %d has received %s\n' %(rank,buffer_out)

2.4.3 Réduction

Dans le programme suivant, chaque processus effectue un calcul correspondant à la somme partielle de tous les élements compris entre a et b avec \(a = \mathrm{rank} \times 10\) et \(b = a + 9\). À l'issue des sommes partielles, on effectue une réduction pour obtenir une somme globale.

from mpi4py import MPI
import operator
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
deb = 10 * rank
fin = deb + 10
my_cnt = 0
print 'Process %d will add the numbers between %d and %d' % (rank, deb, fin - 1)
my_cnt = reduce(operator.add,xrange(deb,fin))
cnt=comm.reduce(my_cnt)
if (rank == 0):
    print 'Global count is %d' % cnt

2.5 Répartition/Agrégation

Dans le programme suivant, (écrit pour 4 processus), nous répartissons le contenu d'un tableau dans la mémoire des 4 processus. Chaque processus affiche ce qu'il a reçu, puis le tout est agrégé dans un nouveau tableau dans la mémoire du processus de rang 0.

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
chaine = 'abcdefghijkl'
buffsize = chaine.__len__()
array=np.empty(buffsize, dtype='c')
array[:]=chaine
n=4
bloc_size=buffsize/n
recv1 = np.empty(bloc_size, dtype='c')
recv2 = np.empty(buffsize, dtype='c')
if (size == n):
  comm.Scatter([array, bloc_size, MPI.CHAR], [recv1, bloc_size, MPI.CHAR],0)
  print 'Process ', rank, 'has received', recv1[:bloc_size]
  comm.Gather([recv1, bloc_size, MPI.CHAR], [recv2, bloc_size, MPI.CHAR],0)
  if rank == 0:
    print 'Final gather', recv2
else:
  print 'must run with 4 processes'

2.5.1 Communication globale

Il est possible de faire une répartition/agrégation entre tous les processus. Cela revient à ``tranposer'' la matrice données vs processus. On fabrique un exemple \(4\times 4\) avec 4 processeurs.

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
n=4
z=np.mgrid[0:n]
a=np.int32((n*(z[None,:])+z[:,None]+1).T)
recv=np.zeros(n,dtype=a.dtype)
if (size == n):
  print rank ,'sends ', a[rank,:]
  comm.Alltoall([a[rank,:], 1, MPI.INT],[recv,1,MPI.INT])
  print rank, 'recv = ', recv
else:
  if rank == 0:
    print 'must run with 4 processes'

2.6 Topologie virtuelle

Dans le programme suivant, nous créons une topologie cartésienne avec 16 processus, sous la forme d'une grille de \(4 \times 4\). Ensuite, chaque processus envoie à ses voisins son rang, affiche le rang de ses voisins et affiche les rangs qu'il a reçu (en principe, ceux de ces voisins)

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
m = 4
O = 0
E = 1
N = 2
S = 3
if (size == m * m):
   comm2D = comm.Create_cart((m, m), periods=[True]*2, reorder= [False]*2)
   coords = comm2D.Get_coords(comm2D.rank)
   message = np.empty(1, dtype='i')
   voisin = np.empty(4, dtype='i')
   ibuf = np.empty((4,1), dtype='i')
   [voisin[N], voisin[S] ]=comm2D.Shift(0, 1)
   [voisin[O], voisin[E] ]=comm2D.Shift(1, 1)
   message[0] = rank
   requests = [MPI.Request() for i in xrange(8)]
   for k in xrange(4):
       requests[k] = comm2D.Isend([message, 1, MPI.INT], dest = voisin[k], tag = 1)
       # ibuf doit etre un buffer
       requests[k+4] = comm2D.Irecv([ibuf[k,:], 1, MPI.INT], source = voisin[k], tag = 1)
       comm2D.Irecv([ibuf[k,:], 1, MPI.INT], source = voisin[k], tag = 1)
   MPI.Request.Waitall(requests)
   print ' rank = ', rank , 'coords = ', coords.tolist(), 'neighbors = ', voisin
   print ' rank = ', rank , ' ibuf =', ibuf.T
else:
  print 'must be run with ', m * m, 'processors'

2.7 Types de données

2.7.1 Données non contiguës

Le passage de données contiguës (lignes de matrice) ne pose pas de probleme avec les buffers numpy. Par contre si on veut passer une colonne, ca ne marche plus. Pour cela nous allons utiliser un type sous-bloc de matrice avec l'appel Create_subarray

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
n=4
a=np.empty((n,n),dtype='i')
first_col =  MPI.INT.Create_subarray(a.shape, [a.shape[0], 1], [0,0])
first_col.Commit()
if size == 2:
   if rank == 0:
      a[:, :] = np.ones((n,n),dtype = 'i')
      a[:, 0] = np.array(range(n))
      req = comm.Isend([a,1,first_col],dest = 1,tag = 1)
   else:
      a[:,:] = np.zeros((n,n), dtype='i')
      req = comm.Irecv([a,1,first_col],source = 0, tag =1)
   req.Wait()
   print 'rank = ', rank , 'a = ', a
else:
   print 'must run with two procs'

3 Exercices

3.1 Anneau de processus

Écrire un programme qui lit une donnée en entrée sur le processus 0 et qui l'envoie au suivant, et chaque processus suivant envoie la donnée a son successeur. On s'arrête lorsque lorsque le dernier processus est atteint

une solution

3.2 Équation de la chaleur

On va chercher à paralléliser le programme suivant, qui résout l'équation de la chaleur sur une grille réguliere

import optparse
import numpy as np

def chaleur(hx, hy,dt, u_in):
    """
    schema explicite

    u[i,j]^{n+1}-u[i,j]^{n} = dt/(h_x*h_y) * (u[i-1,j] + u[

    """
    diag_x = - 2.0 + hx*hx/(2.*dt)
    diag_y = - 2.0 + hy*hy/(2.*dt)
    w_x =  dt/(hx*hx)
    w_y =  dt/(hy*hy)
    u_out=u_in.copy()
    u_out[1:-1,1:-1]=(u_in[0:-2, 1:-1] + u_in[2:, 1:-1] + u_in[1:-1, 1:-1] * diag_x ) * w_x +\
                     (u_in[1:-1, 0:-2] + u_in[1:-1, 2:] + u_in[1:-1, 1:-1] * diag_y ) * w_y
    err=np.sum((u_out-u_in)**2)
    return u_out,err

def init_param(nx, ny):
    hx=1./nx
    hy=1./ny
    u=np.zeros((nx+2,ny+2),dtype='float')
    u[0,:]=1.
    u[-1,:]=1.
    u[:,0]=1.
    u[:,-1]=1.
    dt= min(hx**2/4,hy**2/4)
    return hx, hy, dt, u

if __name__ == '__main__':

    parser = optparse.OptionParser()

    parser.add_option("--nx", type="int", dest="nx", default=10,
                        help="Number of global points in x-direction [default 10]")
    parser.add_option("--ny", type="int", dest="ny", default=10,
                       help="Number of global points in y-direction [default 10]")
    parser.add_option("--it", type="int", dest="iter_max", default=1000,
                       help="maximal number of iterations")
    options,args = parser.parse_args()
    # initialisation des
    hx, hy, dt, u = init_param(options.nx, options.ny)
    it_max=options.iter_max
    prec=1e-4
    err=1e10
    ##boucle temporelle
    for k in range(it_max):
          [u, err] = chaleur(hx,hy,dt,u)
          err=np.sqrt(err)
          print 'k = %d, t = %.3e, err = %.3e' % (k,k*dt,err)
          if (err<= prec): break
    #from scipy.io import savemat
    #savemat('solution_seq',{'sol_seq':u}, oned_as='col')
    n va chercher à paralléliser le programme suivant, qui résout l'équation
    de la chaleur sur une grille réguliere

vous pouvez voir une solution possible (non-optimale)

Date: 21 Décembre 2015

Author: Marc Fuentes

Created: 2017-12-21 og. 12:55

Validate