Skip to content

Commit 0fe1e68

Browse files
author
José Valim
committed
Revert "Set up the monitor only on await for tasks"
This reverts commit 85a04fe.
1 parent 9e80f3b commit 0fe1e68

File tree

5 files changed

+168
-86
lines changed

5 files changed

+168
-86
lines changed

lib/elixir/lib/task.ex

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ defmodule Task do
22
@moduledoc """
33
Conveniences for spawning and awaiting for tasks.
44
5-
Tasks are processes meant to execute one particular
6-
action throughout their life-cycle, often with little or no
7-
communication with other processes. The most common use case
8-
for tasks is to compute a value asynchronously:
5+
Tasks are processes meant to execute one particular
6+
action throughout their life-cycle, often with little
7+
explicit communication with other processes. The most common
8+
use case for tasks is to compute a value asynchronously:
99
1010
task = Task.async(fn -> do_some_work() end)
1111
res = do_some_other_work()
@@ -16,21 +16,28 @@ defmodule Task do
1616
They are implemented by spawning a process that sends a message
1717
to the caller once the given computation is performed.
1818
19-
Besides `async/1` and `await/1`, tasks can also be used be
20-
started as part of supervision trees and dynamically spawned
21-
in remote nodes. We will explore all three scenarios next.
19+
Besides `async/1` and `await/1`, tasks can also be used as part
20+
of supervision trees and dynamically spawned in remote nodes.
21+
We will explore all three scenarios next.
2222
2323
## async and await
2424
2525
The most common way to spawn a task is with `Task.async/1`. A new
26-
process will be created and linked to the caller. Once the task
27-
action finishes, a message will be sent to the caller with its
28-
result.
26+
process will be created and this process is linked and monitored
27+
by the caller. However, the processes are unlinked right before
28+
the task finishes, allowing the proper error to be triggered only
29+
on `await/1`.
2930
30-
`Task.await/1` is used to read the message sent by the task. On
31-
await, Elixir will also setup a monitor to verify if the process
32-
exited with any abnormal reason (or in case exits are being
33-
trapped by the caller).
31+
This implies three things:
32+
33+
1) In case the caller crashes, the task will be killed and its
34+
computation will abort;
35+
36+
2) In case the task crashes due to an error, the parent will
37+
crash only on `await/1`;
38+
39+
3) In case the task crashes because a linked process caused
40+
it to crash, the parent will crash immediately;
3441
3542
## Supervised tasks
3643
@@ -48,9 +55,14 @@ defmodule Task do
4855
]
4956
5057
Since such tasks are supervised and not directly linked to
51-
the caller, they cannot be awaited on. Note `start_link/1`,
52-
differently from `async/1`, returns `{:ok, pid}` (which is
53-
the result expected by supervision trees).
58+
the caller, they cannot be awaited on. For this reason,
59+
differently from `async/1`, `start_link/1` returns `{:ok, pid}`
60+
(which is the result expected by supervision trees).
61+
62+
Such tasks are useful as workers that run during your application
63+
life-cycle and rarely communicate with other workers. For example,
64+
a worker that pushes data to another server or a worker that consumes
65+
events from an event manager and writes them to a log file.
5466
5567
## Supervision trees
5668
@@ -66,7 +78,7 @@ defmodule Task do
6678
# In the remote node
6779
Task.Supervisor.start_link(name: :tasks_sup)
6880
69-
# In the client
81+
# On the client
7082
Task.Supervisor.async({:tasks_sup, :remote@local}, fn -> do_work() end)
7183
7284
`Task.Supervisor` is more often started in your supervision tree as:
@@ -106,7 +118,7 @@ defmodule Task do
106118
"""
107119
@spec start_link(module, atom, [term]) :: {:ok, pid}
108120
def start_link(mod, fun, args) do
109-
Task.Supervised.start_link({mod, fun, args})
121+
Task.Supervised.start_link(:undefined, {mod, fun, args})
110122
end
111123

