Skip to content

Commit 67cd575

Browse files
whatyouhideJosé Valim
authored andcommitted
Check the return value of the :hash function (#251)
1 parent 28c76d2 commit 67cd575

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

lib/gen_stage/dispatchers/partition_dispatcher.ex

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ defmodule GenStage.PartitionDispatcher do
2020
named `:odd` and `:even`.
2121
2222
* `:hash` - the hashing algorithm, which receives the event and returns
23-
a tuple with two elements, the event to be dispatched as first argument
23+
a tuple with two elements, the event to be dispatched as first argument
2424
and the partition as second. The partition must be one of the partitions
25-
specified in `:partitions` above. The default uses
25+
specified in `:partitions` above. The default uses
2626
`fn event -> {event, :erlang.phash2(event, Enum.count(partitions))} end`
2727
on the event to select the partition.
2828
@@ -242,7 +242,14 @@ defmodule GenStage.PartitionDispatcher do
242242
{deliver_now, deliver_later, waiting} = split_events(events, waiting, [])
243243

244244
for event <- deliver_now do
245-
{event, partition} = hash.(event)
245+
{event, partition} =
246+
case hash.(event) do
247+
{event, partition} ->
248+
{event, partition}
249+
250+
other ->
251+
raise "the :hash function should return {event, partition}, got: #{inspect(other)}"
252+
end
246253

247254
case :erlang.get(partition) do
248255
:undefined ->

test/gen_stage/partition_dispatcher_test.exs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,18 @@ defmodule GenStage.PartitionDispatcherTest do
208208
assert error =~ "The known partitions are [:bar, :foo]"
209209
end
210210

211+
test "errors if the :hash function returns a bad value" do
212+
pid = self()
213+
ref = make_ref()
214+
disp = dispatcher(partitions: [:foo, :bar], hash: fn _ -> :not_a_tuple end)
215+
{:ok, 0, disp} = D.subscribe([partition: :foo], {pid, ref}, disp)
216+
{:ok, 3, disp} = D.ask(3, {pid, ref}, disp)
217+
218+
assert_raise RuntimeError, ~r/the :hash function should return/, fn ->
219+
D.dispatch([1, 2, 5], 3, disp)
220+
end
221+
end
222+
211223
test "errors on init" do
212224
assert_raise ArgumentError, ~r/the enumerable of :partitions is required/, fn ->
213225
dispatcher([])

0 commit comments

Comments
 (0)