Introduction à MPI4Py
Table of Contents
Salut à toi ! Bienvenue dans la bidouille !
Utilise « C-c C-o » (control+c puis control+o) pour suivre les liens (les trucs soulignés en bleu ci-dessous).
Ce fichier est disponible à http://sed.bordeaux.inria.fr/la-bidouille
1 Présentation de MPI4py
1.1 Exécuter un code
Les programmes doivent être exécutés avec mpirun
et l'interpreteur python.
mpirun -np nb_procs python my_script
1.2 Convention d'appels des fonctions
Le tutoriel documentation MPI4py explique une difference importante avec le C
ou Fortran
.
Si on appelle des commandes avec des minuscules (comm.send
par exemple), on travail avec des
objets génériques et Python reconnaît les types mais du coup c'est plus lent. Dans la suite
nous allons travailler avec des message MPI commençant par des majuscules(e.g. comm.Send
)
qui transfert des tableaux Numpy, la syntaxe sera de la forme
MPI_Send(sendbuf=[buffer,buff_size,type_de_donnee], ...)
où type_de_donnee
sera de la forme MPI.INT
ou MPI.DOUBLE
par exemple;
2 Fonctionnalités de base
2.1 Détermination du rang
from mpi4py import MPI import numpy comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() print 'Hello from process', rank + 1, 'of' , size
2.2 Communications bloquantes
On peut programmer un «ping-pong» entre 2 processus de la façon suivante
from mpi4py import MPI import numpy as np import sys comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() buffsize = int(sys.argv[1]) nb_ping = int(sys.argv[2]) buffer_in = np.zeros(buffsize,dtype='i') buffer_out = np.ones(buffsize,dtype='i') if (rank == 0): print('let us launch ', nb_ping , 'ping-pongs') deb = MPI.Wtime() for i in range(nb_ping): comm.Send([buffer_out, buffsize, MPI.INT], dest = 1, tag=1) print("0 send %d-th message to 1" % i) comm.Recv([buffer_in, buffsize, MPI.INT], source = 1, tag=1) print("0 receive %d-th message from 1" % i) fin = MPI.Wtime() print('That took %.5f seconds\n' % (fin-deb)) else: for i in range(nb_ping): comm.Recv([buffer_in, buffsize, MPI.INT], source = 0, tag=1) print("1 receive %d-th message from 0" % i) comm.Send([buffer_out, buffsize, MPI.INT], dest = 0, tag=1) print("1 send %d-th message to 0" % i)
Vous pouvez intervertir le Send et le Recv pour le processus 1
. Que se
passe-t-il ?
2.3 Communications non-bloquantes
Nous souhaitons écrire un programme impliquant 2 processus qui doivent s'échanger
leur hostname
avant de pouvoir effectuer un calcul en parallèle.
Exécutez le programme suivant utilisant des communications bloquantes. Qu'observez-vous ?
from mpi4py import MPI import numpy as np import socket import time comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() buffsize = 16 buffer_in = np.zeros(buffsize,dtype='c') buffer_out = np.ones(buffsize,dtype='c') if (rank == 0): buffer_in[:]=socket.gethostname() comm.Ssend([buffer_out, buffsize, MPI.CHAR], dest = 1, tag = 1) comm.Recv([buffer_in, buffsize, MPI.CHAR], source = 1, tag = 1) print 0, 'is sleeping' time.sleep(2) else: buffer_in[:]=socket.gethostname() comm.Ssend([buffer_out, buffsize, MPI.CHAR], dest = 0, tag = 1) comm.Recv([buffer_in, buffsize, MPI.CHAR], source = 0, tag = 1) print 1, 'is sleeping' time.sleep(2)
vous pouvez permutez un des Recv
et Ssend
pour débloquer la communication ou tester
le programme suivant avec des communications non-bloquantes comme suit
from mpi4py import MPI import numpy as np import socket import time comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() buffsize = 16 requests = [MPI.Request() for i in [1,2]] statuses = [MPI.Status() for i in [1,2]] buffer_in = np.zeros(buffsize,dtype='c') buffer_out = np.ones(buffsize,dtype='c') if (rank == 0): buffer_in[:]=socket.gethostname() requests[0] = comm.Isend([buffer_out, buffsize, MPI.CHAR], dest = 1, tag = 1) requests[1] = comm.Irecv([buffer_in, buffsize, MPI.CHAR], source = 1, tag = 1) MPI.Request.Waitall(requests) print 0, 'is sleeping' time.sleep(2) else: buffer_in[:]=socket.gethostname() requests[0] = comm.Isend([buffer_out, buffsize, MPI.CHAR], dest = 0, tag = 1) requests[1] = comm.Irecv([buffer_in, buffsize, MPI.CHAR], source = 0, tag = 1) MPI.Request.Waitall(requests) print 1, 'is sleeping' time.sleep(2)
L'autre solution pour eviter les interblocages est d'utiliser un appel conjoint à SendRecv
from mpi4py import MPI import numpy as np import socket import time comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() buffsize = 16 buffer_in = np.zeros(buffsize,dtype='c') buffer_out = np.ones(buffsize,dtype='c') if (rank == 0): buffer_in[:]=socket.gethostname() comm.Sendrecv([buffer_out, buffsize, MPI.CHAR], dest = 1, sendtag = 1, recvbuf= [buffer_in, buffsize, MPI.CHAR], source = 1, recvtag = 1) print 0, 'is sleeping' time.sleep(2) else: buffer_in[:]=socket.gethostname() comm.Sendrecv([buffer_out, buffsize, MPI.CHAR], dest = 0, sendtag = 1, recvbuf= [buffer_in, buffsize, MPI.CHAR], source = 0, recvtag = 1) print 1, 'is sleeping' time.sleep(2)
2.4 Communications collectives
2.4.1 Barrière
Dans le programme suivant, nous lançons des processus qui ont pour but de
dormir N+1
secondes (N
étant le rang de chacun) et de se synchroniser grâce à une barrière lorsqu'ils ont fini.
from mpi4py import MPI import numpy as np import socket import time comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() time.sleep(rank) print 'Process %d has finished to sleep\n' % rank comm.Barrier() print 'Process %d has left the barriere\n' % rank
2.4.2 Diffusion
Dans le programme suivant, le processus de rang 0
envoie son hostname
à tous les processus.
from mpi4py import MPI import numpy as np import socket import time buffsize = 16 comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() buffer_out= np.zeros(buffsize, dtype='c') if (rank == 0): buffer_out[:]=socket.gethostname() comm.Bcast([buffer_out, buffsize, MPI.CHAR]) print 'Process %d has received %s\n' %(rank,buffer_out)
2.4.3 Réduction
Dans le programme suivant, chaque processus effectue un calcul correspondant à la somme partielle
de tous les élements compris entre a
et b
avec \(a = \mathrm{rank} \times
10\) et \(b = a + 9\).
À l'issue des sommes partielles, on effectue une réduction pour obtenir une somme globale.
from mpi4py import MPI import operator comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() deb = 10 * rank fin = deb + 10 my_cnt = 0 print 'Process %d will add the numbers between %d and %d' % (rank, deb, fin - 1) my_cnt = reduce(operator.add,xrange(deb,fin)) cnt=comm.reduce(my_cnt) if (rank == 0): print 'Global count is %d' % cnt
2.5 Répartition/Agrégation
Dans le programme suivant, (écrit pour 4 processus), nous répartissons le contenu d'un tableau dans la mémoire des 4 processus. Chaque processus affiche ce qu'il a reçu, puis le tout est agrégé dans un nouveau tableau dans la mémoire du processus de rang 0.
from mpi4py import MPI import numpy as np comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() chaine = 'abcdefghijkl' buffsize = chaine.__len__() array=np.empty(buffsize, dtype='c') array[:]=chaine n=4 bloc_size=buffsize/n recv1 = np.empty(bloc_size, dtype='c') recv2 = np.empty(buffsize, dtype='c') if (size == n): comm.Scatter([array, bloc_size, MPI.CHAR], [recv1, bloc_size, MPI.CHAR],0) print 'Process ', rank, 'has received', recv1[:bloc_size] comm.Gather([recv1, bloc_size, MPI.CHAR], [recv2, bloc_size, MPI.CHAR],0) if rank == 0: print 'Final gather', recv2 else: print 'must run with 4 processes'
2.5.1 Communication globale
Il est possible de faire une répartition/agrégation entre tous les processus. Cela revient à ``tranposer'' la matrice données vs processus. On fabrique un exemple \(4\times 4\) avec 4 processeurs.
from mpi4py import MPI import numpy as np comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() n=4 z=np.mgrid[0:n] a=np.int32((n*(z[None,:])+z[:,None]+1).T) recv=np.zeros(n,dtype=a.dtype) if (size == n): print rank ,'sends ', a[rank,:] comm.Alltoall([a[rank,:], 1, MPI.INT],[recv,1,MPI.INT]) print rank, 'recv = ', recv else: if rank == 0: print 'must run with 4 processes'
2.6 Topologie virtuelle
Dans le programme suivant, nous créons une topologie cartésienne avec 16 processus, sous la forme d'une grille de \(4 \times 4\). Ensuite, chaque processus envoie à ses voisins son rang, affiche le rang de ses voisins et affiche les rangs qu'il a reçu (en principe, ceux de ces voisins)
from mpi4py import MPI import numpy as np comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() m = 4 O = 0 E = 1 N = 2 S = 3 if (size == m * m): comm2D = comm.Create_cart((m, m), periods=[True]*2, reorder= [False]*2) coords = comm2D.Get_coords(comm2D.rank) message = np.empty(1, dtype='i') voisin = np.empty(4, dtype='i') ibuf = np.empty((4,1), dtype='i') [voisin[N], voisin[S] ]=comm2D.Shift(0, 1) [voisin[O], voisin[E] ]=comm2D.Shift(1, 1) message[0] = rank requests = [MPI.Request() for i in xrange(8)] for k in xrange(4): requests[k] = comm2D.Isend([message, 1, MPI.INT], dest = voisin[k], tag = 1) # ibuf doit etre un buffer requests[k+4] = comm2D.Irecv([ibuf[k,:], 1, MPI.INT], source = voisin[k], tag = 1) comm2D.Irecv([ibuf[k,:], 1, MPI.INT], source = voisin[k], tag = 1) MPI.Request.Waitall(requests) print ' rank = ', rank , 'coords = ', coords.tolist(), 'neighbors = ', voisin print ' rank = ', rank , ' ibuf =', ibuf.T else: print 'must be run with ', m * m, 'processors'
2.7 Types de données
2.7.1 Données non contiguës
Le passage de données contiguës (lignes de matrice) ne pose pas de probleme avec les buffers numpy. Par contre si on
veut passer une colonne, ca ne marche plus. Pour cela nous allons utiliser un
type sous-bloc de matrice avec l'appel Create_subarray
from mpi4py import MPI import numpy as np comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() n=4 a=np.empty((n,n),dtype='i') first_col = MPI.INT.Create_subarray(a.shape, [a.shape[0], 1], [0,0]) first_col.Commit() if size == 2: if rank == 0: a[:, :] = np.ones((n,n),dtype = 'i') a[:, 0] = np.array(range(n)) req = comm.Isend([a,1,first_col],dest = 1,tag = 1) else: a[:,:] = np.zeros((n,n), dtype='i') req = comm.Irecv([a,1,first_col],source = 0, tag =1) req.Wait() print 'rank = ', rank , 'a = ', a else: print 'must run with two procs'
3 Exercices
3.1 Anneau de processus
Écrire un programme qui lit une donnée en entrée sur le processus 0
et qui l'envoie
au suivant, et chaque processus suivant envoie la donnée a son successeur. On
s'arrête lorsque lorsque le dernier processus est atteint
3.2 Équation de la chaleur
On va chercher à paralléliser le programme suivant, qui résout l'équation de la chaleur sur une grille réguliere
import optparse import numpy as np def chaleur(hx, hy,dt, u_in): """ schema explicite u[i,j]^{n+1}-u[i,j]^{n} = dt/(h_x*h_y) * (u[i-1,j] + u[ """ diag_x = - 2.0 + hx*hx/(2.*dt) diag_y = - 2.0 + hy*hy/(2.*dt) w_x = dt/(hx*hx) w_y = dt/(hy*hy) u_out=u_in.copy() u_out[1:-1,1:-1]=(u_in[0:-2, 1:-1] + u_in[2:, 1:-1] + u_in[1:-1, 1:-1] * diag_x ) * w_x +\ (u_in[1:-1, 0:-2] + u_in[1:-1, 2:] + u_in[1:-1, 1:-1] * diag_y ) * w_y err=np.sum((u_out-u_in)**2) return u_out,err def init_param(nx, ny): hx=1./nx hy=1./ny u=np.zeros((nx+2,ny+2),dtype='float') u[0,:]=1. u[-1,:]=1. u[:,0]=1. u[:,-1]=1. dt= min(hx**2/4,hy**2/4) return hx, hy, dt, u if __name__ == '__main__': parser = optparse.OptionParser() parser.add_option("--nx", type="int", dest="nx", default=10, help="Number of global points in x-direction [default 10]") parser.add_option("--ny", type="int", dest="ny", default=10, help="Number of global points in y-direction [default 10]") parser.add_option("--it", type="int", dest="iter_max", default=1000, help="maximal number of iterations") options,args = parser.parse_args() # initialisation des hx, hy, dt, u = init_param(options.nx, options.ny) it_max=options.iter_max prec=1e-4 err=1e10 ##boucle temporelle for k in range(it_max): [u, err] = chaleur(hx,hy,dt,u) err=np.sqrt(err) print 'k = %d, t = %.3e, err = %.3e' % (k,k*dt,err) if (err<= prec): break #from scipy.io import savemat #savemat('solution_seq',{'sol_seq':u}, oned_as='col') n va chercher à paralléliser le programme suivant, qui résout l'équation de la chaleur sur une grille réguliere
vous pouvez voir une solution possible (non-optimale) là