Fortran a paralelní programování

Coarray Fortran

Moderní Fortran 2008 přinesl rozšíření zvané Coarray Fortran (CAF), které snazší cestou realizuje jednostrannou MPI komunikaci a umožňuje tak vytvářet pomocí standardizované syntaxe fortranské programy, které lze spouštět na více počítačích, tedy v režimu s distribuovanou pamětí. Překonává tak klíčové omezení OpenMP programů, které lze spouštět na jediném počítači (se sdílenou pamětí), a poskytuje jednodušší alternativu k procedurám MPI. CAF je už delší dobu implementován v Intel Fortranu (pro Linux i Windows, pouze ifort, nikoliv ifx), GNU Fortran se rovněž přidal (v Linuxu nebo ve Windows Subsystem for Linux), Nvidia a AMD Fortran dosud (rok 2022) nikoliv. CAF obou překladačů je implementován pomocí MPI; Intel používá vlastní MPI, GNU se obrací na knihovnu OpenCoarrays s podporou MPICH i Open MPI.

Program whoami

Pro uvedení do CAF připravíme trojici ekvivalentních programů v OpenMP, MPI a CAF, které vypíšou identifikaci spuštěných vláken/procesů a podělí se o iterace indexovaného cyklu. Seznámíme se se způsoby překladu a spouštění CAF programů s Intel a GNU překladači.

Výchozí ukázkou budiž OpenMP varianta. Vytvoří se paralelní oblast, vlákna vypíšou svou identifikaci, synchronizují se a podělí se staticky o iterace indexovaného cyklu:

! OpenMP: whoami and static scheduling of an indexed loop
program test1
use omp_lib
integer :: nmax=8        ! number of iterations
!$OMP PARALLEL
print *,'I am:',whoami() ! all threads print ID
!$OMP BARRIER            ! synchronization of all threads

!$OMP DO                 ! default static scheduling of iterations
do i=1,nmax
  print *,'I am:',whoami(),', assigned iteration:',i
enddo
!$OMP END PARALLEL

contains

integer function whoami() result (r)
r=omp_get_thread_num()+1 ! thread id (0 first) plus 1 (1 first)
end function

end program

Překlad, nastavení počtu vláken a spuštění:

# Linux and Intel/GNU
source load_intel
ifort -qopenmp test1.f90 (or) gfortran -fopenmp test1.f90
export OMP_NUM_THREADS=4
./a.out
# Windows and Intel/GNU
ifort -Qopenmp test1.f90 (or) gfortran -fopenmp -o test1 test1.f90
set OMP_NUM_THREADS=4
test1

Následuje MPI varianta. Inicializuje se MPI systém, procesy vypíšou svou identifikaci, synchronizují se a každý proces si vybere iterace počínaje číslem jeho identifikace a s krokem rovným velikosti komunikátoru:

! MPI: whoami and static scheduling of an indexed loop
program test2
use mpi_f08
integer :: nmax=8              ! number of iterations
integer :: np,rank
type(MPI_Comm) :: comm

call MPI_Init()
comm=MPI_COMM_WORLD
call MPI_Comm_size(comm,np)    ! number of processes
call MPI_Comm_rank(comm,rank)  ! process id
print *,'I am:',whoami()
call MPI_Barrier(comm)         ! synchronization of all processes

do i=whoami(),nmax,np          ! static scheduling of iterations
  print *,'I am:',whoami(),', assigned iteration:',i
enddo
call MPI_Finalize()

contains

integer function whoami() result (r)
r=rank+1                       ! process id (0 first) plus 1 (1 first)
end function

end program

Překlad a spuštění s nastavením počtu procesů:

# Intel (Linux and Windows)
source load_intel
mpiifort test2.f90
mpiexec -n 4 ./a.out (or) mpiexec -n 4 test2

# GNU and Open MPI (Linux)
mpif90 test2.f90
mpirun -n 4 a.out

