Skip to content

Commit bba20dc

Browse files
authored
Refactor read-only argument handling in Task.Supervised (#6219)
1 parent 0ab18f5 commit bba20dc

File tree

1 file changed

+54
-41
lines changed

1 file changed

+54
-41
lines changed

lib/elixir/lib/task/supervised.ex

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -164,37 +164,44 @@ defmodule Task.Supervised do
164164
# about our reference to it.
165165
send(monitor_pid, {parent, monitor_ref})
166166

167-
stream_reduce(acc, max_concurrency, _spawned = 0, _delivered = 0, _waiting = %{}, next,
168-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
167+
config = %{
168+
reducer: reducer,
169+
monitor_pid: monitor_pid,
170+
monitor_ref: monitor_ref,
171+
timeout: timeout,
172+
on_timeout: on_timeout,
173+
}
174+
stream_reduce(acc, max_concurrency, _spawned = 0, _delivered = 0, _waiting = %{}, next, config)
169175
end
170176

171-
defp stream_reduce({:halt, acc}, _max, _spawned, _delivered, _waiting, next,
172-
_reducer, monitor_pid, monitor_ref, timeout, _on_timeout) do
177+
defp stream_reduce({:halt, acc}, _max, _spawned, _delivered, _waiting, next, config) do
178+
%{monitor_pid: monitor_pid, monitor_ref: monitor_ref, timeout: timeout} = config
173179
stream_close(monitor_pid, monitor_ref, timeout)
174180
is_function(next) && next.({:halt, []})
175181
{:halted, acc}
176182
end
177183

178-
defp stream_reduce({:suspend, acc}, max, spawned, delivered, waiting, next,
179-
reducer, monitor_pid, monitor_ref, timeout, on_timeout) do
180-
continuation = &stream_reduce(&1, max, spawned, delivered, waiting, next, reducer, monitor_pid, monitor_ref, timeout, on_timeout)
184+
defp stream_reduce({:suspend, acc}, max, spawned, delivered, waiting, next, config) do
185+
continuation = &stream_reduce(&1, max, spawned, delivered, waiting, next, config)
181186
{:suspended, acc, continuation}
182187
end
183188

184189
# All spawned, all delivered, next is :done.
185190
defp stream_reduce({:cont, acc}, _max, spawned, delivered, _waiting, next,
186-
_reducer, monitor_pid, monitor_ref, timeout, _on_timeout)
191+
%{monitor_pid: monitor_pid, monitor_ref: monitor_ref, timeout: timeout})
187192
when spawned == delivered and next == :done do
188193
stream_close(monitor_pid, monitor_ref, timeout)
189194
{:done, acc}
190195
end
191196

192197
# No more tasks to spawn because max == 0 or next is :done. We wait for task
193198
# responses or tasks going down.
194-
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
195-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
199+
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config)
196200
when max == 0
197201
when next == :done do
202+
%{monitor_pid: monitor_pid, monitor_ref: monitor_ref,
203+
timeout: timeout, on_timeout: on_timeout} = config
204+
198205
receive do
199206
# The task at position "position" replied with "value". We put the
200207
# response in the "waiting" map and do nothing, since we'll only act on
@@ -203,8 +210,7 @@ defmodule Task.Supervised do
203210
{{^monitor_ref, position}, value} ->
204211
%{^position => {pid, :running}} = waiting
205212
waiting = Map.put(waiting, position, {pid, {:ok, value}})
206-
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
207-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
213+
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config)
208214

209215
# The task at position "position" died for some reason. We check if it
210216
# replied already (then the death is peaceful) or if it's still running
@@ -231,8 +237,7 @@ defmodule Task.Supervised do
231237
Map.put(waiting, position, {nil, {:exit, :timeout}})
232238
end
233239
end
234-
stream_deliver({:cont, acc}, max + 1, spawned, delivered, waiting, next,
235-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
240+
stream_deliver({:cont, acc}, max + 1, spawned, delivered, waiting, next, config)
236241

237242
# The monitor process died. We just cleanup the messages from the monitor
238243
# process and exit.
@@ -242,8 +247,9 @@ defmodule Task.Supervised do
242247
end
243248
end
244249

245-
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
246-
reducer, monitor_pid, monitor_ref, timeout, on_timeout) do
250+
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config) do
251+
%{monitor_pid: monitor_pid, monitor_ref: monitor_ref,
252+
timeout: timeout} = config
247253
try do
248254
next.({:cont, []})
249255
catch
@@ -254,30 +260,28 @@ defmodule Task.Supervised do
254260
else
255261
{:suspended, [value], next} ->
256262
waiting = stream_spawn(value, spawned, waiting, monitor_pid, monitor_ref, timeout)
257-
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next,
258-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
263+
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next, config)
259264
{_, [value]} ->
260265
waiting = stream_spawn(value, spawned, waiting, monitor_pid, monitor_ref, timeout)
261-
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done,
262-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
266+
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done, config)
263267
{_, []} ->
264-
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done,
265-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
268+
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done, config)
266269
end
267270
end
268271

