# This example demonstrates job scheduling by adding the number 100
# to every element of the vector `data`. The root assigns one element
# to each worker; when a worker finishes, the root sends it the next
# element, until 100 has been added to every element.
# Inspired by
# https://www.hpc.ntnu.no/ntnu-hpc-group/vilje/user-guide/software/mpi-and-mpi-io-training-tutorial/basic-mpi/job-queue

using MPI

function job_queue(data, f)
    MPI.Init()

    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    world_size = MPI.Comm_size(comm)
    nworkers = world_size - 1
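    # Rank 0 only schedules; ranks 1..nworkers do the actual work,
    # so at least 2 ranks are needed for the job to make progress.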

    root = 0

    MPI.Barrier(comm)
    T = eltype(data)
    N = size(data)[1]
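    # One-element scratch buffers: work is handed out and returned
    # one element at a time.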
    send_mesg = Array{T}(undef, 1)
    recv_mesg = Array{T}(undef, 1)

    if rank == root # I am root

        idx_recv = 0
        idx_sent = 1

        new_data = Array{T}(undef, N)
        # Array of the workers' send requests
        sreqs_workers = Array{MPI.Request}(undef, nworkers)
        # -1 = start, 0 = channel not available, 1 = channel available
        status_workers = ones(nworkers) .* -1

        # Send an initial element to every worker
        for dst in 1:nworkers
            if idx_sent > N
                break
            end
            send_mesg[1] = data[idx_sent]
            sreq = MPI.Isend(send_mesg, dst, dst + 32, comm)
            idx_sent += 1
            sreqs_workers[dst] = sreq
            status_workers[dst] = 0
            print("Root: Sent number $(send_mesg[1]) to Worker $dst\n")
        end

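        # Tag dst + 32 gives each root <-> worker pair its own message
        # channel; the non-blocking sends are tracked in sreqs_workers
        # and polled with MPI.Test! before a slot is reused.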
        # Send and receive messages until all elements are added
        while idx_recv != N
            # Check to see if there is an available message to receive
            for dst in 1:nworkers
                if status_workers[dst] == 0
                    (flag, status) = MPI.Test!(sreqs_workers[dst])
                    if flag
                        status_workers[dst] = 1
                    end
                end
            end
            for dst in 1:nworkers
                if status_workers[dst] == 1
                    ismessage, status = MPI.Iprobe(dst, dst + 32, comm)
                    if ismessage
                        # Receives message
                        MPI.Recv!(recv_mesg, dst, dst + 32, comm)
                        idx_recv += 1
                        new_data[idx_recv] = recv_mesg[1]
                        print("Root: Received number $(recv_mesg[1]) from Worker $dst\n")
                        if idx_sent <= N
                            send_mesg[1] = data[idx_sent]
                            # Sends new message
                            sreq = MPI.Isend(send_mesg, dst, dst + 32, comm)
                            idx_sent += 1
                            sreqs_workers[dst] = sreq
                            # Channel is busy again until this send completes
                            status_workers[dst] = 0
                            print("Root: Sent number $(send_mesg[1]) to Worker $dst\n")
                        end
                    end
                end
            end
        end

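        # The value -1 doubles as a shutdown sentinel: each worker
        # exits on receiving it (this assumes -1 never occurs in data).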
        for dst in 1:nworkers
            # Termination message to worker
            send_mesg[1] = -1
            sreq = MPI.Isend(send_mesg, dst, dst + 32, comm)
            sreqs_workers[dst] = sreq
            status_workers[dst] = 0
            print("Root: Finish Worker $dst\n")
        end

        MPI.Waitall!(sreqs_workers)
        print("Root: New data = $new_data\n")
    else # If rank == worker
        # -1 = start, 0 = channel not available, 1 = channel available
        status_worker = -1
        while true
            sreqs_workers = Array{MPI.Request}(undef, 1)
            ismessage, status = MPI.Iprobe(root, rank + 32, comm)

            if ismessage
                # Receives message
                MPI.Recv!(recv_mesg, root, rank + 32, comm)
                # Termination message from root
                if recv_mesg[1] == -1
                    print("Worker $rank: Finish\n")
                    break
                end
                print("Worker $rank: Received number $(recv_mesg[1]) from root\n")
                # Apply the function f (here: add 100) to the element
                send_mesg = f(recv_mesg)
                sreq = MPI.Isend(send_mesg, root, rank + 32, comm)
                sreqs_workers[1] = sreq
                status_worker = 0
            end
            # Check whether the previous send to root has completed
            if status_worker == 0
                (flag, status) = MPI.Test!(sreqs_workers[1])
                if flag
                    status_worker = 1
                end
            end
        end
    end
    MPI.Barrier(comm)
    MPI.Finalize()
end

f = x -> x .+ 100
data = collect(1:10)
job_queue(data, f)
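
# To try this example (a sketch, assuming the file is saved as
# job_queue.jl and MPI.jl's mpiexecjl launcher wrapper is installed;
# a plain mpiexec built against the same MPI library works too):
#   mpiexecjl -n 4 julia job_queue.jl
# This starts 1 root plus 3 workers.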