CAF varianta je bez explicitní inicializace, počet procesů (images) a jejich identifikaci vrací dvojice funkcí num_images() a this_image(), synchronizaci všech procesů zajistí příkaz sync all (jak se zdá, to však nemusí mít vliv na řazení vypisovaných řádků z jednotlivých procesů na společném výstupu) a o iterace se procesy podělí stejně jako v MPI ukázce:

! CAF: whoami and static scheduling of an indexed loop
program test3
integer :: nmax=8        ! number of iterations
print *,'I am:',whoami() ! all images print ID
sync all                 ! synchronization of all images

do i=whoami(),nmax,num_images() ! static scheduling of iterations
  print *,'I am:',whoami(),', assigned iteration:',i
enddo

contains

integer function whoami() result (r)
r=this_image()           ! image id (1 first)
end function

end program

Pro překlad, nastavení počtu procesů a jejich spuštění používá Intel specifický postup zahrnující volby překladače (-coarray aj.) a proměnné prostředí (FOR_COARRAY_NUM_IMAGES aj.). GNU překladač využívá knihovnu OpenCoarrays, jejíž součástí jsou skripty caf (compiler wrapper) a cafrun (program launcher), obalující standardní MPI wrapper a launcher. Knihovna OpenCoarrays může spolupracovat jak s implementací Open MPI (náš MPI default), tak MPICH (default pro CAF). V Ubuntu 22.04 se bohužel jeví, že OpenCoarrays s Open MPI běží jen na jednom stroji a neběží mezi více stroji; OpenCoarrays s MPICH tímto netrpí. Skripty caf a cafrun jsou proto navázány na MPICH (případné volby je třeba uvádět v syntaxi MPICH), pro Open MPI variantu lze použít skripty caf.openmpicafrun.openmpi. Postup pro běh na jediném stroji je následující, použití CAF na více strojích popíšeme později.

# Linux and Intel
# https://software.intel.com/content/www/us/en/develop/documentation/fortran-compiler-coarray-tutorial/top.html
source load_intel
ifort -coarray test3.f90          # compile (set the default number of images by -coarray-num-images=n)
export FOR_COARRAY_NUM_IMAGES=4   # set the actual number of images
./a.out                           # run
FOR_COARRAY_NUM_IMAGES=4 ./a.out  # one-line alternative for set & run

# Linux and GNU with OpenCoarrays and MPICH/Open MPI (Ubuntu 22.04: apt install libcaf-mpich-3 libcaf-openmpi-3)
# https://gcc.gnu.org/wiki/CoarrayLib
caf test3.f90                    # compile with MPICH
cafrun -n 4 ./a.out              # run locally with MPICH
caf.openmpi test3.f90            # compile with Open MPI
cafrun.openmpi -n 4 a.out        # run locally with Open MPI
mpif90 -fcoarray=lib test3.f90 -lcaf_openmpi  # an alternative via the Open MPI wrapper and launcher
mpirun -n 4 a.out
# Windows and Intel
ifort -Qcoarray test3.f90        # set the default number of images by -Qcoarray-num-images=n
set FOR_COARRAY_NUM_IMAGES=4     # set the actual number of images
test3

# Windows and GNU: use Windows Subsystem for Linux and commands for Linux

CAF programy lze překládat i pro jednoprocesový běh (bez paralelizačních knihoven), ale stále s podporou CAF syntaxe:

# Compilation for a single-image run
ifort -coarray=single test3.f90      # Intel
gfortran -fcoarray=single test3.f90  # GNU
./a.out                              # a single-image run

Programy gather/scatter

V dalším předvedeme trojici blízkých programů, ve kterých vlákna/procesy nastaví hodnotu do sobě příslušného prvku pole a hlavní vlákno/proces (master) tyto hodnoty vypíše. OpenMP verze programu to provede pomocí sdíleného pole, MPI a CAF verze pomocí distribuovaného pole v podobě skaláru v každém procesu.

