Commit bfc8cbe
playing with async
1 parent 67b0925 commit bfc8cbe

10 files changed: +491 -0 lines changed
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
from mpi4py import MPI
import numpy as np
import time


def sleep(n):
    # stand-in for computation: generating n random numbers keeps the CPU busy
    tmp = np.random.rand(n)


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

comm.Barrier()

t0 = time.time()

if rank == 0:
    sbuf = np.empty(40000000)
    sbuf[0] = 0
    sbuf[1:4] = np.random.rand(3)
    # non-blocking send: the transfer can overlap with the busy-work below
    req = comm.Isend(sbuf[:], dest=1, tag=99)
    sleep(100000000)
    req.wait()
    print("[%02d] Original data %s" % (rank, sbuf))
else:
    rbuf = np.empty(40000000)
    sleep(10000000)
    comm.Recv(rbuf[:], source=0, tag=99)
    print("[%02d] Received data %s" % (rank, rbuf))

t1 = time.time()

print(f'Rank: {rank} -- Time: {t1-t0}')
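For comparison (not part of this commit), the receive side can overlap its busy-work with the transfer as well by posting a non-blocking Irecv before doing local work; a minimal sketch of that variant, assuming the same two-rank setup and buffer sizes:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    sbuf = np.random.rand(40000000)
    req = comm.Isend(sbuf, dest=1, tag=99)
    # ... local work here while the message is (possibly) in flight ...
    req.Wait()
else:
    rbuf = np.empty(40000000)
    req = comm.Irecv(rbuf, source=0, tag=99)
    # ... local work here, too ...
    req.Wait()   # rbuf is only valid after the wait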
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
#!/bin/bash -x
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=24
#SBATCH --cpus-per-task=1
#SBATCH --output=run.out
#SBATCH --error=run.err
#SBATCH --time=00:05:00
#SBATCH --partition=devel

export HWT=1
export PIN=`./correct_pinning.sh`


#export SCOREP_ENABLE_TRACING=1
#export PATH=/p/project/ccstma/scorep/6.0-trunk-mrobefix_intel-parastation-papi/bin:$PATH
#export PATH=/p/project/ccstma/scorep/6.0-trunk-mprobefix_intel-impi-papi/bin:$PATH

#srun python -m scorep --mpp=mpi rma.py
#srun python -m scorep --mpp=mpi isend.py
#srun python -m scorep --mpp=mpi thread.py

#srun python rma.py
#srun python isend.py
#srun --cpu_bind=sockets python thread.py -n 12
#srun --cpu_bind=sockets --hint=multithread python thread.py -n 12

echo -e "\n\nDEFAULT PINNING\n---------------------------\n"
srun --label python thread.py -n 24
echo -e "\n\nSOCKET PINNING\n---------------------------\n"
srun --cpu_bind=sockets python thread.py -n 24
echo -e "\n\nBROEMMEL PINNING\n---------------------------\n"
srun $PIN --label python thread.py -n 24
#srun $PIN --label ./show_affinity_jureca.x

touch ready
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
import mpi4py
mpi4py.rc.threaded = True
mpi4py.rc.thread_level = "funneled"
# mpi4py.rc.profile('vt-hyb', logfile='threads')

from mpi4py import MPI
from threading import Thread

MPI.COMM_WORLD.Barrier()

# Understanding the Python GIL
# David Beazley, http://www.dabeaz.com
# PyCon 2010, Atlanta, Georgia
# http://www.dabeaz.com/python/UnderstandingGIL.pdf

# Consider this trivial CPU-bound function
def countdown(n):
    while n > 0:
        n -= 1

# Run it once with a lot of work
COUNT = 10000000  # 10 million
tic = MPI.Wtime()
countdown(COUNT)
toc = MPI.Wtime()
print("sequential: %f seconds" % (toc - tic))

# Now, subdivide the work across two threads
t1 = Thread(target=countdown, args=(COUNT//2,))
t2 = Thread(target=countdown, args=(COUNT//2,))
tic = MPI.Wtime()
for t in (t1, t2): t.start()
for t in (t1, t2): t.join()
toc = MPI.Wtime()
print("threaded: %f seconds" % (toc - tic))

pySDC/playgrounds/parallel/rma.py

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
from mpi4py import MPI
import numpy as np
import time


def sleep(n):
    tmp = np.random.rand(n)


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

t0 = time.time()

if rank == 0:
    # rank 0 exposes its send buffer as an RMA window
    sbuf = np.empty(40000000)
    win = MPI.Win.Create(sbuf, comm=comm)
    win.Lock(0, MPI.LOCK_EXCLUSIVE)
    sbuf[0] = 0
    sbuf[1:4] = np.random.rand(3)
    win.Unlock(0)
    sleep(100000000)
    print("[%02d] Original data %s" % (rank, sbuf))
else:
    # rank 1 exposes no memory and reads rank 0's window one-sidedly
    rbuf = np.empty(40000000)
    win = MPI.Win.Create(None, comm=comm)
    sleep(1000000)
    win.Lock(0, MPI.LOCK_EXCLUSIVE)
    win.Get(rbuf, 0)
    win.Unlock(0)
    print("[%02d] Received data %s" % (rank, rbuf))

t1 = time.time()

win.Free()

print(f'Rank: {rank} -- Time: {t1-t0}')
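For comparison (not part of this commit), the same one-sided exchange can be written with collective Win.Fence epochs instead of passive-target Lock/Unlock; a minimal sketch assuming two ranks and a small buffer:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

buf = np.zeros(4)
# only rank 0 exposes memory, as in rma.py
win = MPI.Win.Create(buf if rank == 0 else None, comm=comm)

win.Fence()                      # open the first epoch (collective)
if rank == 0:
    buf[1:] = np.random.rand(3)  # local store into the exposed buffer
win.Fence()                      # separate the store from the remote read
if rank == 1:
    rbuf = np.empty(4)
    win.Get(rbuf, 0)
win.Fence()                      # the Get is complete after this fence
if rank == 1:
    print("[%02d] Received data %s" % (rank, rbuf))
win.Free()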
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
from mpi4py import MPI
import numpy as np
import time

def sleep(n):
    tmp = np.random.rand(n)

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    sbuf = np.empty(4)
    win = MPI.Win.Create(sbuf, comm=comm)
else:
    rbuf = np.empty(4)
    win = MPI.Win.Create(None, comm=comm)
    # tmp = np.random.rand(int(10000000/2))

group = win.Get_group()

t0 = time.time()

if rank == 0:
    sleep(10000000)
    # tmp = np.random.rand(100000000)
    for i in range(3):
        if i > 0:
            sleep(100000000)
            win.Wait()
        sbuf[0] = i
        sbuf[1:] = np.random.rand(3)
        print("[%02d] Original data %s" % (rank, sbuf))
        win.Post(group.Incl([1]))
    win.Wait()
else:
    # tmp = np.random.rand(10000)
    # tmp = np.random.rand(10000000)
    # tmp = np.random.rand(1)
    for i in range(3):
        win.Start(group.Excl([1]))
        win.Get(rbuf, 0)
        win.Complete()
        sleep(70000000)
        print("[%02d] Received data %s" % (rank, rbuf))

t1 = time.time()
group.Free()
win.Free()

print(f'Rank: {rank} -- Time: {t1-t0}')
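The file above interleaves the general active target synchronization with busy-work and timing. Stripped of that, the bare Post/Start/Complete/Wait handshake it uses looks roughly like this (a sketch, not part of this commit, assuming two ranks):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

sbuf = np.arange(4, dtype='d')
win = MPI.Win.Create(sbuf if rank == 0 else None, comm=comm)
group = win.Get_group()

if rank == 0:
    win.Post(group.Incl([1]))   # expose the window to rank 1
    win.Wait()                  # block until rank 1 has completed its access epoch
else:
    rbuf = np.empty(4)
    win.Start(group.Incl([0]))  # open an access epoch towards rank 0
    win.Get(rbuf, 0)
    win.Complete()              # rbuf is valid from here on
    print("[%02d] Received data %s" % (rank, rbuf))

group.Free()
win.Free()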
Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
#!/usr/bin/env bash
# Script for a correct pinning w/ regard to hardware threads shared by a process
# using SLURM variables. Also needs an additional HWT multiplicity set in the
# environment.
# To share two hardware threads of each core for a process:
# ...
# export HWT=2
# export PIN=`correct_pinning.sh`
# ...
# srun $PIN ...
# ...

# echo settings? This will break srun integration...
ECHO=${VERBOSE:-false}
# to fix the integration, try
# export PIN=`correct_pinning.sh | grep cpu_bind`
#

# run for JURECA cluster or booster?
# JUWELS should be covered anyway.
MCA="CLS"
echo "$SLURM_JOB_PARTITION" | grep -q booster && MCA="BOO"

function print_config() {
    `$ECHO` && echo -e "\nHARDWARE CONFIG:"
    `$ECHO` && echo "cores per node: $PHYS_CORES_NODE"
    `$ECHO` && echo "CPUs per node: $SOCKETS"
    `$ECHO` && echo "cores per CPU: $PHYS_CORES_CPU"
    `$ECHO` && echo "hardware threads per core: $SMT"
    `$ECHO` && echo "hardware threads per node: $SLURM_CPUS_ON_NODE"
    `$ECHO` && echo -e "\nJOB CONFIG:"
    `$ECHO` && echo "tasks per node: $SLURM_NTASKS_PER_NODE"
    `$ECHO` && echo "hardware threads per task: $SLURM_CPUS_PER_TASK"
    `$ECHO` && echo "shared hardware threads per process: $HWT"
}

function pin_cluster() {
    SOCKETS=2
    SMT=2
    PHYS_CORES_NODE=$(($SLURM_CPUS_ON_NODE/$SMT))
    PHYS_CORES_CPU=$(($PHYS_CORES_NODE/$SOCKETS))

    print_config

    # exit straight away if we can't evenly distribute threads
    if [ $(($(($SLURM_CPUS_PER_TASK/$HWT))*$HWT)) != $SLURM_CPUS_PER_TASK ]
    then
        `$ECHO` && echo "No nice distribution of threads possible"
        exit 1
    fi

    CPUid=0
    MASK="--cpu_bind=mask_cpu:"
    # loop per process on each node
    for PROC in `seq 1 $SLURM_NTASKS_PER_NODE`
    do
        MAP=""
        `$ECHO` && echo "process $PROC"
        for CORE in `seq 1 $(($SLURM_CPUS_PER_TASK/$HWT))`
        do
            CPUid_=$CPUid
            for HW in `seq 1 $HWT`
            do
                MAP="$MAP,$CPUid_"
                ((CPUid_+=$PHYS_CORES_NODE))
            done
            ((CPUid++))
        done
        MAP_=`echo $MAP | sed 's/,/2^/' | sed 's/,/+2^/g'`
        MAP=`echo $MAP | sed 's/,//'`
        `$ECHO` && printf "map for process $PROC: %s\n" $MAP
        MASK="$MASK,0x"`echo "obase=16; $MAP_" | bc`
    done
    MASK=`echo $MASK | sed 's/:,/:/'`
    echo $MASK
}

function pin_booster() {
    SOCKETS=1
    SMT=4
    PHYS_CORES_NODE=$(($SLURM_CPUS_ON_NODE/$SMT))
    PHYS_CORES_CPU=$(($PHYS_CORES_NODE/$SOCKETS))

    print_config

    # exit straight away if we can't evenly distribute threads
    if [ $(($(($SLURM_CPUS_PER_TASK/$HWT))*$HWT)) != $SLURM_CPUS_PER_TASK ]
    then
        `$ECHO` && echo "No nice distribution of threads possible"
        exit 1
    fi

    CPUid=0
    MASK="--cpu_bind=mask_cpu:"
    # loop per process on each node
    for PROC in `seq 1 $SLURM_NTASKS_PER_NODE`
    do
        MAP=""
        `$ECHO` && echo "process $PROC"
        for CORE in `seq 1 $(($SLURM_CPUS_PER_TASK/$HWT))`
        do
            CPUid_=$CPUid
            for HW in `seq 1 $HWT`
            do
                MAP="$MAP,$CPUid_"
                ((CPUid_+=$PHYS_CORES_CPU))
            done
            ((CPUid++))
            if [ $CPUid -eq $PHYS_CORES_CPU ] && [ $HWT -eq 2 ]
            then
                ((CPUid+=$PHYS_CORES_CPU))
            fi
        done
        MAP_=`echo $MAP | sed 's/,/2^/' | sed 's/,/+2^/g'`
        MAP=`echo $MAP | sed 's/,//'`
        `$ECHO` && printf "map for process $PROC: %s\n" $MAP
        MASK="$MASK,0x"`echo "obase=16; $MAP_" | bc`
    done
    MASK=`echo $MASK | sed 's/:,/:/'`
    echo $MASK
}

if [ $MCA == "CLS" ]
then
    pin_cluster
elif [ $MCA == "BOO" ]
then
    pin_booster
fi

exit 0
Binary file (12.6 KB) not shown.
