Skip to content

Commit d27b285

Browse files
author
José Valim
committed
Revert "Add support for supervisor fun to Task.Supervisor.async_stream/* (#6552)"
This reverts commit 91061bf.
1 parent 1684c80 commit d27b285

File tree

2 files changed

+5
-121
lines changed

2 files changed

+5
-121
lines changed

lib/elixir/lib/task/supervisor.ex

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@ defmodule Task.Supervisor do
2020
| {:restart, :supervisor.restart()}
2121
| {:shutdown, :supervisor.shutdown()}
2222

23-
@typedoc "Supervisor spec used by `async_stream`"
24-
@type async_stream_supervisor ::
25-
Supervisor.supervisor()
26-
| (term -> Supervisor.supervisor())
27-
2823
@doc false
2924
def child_spec(arg) do
3025
%{
@@ -158,15 +153,6 @@ defmodule Task.Supervisor do
158153
own task. The tasks will be spawned under the given `supervisor` and
159154
linked to the current process, similarly to `async/4`.
160155
161-
You may also provide a function as the `supervisor`. Before each task is
162-
started, the function will be invoked (in a new process which is linked to
163-
the current process) with the stream entry that the to-be-spawned task will
164-
process as its argument. The function should return a supervisor pid or name,
165-
which will be used to spawn the task. This allows one to dynamically start
166-
tasks in different locations in the supervision tree(s) on the local (or
167-
another) node. Notably, this enables the distribution of concurrent stream
168-
tasks over multiple nodes.
169-
170156
When streamed, each task will emit `{:ok, value}` upon successful
171157
completion or `{:exit, reason}` if the caller is trapping exits.
172158
Results are emitted in the same order as the original `enumerable`.
@@ -207,7 +193,7 @@ defmodule Task.Supervisor do
207193
Enum.to_list(stream)
208194
209195
"""
210-
@spec async_stream(async_stream_supervisor, Enumerable.t(), module, atom, [term], keyword) ::
196+
@spec async_stream(Supervisor.supervisor(), Enumerable.t(), module, atom, [term], keyword) ::
211197
Enumerable.t()
212198
def async_stream(supervisor, enumerable, module, function, args, options \\ [])
213199
when is_atom(module) and is_atom(function) and is_list(args) do
@@ -224,7 +210,7 @@ defmodule Task.Supervisor do
224210
225211
See `async_stream/6` for discussion, options, and examples.
226212
"""
227-
@spec async_stream(async_stream_supervisor, Enumerable.t(), (term -> term), keyword) ::
213+
@spec async_stream(Supervisor.supervisor(), Enumerable.t(), (term -> term), keyword) ::
228214
Enumerable.t()
229215
def async_stream(supervisor, enumerable, fun, options \\ []) when is_function(fun, 1) do
230216
build_stream(supervisor, :link, enumerable, fun, options)
@@ -241,7 +227,7 @@ defmodule Task.Supervisor do
241227
See `async_stream/6` for discussion, options, and examples.
242228
"""
243229
@spec async_stream_nolink(
244-
async_stream_supervisor,
230+
Supervisor.supervisor(),
245231
Enumerable.t(),
246232
module,
247233
atom,
@@ -263,7 +249,7 @@ defmodule Task.Supervisor do
263249
264250
See `async_stream/6` for discussion and examples.
265251
"""
266-
@spec async_stream_nolink(async_stream_supervisor, Enumerable.t(), (term -> term), keyword) ::
252+
@spec async_stream_nolink(Supervisor.supervisor(), Enumerable.t(), (term -> term), keyword) ::
267253
Enumerable.t()
268254
def async_stream_nolink(supervisor, enumerable, fun, options \\ []) when is_function(fun, 1) do
269255
build_stream(supervisor, :nolink, enumerable, fun, options)
@@ -354,27 +340,12 @@ defmodule Task.Supervisor do
354340
%Task{pid: pid, ref: ref, owner: owner}
355341
end
356342

357-
defp supervisor_fun(supervisor, {_module, _fun, _args})
358-
when is_function(supervisor, 1) do
359-
fn {_module, _fun, [entry | _rest_args]} -> supervisor.(entry) end
360-
end
361-
362-
defp supervisor_fun(supervisor, fun)
363-
when is_function(supervisor, 1) and is_function(fun, 1) do
364-
fn {_erlang, _apply, [_fun, [entry]]} -> supervisor.(entry) end
365-
end
366-
367-
defp supervisor_fun(supervisor, _fun) do
368-
fn _mfa -> supervisor end
369-
end
370-
371343
defp build_stream(supervisor, link_type, enumerable, fun, options) do
372-
supervisor_fun = supervisor_fun(supervisor, fun)
373344
shutdown = options[:shutdown]
374345

375346
&Task.Supervised.stream(enumerable, &1, &2, fun, options, fn owner, mfa ->
376347
args = [owner, :monitor, get_info(owner), mfa]
377-
{:ok, pid} = start_child_with_spec(supervisor_fun.(mfa), args, :temporary, shutdown)
348+
{:ok, pid} = start_child_with_spec(supervisor, args, :temporary, shutdown)
378349
if link_type == :link, do: Process.link(pid)
379350
{link_type, pid}
380351
end)

lib/elixir/test/elixir/task/supervisor_test.exs

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,6 @@ defmodule Task.SupervisorTest do
2222
number
2323
end
2424

25-
def sleep_and_return_ancestor(number, :another_arg) do
26-
sleep_and_return_ancestor(number)
27-
end
28-
29-
def sleep_and_return_ancestor(number) do
30-
Process.sleep(number)
31-
{:dictionary, dictionary} = Process.info(self(), :dictionary)
32-
33-
dictionary
34-
|> Keyword.get(:"$ancestors")
35-
|> List.first()
36-
end
37-
3825
test "can be supervised directly", config do
3926
modules = [{Task.Supervisor, name: config.test}]
4027
assert {:ok, _} = Supervisor.start_link(modules, strategy: :one_for_one)
@@ -319,80 +306,6 @@ defmodule Task.SupervisorTest do
319306

320307
refute_received _
321308
end
322-
323-
test "streams an enumerable with fun and supervisor fun", %{supervisor: supervisor} do
324-
{:ok, other_supervisor} = Task.Supervisor.start_link()
325-
326-
assert fn i -> if rem(i, 2) == 0, do: supervisor, else: other_supervisor end
327-
|> Task.Supervisor.async_stream(1..4, &sleep_and_return_ancestor/1, @opts)
328-
|> Enum.to_list() ==
329-
[ok: other_supervisor, ok: supervisor, ok: other_supervisor, ok: supervisor]
330-
end
331-
332-
test "streams an enumerable with mfa and supervisor fun", %{supervisor: supervisor} do
333-
{:ok, other_supervisor} = Task.Supervisor.start_link()
334-
fun = :sleep_and_return_ancestor
335-
336-
assert fn i -> if rem(i, 2) == 0, do: supervisor, else: other_supervisor end
337-
|> Task.Supervisor.async_stream(1..4, __MODULE__, fun, [], @opts)
338-
|> Enum.to_list() ==
339-
[ok: other_supervisor, ok: supervisor, ok: other_supervisor, ok: supervisor]
340-
end
341-
342-
test "streams an enumerable with mfa with args and supervisor fun", %{supervisor: supervisor} do
343-
{:ok, other_supervisor} = Task.Supervisor.start_link()
344-
fun = :sleep_and_return_ancestor
345-
346-
assert fn i -> if rem(i, 2) == 0, do: supervisor, else: other_supervisor end
347-
|> Task.Supervisor.async_stream(1..4, __MODULE__, fun, [:another_arg], @opts)
348-
|> Enum.to_list() ==
349-
[ok: other_supervisor, ok: supervisor, ok: other_supervisor, ok: supervisor]
350-
end
351-
352-
test "streams an enumerable with fun and executes supervisor fun in monitor process",
353-
context do
354-
%{supervisor: supervisor} = context
355-
parent = self()
356-
357-
supervisor_fun = fn _i ->
358-
{:links, links} = Process.info(self(), :links)
359-
assert parent in links
360-
send(parent, {parent, self()})
361-
supervisor
362-
end
363-
364-
assert supervisor_fun
365-
|> Task.Supervisor.async_stream(1..4, &sleep_and_return_ancestor/1, @opts)
366-
|> Enum.to_list() == [ok: supervisor, ok: supervisor, ok: supervisor, ok: supervisor]
367-
368-
receive do
369-
{^parent, linked} ->
370-
for _ <- 1..3, do: assert_received({^parent, ^linked})
371-
after
372-
0 ->
373-
flunk("Did not receive any message from monitor process.")
374-
end
375-
end
376-
377-
test "streams an enumerable with fun and bad supervisor fun" do
378-
Process.flag(:trap_exit, true)
379-
380-
stream =
381-
fn _i -> raise "bad" end
382-
|> Task.Supervisor.async_stream(1..4, &sleep_and_return_ancestor/1, @opts)
383-
384-
assert {{%RuntimeError{message: "bad"}, _stacktrace}, _mfa} = catch_exit(Stream.run(stream))
385-
386-
refute_received _
387-
388-
stream =
389-
fn _i -> :not_a_supervisor end
390-
|> Task.Supervisor.async_stream(1..4, &sleep_and_return_ancestor/1, @opts)
391-
392-
assert {{:noproc, _stacktrace}, _mfa} = catch_exit(Stream.run(stream))
393-
394-
refute_received _
395-
end
396309
end
397310

398311
describe "async_stream_nolink" do

0 commit comments

Comments
 (0)