V OpenMP verzi se zjistí počet vláken, alokuje sdílené pole, vlákna zapíšou hodnotu do svých prvků a hlavní vlákno, po implicitní synchronizaci, vypíše obsah sdíleného pole:

! OpenMP: master thread gathers elements of the shared array
! ifort -qopenmp test4.f90; export OMP_NUM_THREADS=4; ./a.out (or) gfortran -fopenmp ...
program test4
use omp_lib
integer,allocatable :: a(:)   ! a shared array
integer np,i

!$OMP PARALLEL
!$OMP SINGLE
np=OMP_GET_NUM_THREADS()      ! number of threads, number of array elements
allocate (a(np))
!$OMP END SINGLE              ! implicit barrier
!$OMP DO
do i=1,np
  a(i)=OMP_GET_THREAD_NUM()+1 ! assign shared array elements
enddo
!$OMP END PARALLEL
print *,a                     ! print shared array elements
end program

V MPI verzi se zjistí počet procesů, procesy zapíšou hodnoty do svých prvků, procesy předají hodnoty svých prvků hlavnímu procesu a ten je vypíše. Použitá varianta dvoustranné komunikace je synchronizovaná.

! MPI: master process gathers elements of the distributed array
! (Intel) mpiifort test5.f90; mpiexec -n 4 ./a.out
! (GNU) mpif90 test5.f90; mpirun -n 4 a.out
program test5
use mpi_f08
integer np,rank,tag
integer a,i
type(MPI_Comm) comm

call MPI_Init()
comm=MPI_COMM_WORLD
call MPI_Comm_size(comm,np)    ! number of processes
call MPI_Comm_rank(comm,rank)  ! process id
a=rank+1                       ! assign distributed array elements
tag=0
if (rank==0) then
  print *,a                    ! print the element from the master process
  do i=1,np-1
    call MPI_Recv(buf=a,count=1,datatype=MPI_INTEGER,source=i,tag=tag,comm=comm,status=MPI_STATUS_IGNORE)
    print *,a                  ! print elements from other processes
  enddo
else
  call MPI_Send(buf=a,count=1,datatype=MPI_INTEGER,dest=0,tag=tag,comm=comm)
endif
call MPI_Finalize()
end program

V CAF verzi je to obdobné jako v MPI, procesy lokálně zapisují hodnoty do svých prvků a o jejich hodnoty si pak říká hlavní proces. Proměnné, které mají být přenášeny mezi procesy, se nazývají coarrays a v programech si nesou hranaté závorky, zde v deklaraci a[*] (hvězdička naznačuje, že počet prvků bude odpovídat počtu procesů) a ve výrazu a[i] (s významem proměnné a z procesu i). Při lokálním užití, kdy k proměnným přistupuje jejich proces, je lze psát bez závorek: a=this_image(). Díky užití jednostranné komunikace je třeba uvést explicitní bariéru mezi úseky programu se zápisem a čtením, jinak hrozí datové dostihy (data race):

! CAF: master image gathers and prints coarray elements
! (Intel) ifort -coarray test6.f90; FOR_COARRAY_NUM_IMAGES=4 ./a.out
! (GNU) caf test6.f90; cafrun -n 4 ./a.out
program test6
integer a[*],np,i  ! a coarray

np=num_images()    ! number of images
a=this_image()     ! assign coarray elements
sync all           ! full barrier
if (this_image()==1) then
  do i=1,np
    print *,a[i]   ! master prints coarray elements
  enddo
endif
end program

Vypuštění příkazu sync all v předchozí ukázce nemusí mít bezprostředně destruktivní efekt, patrně vlivem rychle provedených zápisů do paměti následovaných pomalejší komunikací a výpisy. Když postup otočíme, tedy komunikujeme data z hlavního procesu ostatním a tam je vypisujeme, datové dostihy by se při vynechání sync all nesprávnými hodnotami na výpisu už projevily:

! CAF: master image scatters coarray elements with full barrier
program test6b
integer a[*],np,i
np=num_images()
if (this_image()==1) then
  do i=1,np
    a[i]=i         ! master assigns coarray elements
  enddo
