Skip to content

Commit 5a7c9bb

Browse files
authored
Merge pull request #97 from nsweeting/v0.19.0
Ensure :EXIT signals go through handle_error/1
2 parents 11d090e + 14f1751 commit 5a7c9bb

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ on:
1010

1111
jobs:
1212
test:
13-
runs-on: ubuntu-latest
13+
runs-on: ubuntu-20.04
1414
name: Elixir ${{ matrix.elixir }} / OTP ${{ matrix.otp }}
1515
strategy:
1616
matrix:

lib/rabbit/consumer/executer.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ defmodule Rabbit.Consumer.Executer do
3636
@doc false
3737
@impl GenServer
3838
def init({message, opts}) do
39+
Process.flag(:trap_exit, true)
40+
3941
with {:ok, opts} <- validate_opts(opts, @opts_schema) do
4042
state = init_state(message, opts)
4143
set_timeout(state.timeout)
@@ -58,6 +60,17 @@ defmodule Rabbit.Consumer.Executer do
5860
{:stop, :timeout, state}
5961
end
6062

63+
def handle_info({:EXIT, pid1, reason}, %{executer: pid2} = state) when pid1 == pid2 do
64+
{reason, stack} =
65+
case reason do
66+
{%_{} = reason, stack} -> {reason, stack}
67+
reason -> {reason, []}
68+
end
69+
70+
handle_error(state, reason, stack)
71+
{:stop, reason, state}
72+
end
73+
6174
@doc false
6275
@impl GenServer
6376
def handle_cast({:complete, ref1}, %{executer_ref: ref2} = state) when ref1 == ref2 do

test/consumer_test.exs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,36 @@ defmodule Rabbit.ConsumerTest do
101101
end
102102
end
103103

104+
defmodule ErrorTestConsumer do
105+
use Rabbit.Consumer
106+
107+
@impl Rabbit.Consumer
108+
def init(:consumer, opts) do
109+
{:ok, opts}
110+
end
111+
112+
@impl Rabbit.Consumer
113+
def handle_setup(state) do
114+
AMQP.Queue.declare(state.channel, state.queue, auto_delete: true)
115+
AMQP.Queue.purge(state.channel, state.queue)
116+
:ok
117+
end
118+
119+
@impl Rabbit.Consumer
120+
def handle_message(msg) do
121+
decoded_payload = Base.decode64!(msg.payload)
122+
{_, _, fun} = :erlang.binary_to_term(decoded_payload)
123+
fun.()
124+
end
125+
126+
@impl Rabbit.Consumer
127+
def handle_error(msg) do
128+
decoded_payload = Base.decode64!(msg.payload)
129+
{pid, ref, _} = :erlang.binary_to_term(decoded_payload)
130+
send(pid, {:handle_error, ref, msg})
131+
end
132+
end
133+
104134
setup do
105135
{:ok, connection} = Connection.start_link(TestConnection)
106136
{:ok, producer} = Producer.start_link(TestProducer, connection: connection)
@@ -263,6 +293,43 @@ defmodule Rabbit.ConsumerTest do
263293
assert msg.custom_meta == %{foo: "bar"}
264294
end
265295

296+
test "will run the handle_error/1 callback on exceptions", meta do
297+
{:ok, _consumer, queue} = start_consumer(ErrorTestConsumer, meta, [])
298+
error = fn -> raise "boom" end
299+
ref = publish_message(meta, queue, msg: error)
300+
301+
assert_receive {:handle_error, ^ref, msg}
302+
assert msg.error_reason == %RuntimeError{message: "boom"}
303+
end
304+
305+
test "will run the handle_error/1 callback on linked processes crashing", meta do
306+
{:ok, _consumer, queue} = start_consumer(ErrorTestConsumer, meta, [])
307+
308+
error = fn ->
309+
spawn_link(fn -> raise "boom" end)
310+
:timer.sleep(5000)
311+
end
312+
313+
ref = publish_message(meta, queue, msg: error)
314+
315+
assert_receive {:handle_error, ^ref, msg}
316+
assert msg.error_reason == %RuntimeError{message: "boom"}
317+
end
318+
319+
test "will run the handle_error/1 callback on process exits", meta do
320+
{:ok, _consumer, queue} = start_consumer(ErrorTestConsumer, meta, [])
321+
322+
error = fn ->
323+
Process.exit(self(), :boom)
324+
:timer.sleep(5000)
325+
end
326+
327+
ref = publish_message(meta, queue, msg: error)
328+
329+
assert_receive {:handle_error, ^ref, msg}
330+
assert msg.error_reason == :boom
331+
end
332+
266333
defp start_consumer(meta, opts \\ []), do: start_consumer(TestConsumer, meta, opts)
267334

268335
defp start_consumer(module, meta, opts) do

0 commit comments

Comments
 (0)