112124
@doc """
@@ -135,8 +147,9 @@ defmodule Task do
135147
@spec async(module, atom, [term]) :: t
136148
def async(mod, fun, args) do
137149
mfa = {mod, fun, args}
138-
ref = make_ref
139-
pid = :proc_lib.spawn_link(Task.Supervised, :async, [mfa, self(), ref])
150+
pid = :proc_lib.spawn_link(Task.Supervised, :async, [self(), mfa])
151+
ref = Process.monitor(pid)
152+
send(pid, {self(), ref})
140153
%Task{pid: pid, ref: ref}
141154
end
142155

@@ -148,24 +161,67 @@ defmodule Task do
148161
exit with the same reason as the task.
149162
"""
150163
@spec await(t, timeout) :: term | no_return
151-
def await(%Task{pid: pid, ref: ref}=task, timeout \\ 5000) do
152-
mon_ref = Process.monitor(pid)
153-
164+
def await(%Task{ref: ref}=task, timeout \\ 5000) do
154165
receive do
155166
{^ref, reply} ->
156-
Process.demonitor(mon_ref, [:flush])
167+
Process.demonitor(ref, [:flush])
157168
reply
158-
{:DOWN, ^mon_ref, _, _, :noconnection} ->
159-
exit({{:nodedown, get_node(task.pid)}, {__MODULE__, :await, [task, timeout]}})
160-
{:DOWN, ^mon_ref, _, _, reason} ->
169+
{:DOWN, ^ref, _, _, :noconnection} ->
170+
mfa = {__MODULE__, :await, [task, timeout]}
171+
exit({{:nodedown, get_node(task.pid)}, mfa})
172+
{:DOWN, ^ref, _, _, reason} ->
161173
exit({reason, {__MODULE__, :await, [task, timeout]}})
162174
after
163175
timeout ->
164-
Process.demonitor(mon_ref, [:flush])
176+
Process.demonitor(ref, [:flush])
165177
exit({:timeout, {__MODULE__, :await, [task, timeout]}})
166178
end
167179
end
168180

181+
@doc """
182+
Receives a group of tasks and a message and finds
183+
a task that matches the given message.
184+
185+
This function returns a tuple with the reply and the
186+
matching task in case the message matches a task that
187+
exited with success, it exits in case the found task
188+
failed, or returns nil if no task was found.
189+
190+
This function is useful in situations where multiple
191+
tasks are spawned and their results are collected
192+
later on. For example, a GenServer can spawn tasks,
193+
store the tasks in a list and later use `Task.find/2`
194+
to see if upcoming messages are from any of the tasks.
195+
"""
196+
@spec find([t], any) :: {term, t} | nil | no_return
197+
def find(tasks, msg)
198+
199+
def find(tasks, {ref, reply}) when is_reference(ref) do
200+
Enum.find_value tasks, fn
201+
%Task{ref: task_ref} = t when ref == task_ref ->
202+
Process.demonitor(ref, [:flush])
203+
{reply, t}
204+
%Task{} ->
205+
nil
206+
end
207+
end
208+
209+
def find(tasks, {:DOWN, ref, _, _, reason} = msg) when is_reference(ref) do
210+
find = fn(%Task{ref: task_ref}) -> task_ref == ref end
211+
case Enum.find(tasks, find) do
212+
%Task{pid: pid} when reason == :noconnection ->
213+
exit({{:nodedown, get_node(pid)}, {__MODULE__, :find, [tasks, msg]}})
214+
%Task{} ->
215+
exit({reason, {__MODULE__, :find, [tasks, msg]}})
216+
nil ->
217+
nil
218+
end
219+
end
220+
221+
def find(_tasks, _msg) do
222+
nil
223+
end
224+
169225
defp get_node({_, n}) when is_atom(n), do: n
170-
defp get_node(pid) when is_pid(pid), do: pid
226+
defp get_node(pid) when is_pid(pid), do: pid
171227
end

lib/elixir/lib/task/supervised.ex

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,61 @@
11
defmodule Task.Supervised do
22
@moduledoc false
33

4-
def start_link(mfa) do
5-
:proc_lib.start_link(__MODULE__, :noreply, [mfa])
4+
def start_link(:undefined, fun) do
5+
:proc_lib.start_link(__MODULE__, :noreply, [fun])
66
end
77

8-
def start_link(mfa, caller, ref) do
9-
:proc_lib.start_link(__MODULE__, :reply, [mfa, caller, ref])
8+
def start_link(caller, fun) do
9+
:proc_lib.start_link(__MODULE__, :reply, [caller, fun])
1010
end
1111