endif
sync all           ! full barrier (data race when omitted)
print *,a
end program

Může být efektivnější nahradit plnou bariéru sync all, tedy bod současné synchronizace všech procesů, parciálními bariérami sync images, synchronizujícími jen komunikující páry. Více synchronizací prováděných hlavním procesem lze sdružit do paralelizovatelného zápisu s polem identifikací synchronizovaných procesů, sync images ([(i,i=2,np)]), nebo do zápisu pro párovou synchronizaci procesu s každým z ostatních procesů, sync images (*):

! CAF: master image scatters coarray elements with partial barriers
program test6c
integer :: a[*],np,i
np=num_images()
if (this_image()==1) then
  do i=1,np
    a[i]=i         ! master assigns coarray elements
  ! sync images (i) ! alternative: serialized synchronization of images 1 and i
  enddo
! sync images ([(i,i=2,np)]) ! alternative: parallelizable synchronization with explicitly listed images
  sync images (*)  ! partial barrier: synchronization of image 1 and each of the other images
else
  sync images (1)  ! partial barrier: synchronization of images with image 1
endif
print *,a
end program

Komunikační benchmark

Dosud uvedené programy nekladou významnou zátěž na přenosové kanály. O to se snaží až následující benchmark. Procesy v každé iteraci hlavního cyklu (nmax iterací) získají od sousedních dvou procesů jejich vektory (o vmax prvcích) a sečtou je (MPI: MPI_Sendrecv, CAF: v(:)[left]+v(:)[right]). Každý proces si zvolí jeden z prvků součtu a tyto prvky jsou pak kolektivním sumačním podprogramem (MPI: MPI_Reduce, CAF: co_sum) sečteny do prvního procesu. Ten se postará o kontrolní výpis, který by měl obsahovat dva sloupce s hodnotami nn*2 z každé iterace. Cyklus v CAF verzi obsahuje jeden synchronizační příkaz, nutný kvůli předchozímu updatu coarray v následovanému jeho čtením z okolních procesů; ponecháváme jednoduché sync all, zakomentovaný řádek s cílenějším sync images nevedl při pokusech ke zrychlení.

Zdrojový kód v MPI:

! MPI benchmark: transfer of arrays between adjacent processes
! (Intel) mpiifort test_mpi.f90; time mpirun -n 16 ./a.out
! (GNU) mpif90 test_mpi.f90; time mpirun -n 16 ./a.out
program test_mpi
use mpi_f08
implicit none
integer,parameter :: nmax=20    ! number of iterations
integer,parameter :: vmax=1000000 ! vector size
integer,allocatable,dimension(:) :: v,vleft,vright,result
integer np,rank,left,right,n,m,sumresult
type(MPI_Comm) comm

call MPI_Init()
comm=MPI_COMM_WORLD
call MPI_Comm_size(comm,np)     ! number of processes
call MPI_Comm_rank(comm,rank)   ! number of this process

if (rank==0) print '(a,i0,x,i0)','num_procs & this_proc = ',np,rank+1
left=merge(rank-1,np-1,rank>0)  ! a left neighbour
right=merge(rank+1,0,rank<np-1) ! a right neighbour
allocate (v(vmax))              ! allocation of a distributed vector
allocate (vleft,vright,result,mold=v)  ! sourced allocation
do n=1,nmax                     ! nmax iterations
  v=n                           ! vector initialization
  call MPI_Sendrecv(sendbuf=v,sendcount=vmax,sendtype=MPI_INTEGER,dest=right,sendtag=0, &
                    recvbuf=vleft,recvcount=vmax,recvtype=MPI_INTEGER,source=left,recvtag=0, &
                    comm=comm,status=MPI_STATUS_IGNORE)    ! transfer of vectors
  call MPI_Sendrecv(v,vmax,MPI_INTEGER,left,0, vright,vmax,MPI_INTEGER,right,0, comm,MPI_STATUS_IGNORE)
  result=vleft+vright           ! vector addition
  m=min(rank+1,vmax)            ! picking an index
  call MPI_Reduce(sendbuf=result(m),recvbuf=sumresult,count=1,datatype=MPI_INTEGER,op=MPI_SUM,root=0,comm=comm)
                                ! collective summation (expected result: np*n*2)
  if (rank==0) then             ! master process prints output
    sumresult=sumresult/np
    print '(i0,2(x,i0))',n,sumresult  ! expected output: n n*2
    if (sumresult<0 .or. sumresult/=n*2) then; print '(a,i0)','Error at n =',n; error stop; endif
  endif