269-
defp stream_deliver({:suspend, acc}, max, spawned, delivered, waiting, next,
270-
reducer, monitor_pid, monitor_ref, timeout, on_timeout) do
271-
continuation = &stream_deliver(&1, max, spawned, delivered, waiting, next, reducer, monitor_pid, monitor_ref, timeout, on_timeout)
272+
defp stream_deliver({:suspend, acc}, max, spawned, delivered, waiting, next, config) do
273+
continuation = &stream_deliver(&1, max, spawned, delivered, waiting, next, config)
272274
{:suspended, acc, continuation}
273275
end
274-
defp stream_deliver({:halt, acc}, max, spawned, delivered, waiting, next,
275-
reducer, monitor_pid, monitor_ref, timeout, on_timeout) do
276-
stream_reduce({:halt, acc}, max, spawned, delivered, waiting, next,
277-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
276+
277+
defp stream_deliver({:halt, acc}, max, spawned, delivered, waiting, next, config) do
278+
stream_reduce({:halt, acc}, max, spawned, delivered, waiting, next, config)
278279
end
279-
defp stream_deliver({:cont, acc}, max, spawned, delivered, waiting, next,
280-
reducer, monitor_pid, monitor_ref, timeout, on_timeout) do
280+
281+
defp stream_deliver({:cont, acc}, max, spawned, delivered, waiting, next, config) do
282+
%{reducer: reducer, monitor_pid: monitor_pid,
283+
monitor_ref: monitor_ref, timeout: timeout} = config
284+
281285
case waiting do
282286
%{^delivered => {nil, reply}} ->
283287
try do
@@ -290,12 +294,10 @@ defmodule Task.Supervised do
290294
:erlang.raise(kind, reason, stacktrace)
291295
else
292296
pair ->
293-
stream_deliver(pair, max, spawned, delivered + 1, Map.delete(waiting, delivered), next,
294-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
297+
stream_deliver(pair, max, spawned, delivered + 1, Map.delete(waiting, delivered), next, config)
295298
end
296299
%{} ->
297-
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
298-
reducer, monitor_pid, monitor_ref, timeout, on_timeout)
300+
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config)
299301
end
300302
end
301303

@@ -356,13 +358,24 @@ defmodule Task.Supervised do
356358
# process waits, this process dies with the same reason.
357359
receive do
358360
{^parent_pid, monitor_ref} ->
359-
stream_monitor_loop(parent_pid, parent_ref, mfa, spawn, monitor_ref, _running_tasks = %{}, timeout)
361+
config = %{
362+
parent_pid: parent_pid,
363+
parent_ref: parent_ref,
364+
mfa: mfa,
365+
spawn: spawn,
366+
monitor_ref: monitor_ref,
367+
timeout: timeout,
368+
}
369+
stream_monitor_loop(_running_tasks = %{}, config)
360370
{:DOWN, ^parent_ref, _, _, reason} ->
361371
exit(reason)
362372
end
363373
end
364374

365-
defp stream_monitor_loop(parent_pid, parent_ref, mfa, spawn, monitor_ref, running_tasks, timeout) do
375+
defp stream_monitor_loop(running_tasks, config) do
376+
%{parent_pid: parent_pid, parent_ref: parent_ref, mfa: mfa,
377+
spawn: spawn, monitor_ref: monitor_ref, timeout: timeout} = config
378+
366379
receive do
367380
# The parent process is telling us to spawn a new task to process
368381
# "value". We spawn it and notify the parent about its pid.
@@ -379,7 +392,7 @@ defmodule Task.Supervised do
379392
timed_out?: false,
380393
}
381394
running_tasks = Map.put(running_tasks, ref, task_info)
382-
stream_monitor_loop(parent_pid, parent_ref, mfa, spawn, monitor_ref, running_tasks, timeout)
395+
stream_monitor_loop(running_tasks, config)
383396

384397
# The parent process is telling us to stop because the stream is being
385398
# closed. In this case, we forcely kill all spawned processes and then
@@ -410,7 +423,7 @@ defmodule Task.Supervised do
410423
:ok = Process.cancel_timer(timer_ref, async: true, info: false)
411424
message_kind = if(timed_out?, do: :timed_out, else: :down)
412425
send(parent_pid, {message_kind, {monitor_ref, position}, reason})
413-
stream_monitor_loop(parent_pid, parent_ref, mfa, spawn, monitor_ref, running_tasks, timeout)
426+
stream_monitor_loop(running_tasks, config)
414427

415428
# One of the spawned processes timed out. We kill that process here
416429
# regardless of the value of :on_timeout. We then send a message to the
@@ -425,10 +438,10 @@ defmodule Task.Supervised do
425438
_other ->
426439
running_tasks
427440
end
428-
stream_monitor_loop(parent_pid, parent_ref, mfa, spawn, monitor_ref, running_tasks, timeout)
441+
stream_monitor_loop(running_tasks, config)
429442

430443
{:EXIT, _, _} ->
431-
stream_monitor_loop(parent_pid, parent_ref, mfa, spawn, monitor_ref, running_tasks, timeout)
444+
stream_monitor_loop(running_tasks, config)
432445
end
433446
end
434447

0 commit comments

Comments
 (0)