Skip to content

Commit cf8e8f9

Browse files
committed
WIP Logs for flake
1 parent 69aed84 commit cf8e8f9

File tree

2 files changed

+27
-18
lines changed

2 files changed

+27
-18
lines changed

deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
5454
QName = amqqueue:get_name(Queue),
5555
case rabbit_amqqueue:lookup(QName) of
5656
{ok, Q} ->
57+
rabbit_federation_link_util:log_debug(QName, "TRACE starting link", []),
5758
DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream),
5859
DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, Queue),
5960
UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams),
@@ -62,6 +63,7 @@ init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
6263
join({rabbit_federation_queue, QName}),
6364
gen_server2:cast(self(), maybe_go),
6465
rabbit_amqqueue:notify_decorators(Q),
66+
rabbit_federation_link_util:log_debug(QName, "TRACE starting link, not started", []),
6567
{ok, #not_started{queue = Queue,
6668
run = false,
6769
upstream = Upstream,
@@ -74,7 +76,8 @@ init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
7476
handle_call(Msg, _From, State) ->
7577
{stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
7678

77-
handle_cast(maybe_go, State) ->
79+
handle_cast(maybe_go, #not_started{queue = Queue} = State) ->
80+
rabbit_federation_link_util:log_debug(amqqueue:get_name(Queue), "TRACE maybe_go", []),
7881
go(State);
7982

8083
handle_cast(go, State = #not_started{}) ->
@@ -111,7 +114,8 @@ handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
111114
handle_cast(Msg, State) ->
112115
{stop, {unexpected_cast, Msg}, State}.
113116

114-
handle_info(#'basic.consume_ok'{}, State) ->
117+
handle_info(#'basic.consume_ok'{}, #state{queue = Queue} = State) ->
118+
rabbit_federation_link_util:log_debug(amqqueue:get_name(Queue), "TRACE received basic.consume_ok", []),
115119
{noreply, State};
116120

117121
handle_info(#'basic.ack'{} = Ack, State = #state{ch = Ch,
@@ -222,7 +226,8 @@ go(S0 = #not_started{run = Run,
222226
true -> consume(Ch, Upstream, UQueue);
223227
false -> ok
224228
end,
225-
{noreply, #state{queue = Queue,
229+
rabbit_federation_link_util:log_debug(QName, "TRACE go, started connection and channel", []),
230+
{noreply, #state{queue = Queue,
226231
run = Run,
227232
conn = Conn,
228233
ch = Ch,

deps/rabbitmq_federation/test/queue_SUITE.erl

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ multiple_upstreams_pattern(Config) ->
185185
TargetArgs = ?config(target_queue_args, Config),
186186
with_ch(Config,
187187
fun (Ch) ->
188-
expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
189-
expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
188+
expect_federation(Config, Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
189+
expect_federation(Config, Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
190190
end, [q(<<"upstream">>, SourceArgs),
191191
q(<<"upstream2">>, SourceArgs),
192192
q(<<"pattern.downstream">>, TargetArgs)]),
@@ -200,8 +200,8 @@ multiple_downstreams(Config) ->
200200
with_ch(Config,
201201
fun (Ch) ->
202202
timer:sleep(?INITIAL_WAIT),
203-
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
204-
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
203+
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
204+
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
205205
end, upstream_downstream(Config) ++ [q(<<"fed.downstream2">>, Args)]).
206206

207207
message_flow(Config) ->
@@ -210,8 +210,8 @@ message_flow(Config) ->
210210
with_ch(Config,
211211
fun (Ch) ->
212212
timer:sleep(?INITIAL_WAIT),
213-
publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT),
214-
publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT),
213+
publish_expect(Config, Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT),
214+
publish_expect(Config, Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT),
215215
Seq = lists:seq(1, 50),
216216
[publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
217217
[publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
@@ -236,7 +236,7 @@ dynamic_reconfiguration(Config) ->
236236
with_ch(Config,
237237
fun (Ch) ->
238238
timer:sleep(?INITIAL_WAIT),
239-
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
239+
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
240240

241241
%% Test that clearing connections works
242242
clear_upstream(Config, 0, <<"localhost">>),
@@ -257,8 +257,8 @@ federate_unfederate(Config) ->
257257
with_ch(Config,
258258
fun (Ch) ->
259259
timer:sleep(?INITIAL_WAIT),
260-
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
261-
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT),
260+
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
261+
expect_federation(Config, Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT),
262262

263263
%% clear the policy
264264
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>),
@@ -279,8 +279,8 @@ dynamic_plugin_stop_start(Config) ->
279279
timer:sleep(?INITIAL_WAIT),
280280
UpQ = <<"upstream">>,
281281
DownQ1 = <<"fed.downstream">>,
282-
expect_federation(Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
283-
expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
282+
expect_federation(Config, Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
283+
expect_federation(Config, Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
284284

285285
%% Disable the plugin, the link disappears
286286
ct:pal("Stopping rabbitmq_federation"),
@@ -310,7 +310,7 @@ dynamic_plugin_stop_start(Config) ->
310310
],
311311
length(L) =:= 2
312312
end),
313-
expect_federation(Ch, UpQ, DownQ1, 120000)
313+
expect_federation(Config, Ch, UpQ, DownQ1, 120000)
314314
end, upstream_downstream(Config) ++ [q(DownQ2, Args)]).
315315

316316
restart_upstream(Config) ->
@@ -362,8 +362,12 @@ publish_expect(Ch, X, Key, Q, Payload) ->
362362
publish(Ch, X, Key, Payload),
363363
expect(Ch, Q, [Payload]).
364364

365-
publish_expect(Ch, X, Key, Q, Payload, Timeout) ->
365+
publish_expect(Config, Ch, X, Key, Q, Payload, Timeout) ->
366366
publish(Ch, X, Key, Payload),
367+
ct:pal("Messages in broker ~p ", [rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "messages", "messages_ready"])]),
368+
Status = rabbit_ct_broker_helpers:rpc(Config, 0,
369+
rabbit_federation_status, status, []),
370+
ct:pal("Federation status ~p", [Status]),
367371
expect(Ch, Q, [Payload], Timeout).
368372

369373
%% Doubled due to our strange basic.get behaviour.
@@ -376,10 +380,10 @@ expect_federation(Ch, UpstreamQ, DownstreamQ) ->
376380
Payload = <<Base/binary, "-to-", UpstreamQ/binary>>,
377381
publish_expect(Ch, <<>>, UpstreamQ, DownstreamQ, Payload).
378382

379-
expect_federation(Ch, UpstreamQ, DownstreamQ, Timeout) ->
383+
expect_federation(Config, Ch, UpstreamQ, DownstreamQ, Timeout) ->
380384
Base = <<"HELLO">>,
381385
Payload = <<Base/binary, "-to-", UpstreamQ/binary>>,
382-
publish_expect(Ch, <<>>, UpstreamQ, DownstreamQ, Payload, Timeout).
386+
publish_expect(Config, Ch, <<>>, UpstreamQ, DownstreamQ, Payload, Timeout).
383387

384388
expect_no_federation(Ch, UpstreamQ, DownstreamQ) ->
385389
publish(Ch, <<>>, UpstreamQ, <<"HELLO">>),

0 commit comments

Comments
 (0)