enddo
deallocate (v,vleft,vright,result)
call MPI_Finalize()
end program

Zdrojový kód v CAF:

! CAF benchmark: transfer of coarrays between adjacent images
! (Intel) ifort -coarray test_caf.f90; time FOR_COARRAY_NUM_IMAGES=16 ./a.out
! (GNU/MPICH) caf test_caf.f90; time cafrun -n 16 ./a.out
program test_caf
implicit none
integer,parameter :: nmax=20    ! number of iterations
integer,parameter :: vmax=1000000 ! vector size
integer,allocatable,dimension(:) :: v[:],result ! declaration of an allocatable coarray
integer np,this,left,right,n,m,sumresult

np=num_images()                 ! number of images
this=this_image()               ! number of this image
if (this==1) print '(a,i0,x,i0)','num_images & this_image = ',np,this
left=merge(this-1,np,this>1)    ! a left neighbour
right=merge(this+1,1,this<np)   ! a right neighbour
allocate (v(vmax)[*])           ! allocation of the coarray
allocate (result,mold=v)        ! sourced allocation
do n=1,nmax                     ! nmax iterations
  v=n                           ! vector initialization
  sync all                      ! need of sync before reading updated v from other images
  ! sync images ([left,right])  ! an alternative
  result=v(:)[left]+v(:)[right] ! transfer of coarrays and vector addition
  m=min(this,vmax)              ! picking an index
  sumresult=result(m)           ! a scalar for collective summation
  call co_sum(sumresult,1)      ! collective summation (expected result: np*n*2)
  if (this==1) then             ! master image prints output
    sumresult=sumresult/np
    print '(i0,2(x,i0))',n,sumresult  ! expected output: n n*2
    if (sumresult<0 .or. sumresult/=n*2) then; print '(a,i0)','Error at n =',n; error stop; endif
  endif
enddo
deallocate (v,result)
end program

Objem dat přenášených v iteracích mezi procesy se nastaví konstantou vmax, počet procesů se zvolí volbami launcheru a doba běhu programu se doladí pomocí počtu iterací nmax. Testovat lze rychlost běhů při použití různých MPI a CAF implementací na jediném stroji i na více strojích. Mezi více stroji lze porovnávat také přenosové rychlosti různých strojů klastru, viz poznámky o InfiniBandu, IPoIB a Ethernetu níže. Jeden překladač může být šikovnější při bězích s méně procesy, jiný lépe zvládne komunikaci mnoha procesů. Efekt může mít i způsob rozložení procesů na více strojích, také viz níže.

Běh na více strojích

MPI a také Coarray Fortran umožňují v Linuxu běh programu na více strojích, v distribuovaném režimu. Předpokladem je nastavení bezheslového spojení mezi stroji (viz např. poznámku dole na stránce Překlad a spouštění MPI úloh). Vhodné je spouštět úlohy ve sdíleném adresáři zadaném absolutní cestou. Rychlost přenosu dat mezi stroji je závislá na kabeláži a její softwarové podpoře: na některých strojích máme InfiniBand (viz přehled strojů) podporovaný rychlejším softwarem UCX nebo pomalejším IPoIB, na všech strojích samozřejmě také pomalý (gigabitový) Ethernet s TCP.