12-
def async(mfa, caller, ref) do
13-
send caller, {ref, apply(mfa)}
12+
def async(caller, {module, fun, args}) do
13+
ref =
14+
# There is a race condition on this operation when working across
15+
# nodes that manifests if a `Task.Supervisor.async/1` call is made
16+
# while the supervisor is busy spawning previous tasks.
17+
#
18+
# Imagine the following workflow:
19+
#
20+
# 1. The nodes disconnect
21+
# 2. The async call fails and is caught, the calling process does not exit
22+
# 3. The task is spawned and links to the calling process, causing the nodes to reconnect
23+
# 4. The calling process has not exited and so does not send its monitor reference
24+
# 5. The spawned task waits forever for the monitor reference so it can begin
25+
#
26+
# We have solved this by specifying a timeout of 5000 milliseconds.
27+
# Given no work is done in the client in between the task start and
28+
# sending the reference, 5000 should be enough to not raise false
29+
# negatives unless the nodes are indeed not available.
30+
receive do
31+
{^caller, ref} -> ref
32+
after
33+
5000 -> exit(:timeout)
34+
end
35+
36+
try do
37+
apply(module, fun, args)
38+
else
39+
result ->
40+
send caller, {ref, result}
41+
catch
42+
:error, reason ->
43+
exit({reason, System.stacktrace()})
44+
:throw, value ->
45+
exit({{:nocatch, value}, System.stacktrace()})
46+
after
47+
:erlang.unlink(caller)
48+
end
1449
end
1550

16-
def reply(mfa, caller, ref) do
51+
def reply(caller, mfa) do
1752
:erlang.link(caller)
1853
:proc_lib.init_ack({:ok, self()})
19-
send caller, {ref, apply(mfa)}
54+
async(caller, mfa)
2055
end
2156

22-
def noreply(mfa) do
57+
def noreply({module, fun, args}) do
2358
:proc_lib.init_ack({:ok, self()})
24-
apply(mfa)
25-
end
26-
27-
defp apply({module, fun, args}) do
2859
try do
2960
apply(module, fun, args)
3061
catch

lib/elixir/lib/task/supervisor.ex

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ defmodule Task.Supervisor do
5555
"""
5656
@spec async(Supervisor.supervisor, module, atom, [term]) :: Task.t
5757
def async(supervisor, module, fun, args) do
58-
ref = make_ref()
59-
{:ok, pid} = Supervisor.start_child(supervisor, [{module, fun, args}, self(), ref])
58+
{:ok, pid} = Supervisor.start_child(supervisor, [self(), {module, fun, args}])
59+
ref = Process.monitor(pid)
60+
send pid, {self(), ref}
6061
%Task{pid: pid, ref: ref}
6162
end
6263

@@ -97,6 +98,6 @@ defmodule Task.Supervisor do
9798
"""
9899
@spec start_child(Supervisor.supervisor, module, atom, [term]) :: {:ok, pid}
99100
def start_child(supervisor, module, fun, args) do
100-
Supervisor.start_child(supervisor, [{module, fun, args}])
101+
Supervisor.start_child(supervisor, [:undefined, {module, fun, args}])
101102
end
102103
end

lib/elixir/test/elixir/task/supervisor_test.exs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ defmodule Task.SupervisorTest do
4141
# Assert response and monitoring messages
4242
ref = task.ref
4343
assert_receive {^ref, :done}
44+
assert_receive {:DOWN, ^ref, _, _, :normal}
4445
end
4546

4647
test "async/3", config do
@@ -83,32 +84,21 @@ defmodule Task.SupervisorTest do
8384
assert Task.Supervisor.terminate_child(config[:supervisor], pid) == :ok
8485
end
8586

86-
@wait 100
87-
8887
test "await/1 exits on task throw", config do
89-
Process.flag(:trap_exit, true)
90-
task = Task.Supervisor.async(config[:supervisor], fn -> :timer.sleep(@wait); throw :unknown end)
88+
task = Task.Supervisor.async(config[:supervisor], fn -> throw :unknown end)
9189
assert {{{:nocatch, :unknown}, _}, {Task, :await, [^task, 5000]}} =
9290
catch_exit(Task.await(task))
93-
after
94-
Process.flag(:trap_exit, false)
9591
end
9692

9793
test "await/1 exits on task error", config do
98-
Process.flag(:trap_exit, true)
99-
task = Task.Supervisor.async(config[:supervisor], fn -> :timer.sleep(@wait); raise "oops" end)
94+
task = Task.Supervisor.async(config[:supervisor], fn -> raise "oops" end)
10095
assert {{%RuntimeError{}, _}, {Task, :await, [^task, 5000]}} =
10196
catch_exit(Task.await(task))
102-
after
103-
Process.flag(:trap_exit, false)
10497
end
10598

10699
test "await/1 exits on task exit", config do
107-
Process.flag(:trap_exit, true)
108-
task = Task.Supervisor.async(config[:supervisor], fn -> :timer.sleep(@wait); exit :unknown end)
100+
task = Task.Supervisor.async(config[:supervisor], fn -> exit :unknown end)
109101
assert {:unknown, {Task, :await, [^task, 5000]}} =
110102
catch_exit(Task.await(task))
111-
after
112-
Process.flag(:trap_exit, false)
113103
end
114104
end

0 commit comments

Comments
 (0)