Intel CAF používá specifický přístup založený na volbách překladače a konfiguračním souboru, na který se odkáže při překladu a jehož obsah se zohlední při spuštění. Konfigurační soubor obsahuje potřebné volby MPI launcheru, především celkový počet procesů -n, seznam strojů -hosts a počet procesů (postupně přidělovaných soust) pro každý stroj -ppn (processes per node), a také jméno spouštěného programu. Seznam strojů a počet procesů na nich může být uveden i v samostatném machine file.

# Intel CAF distributed run
# https://www.intel.com/content/www/us/en/developer/articles/technical/distributed-memory-coarray-fortran-with-the-intel-fortran-compiler-for-linux-essential.html
source load_intel
cd /AbsolutePath
ifort -coarray=distributed -coarray-config-file=caf.cfg test7.f90
# edit caf.cfg, e.g.: -n 16 -ppn 8 -hosts geofA,geofB ./a.out
./a.out

Diagnostiku lze vyžádat nastavením proměnné I_MPI_DEBUG na kladné hodnoty 1–100 (rozumná volba: 5). K dispozici je MPI profiler: nastaví se proměnná I_MPI_STATS na kladnou hodnotu 1–5 a po skončení běhu se získá statistika navrženým voláním příkazu aps (application performance snapshot). Pomocí proměnné I_MPI_OFI_PROVIDER lze na strojích s InfiniBandem přepínat vedení komunikace přes InfiniBand (hodnota mlx) nebo IPoIB (hodnota tcp), na strojích bez InfiniBandu se komunikuje po Ethernetu (hodnota tcp).

# Intel environment variables
source load_intel
I_MPI_DEBUG=5 ./a.out  # with diagnostics
I_MPI_STATS=5 ./a.out  # with profiling
I_MPI_FABRICS=shm:ofi I_MPI_OFI_PROVIDER=mlx ./a.out  # using fast InfiniBand
I_MPI_FABRICS=shm:ofi I_MPI_OFI_PROVIDER=tcp ./a.out  # using slower IPoIB or slow Ethernet

GNU CAF programy přeložené s knihovnou OpenCoarrays běží v Ubuntu 22.04 mezi více stroji jen s MPICH, nikoliv s Open MPI. (Např. v Ostravě však CAF s Open MPI prý běží.) Skripty cafcafrun jsou proto navázány na MPICH, jejich konfigurační volby je třeba uvádět v syntaxi MPICH (prakticky stejně jako v Intel MPI). Repozitářové MPICH v Ubuntu 22.04 komunikuje na strojích s InfiniBandem (viz přehled strojů) přes IPoIB, na strojích bez InfiniBandu po Ethernetu.

# (Linux) GNU CAF/OpenCoarrays/MPICH distributed run (Ubuntu 22.04: apt install libcaf-mpich-3)
cd /AbsolutePath
caf test7.f90
cafrun -n 16 -ppn 8 -hosts geofA,geofB ./a.out
# unsuccessful distributed run by GNU CAF/OpenCoarrays/Open MPI (Ubuntu 22.04: apt install libcaf-openmpi-3)
# mpif90 -fcoarray=lib test7.f90 -lcaf_openmpi
# mpirun -n 16 -H geofA:8,geofB:8 a.out
# output: ... An error occurred in MPI_Win_create ... MPI_ERR_WIN: invalid window ...

Rychlost běhu benchmarku test7 na více strojích závisí také na rozložení procesů mezi stroji. Procesy v test7 komunikují se svými bezprostředními sousedy; např. při -n 16 -ppn 8 bude na každém stroji po 8 sousedících procesech a bude tak převažovat lokální komunikace mezi nimi, při -n 16 -ppn 1 budou na jednom stroji liché procesy a na druhém sudé a komunikace tak bude vedena hlavně mezi stroji. Pak se i výrazně projeví, komunikují-li procesy pomocí InfiniBandu či Ethernetu.

Více ukázek programů v CAF nabízí např. instalační balík OpenCoarrays, u nás též v síťovém adresáři /opt/OpenCoarrays/src/tests.