Skip to content

Commit 58f3904

Browse files
committed
Add timestamp field to track consumer status changes
1 parent 5e76721 commit 58f3904

File tree

3 files changed

+56
-39
lines changed

3 files changed

+56
-39
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
361361
lists:foldr(
362362
fun(#consumer{pid = P, status = {_, St}} = C, {L, _})
363363
when P == Pid ->
364-
{[C#consumer{status = {?CONNECTED, St}} | L], true};
364+
{[csr_status(C, {?CONNECTED, St}) | L], true};
365365
(C, {L, UpdatedFlag}) ->
366366
{[C | L], UpdatedFlag or false}
367367
end, {[], false}, Consumers0),
@@ -390,7 +390,7 @@ handle_forgotten_active_reconnected(Pid,
390390
pid = P,
391391
subscription_id = SID} = C, {Cs, Eff})
392392
when P =:= Pid andalso St =:= ?FORG_ACT ->
393-
{[C#consumer{status = ?CONN_WAIT} | Cs],
393+
{[csr_status(C, ?CONN_WAIT) | Cs],
394394
[notify_consumer_effect(Pid, SID, S,
395395
Name, false, true) | Eff]};
396396
(C, {Cs, Eff}) ->
@@ -403,20 +403,20 @@ handle_forgotten_active_reconnected(Pid,
403403
when P =:= Pid andalso St =:= ?FORG_ACT ->
404404
%% update forgotten active
405405
%% tell it to step down
406-
{[C#consumer{status = ?CONN_WAIT} | Cs],
406+
{[csr_status(C, ?CONN_WAIT) | Cs],
407407
[notify_consumer_effect(P, SID, S,
408408
Name, false, true) | Eff]};
409409
(#consumer{status = {?FORGOTTTEN, _},
410410
pid = P} = C, {Cs, Eff})
411411
when P =:= Pid ->
412412
%% update forgotten
413-
{[C#consumer{status = ?CONN_WAIT} | Cs], Eff};
413+
{[csr_status(C, ?CONN_WAIT) | Cs], Eff};
414414
(#consumer{status = ?CONN_ACT,
415415
pid = P,
416416
subscription_id = SID} = C, {Cs, Eff}) ->
417417
%% update connected active
418418
%% tell it to step down
419-
{[C#consumer{status = ?CONN_WAIT} | Cs],
419+
{[csr_status(C, ?CONN_WAIT) | Cs],
420420
[notify_consumer_effect(P, SID, S,
421421
Name, false, true) | Eff]};
422422
(C, {Cs, Eff}) ->
@@ -776,7 +776,7 @@ handle_group_forget_connection(Pid, #?MODULE{groups = Groups0} = S0,
776776
lists:foldr(
777777
fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _})
778778
when P == Pid ->
779-
{[C#consumer{status = {?FORGOTTTEN, St}} | L], true};
779+
{[csr_status(C, {?FORGOTTTEN, St}) | L], true};
780780
(C, {L, UpdatedFlag}) ->
781781
{[C | L], UpdatedFlag or false}
782782
end, {[], false}, Consumers0),
@@ -843,7 +843,7 @@ handle_group_after_connection_node_disconnected(ConnPid,
843843
Cs1 = lists:foldr(fun(#consumer{status = {_, St},
844844
pid = Pid} = C0,
845845
Acc) when Pid =:= ConnPid ->
846-
C1 = C0#consumer{status = {?DISCONNECTED, St}},
846+
C1 = csr_status(C0, {?DISCONNECTED, St}),
847847
[C1 | Acc];
848848
(C, Acc) ->
849849
[C | Acc]
@@ -932,10 +932,7 @@ map_to_group(#{<<"consumers">> := Consumers, <<"partition_index">> := Index}) ->
932932

933933
map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId,
934934
<<"owner">> := Owner, <<"active">> := Active}) ->
935-
#consumer{pid = Pid,
936-
subscription_id = SubId,
937-
owner = Owner,
938-
status = active_to_status(Active)}.
935+
csr(Pid, SubId, Owner, active_to_status(Active)).
939936

940937
active_to_status(true) ->
941938
{?CONNECTED, ?ACTIVE};
@@ -964,15 +961,9 @@ do_register_consumer(VirtualHost,
964961
Consumer =
965962
case lookup_active_consumer(Group0) of
966963
{value, _} ->
967-
#consumer{pid = ConnectionPid,
968-
owner = Owner,
969-
subscription_id = SubscriptionId,
970-
status = {?CONNECTED, ?WAITING}};
964+
csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT);
971965
false ->
972-
#consumer{pid = ConnectionPid,
973-
subscription_id = SubscriptionId,
974-
owner = Owner,
975-
status = {?CONNECTED, ?ACTIVE}}
966+
csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT)
976967
end,
977968
Group1 = add_to_group(Consumer, Group0),
978969
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
@@ -1004,20 +995,13 @@ do_register_consumer(VirtualHost,
1004995
case Group0 of
1005996
#group{consumers = []} ->
1006997
%% first consumer in the group, it's the active one
1007-
Consumer0 =
1008-
#consumer{pid = ConnectionPid,
1009-
owner = Owner,
1010-
subscription_id = SubscriptionId,
1011-
status = {?CONNECTED, ?ACTIVE}},
998+
Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT),
1012999
G1 = add_to_group(Consumer0, Group0),
10131000
{G1,
10141001
[notify_consumer_effect(ConnectionPid, SubscriptionId,
10151002
Stream, ConsumerName, true)]};
10161003
_G ->
1017-
Consumer0 = #consumer{pid = ConnectionPid,
1018-
owner = Owner,
1019-
subscription_id = SubscriptionId,
1020-
status = {?CONNECTED, ?WAITING}},
1004+
Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT),
10211005
G1 = add_to_group(Consumer0, Group0),
10221006
maybe_rebalance_group(G1, {VirtualHost, Stream, ConsumerName})
10231007
end,
@@ -1163,10 +1147,10 @@ compute_active_consumer(#group{partition_index = -1,
11631147
fun(#consumer{pid = P, subscription_id = SID} = C, L)
11641148
when P =:= Pid andalso SID =:= SubId ->
11651149
%% change status of new active
1166-
[C#consumer{status = ?CONN_ACT} | L];
1150+
[csr_status(C, ?CONN_ACT) | L];
11671151
(#consumer{status = {?CONNECTED, _}} = C, L) ->
11681152
%% other connected consumers are set to "waiting"
1169-
[C#consumer{status = ?CONN_WAIT} | L];
1153+
[csr_status(C, ?CONN_WAIT) | L];
11701154
(C, L) ->
11711155
%% other consumers stay the same
11721156
[C | L]
@@ -1239,7 +1223,7 @@ update_groups(VirtualHost,
12391223
ConsumerName,
12401224
Group,
12411225
StreamGroups) ->
1242-
maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups).
1226+
StreamGroups#{{VirtualHost, Stream, ConsumerName} => Group}.
12431227

12441228
update_consumer_state_in_group(#group{consumers = Consumers0} = G,
12451229
Pid,
@@ -1248,7 +1232,7 @@ update_consumer_state_in_group(#group{consumers = Consumers0} = G,
12481232
CS1 = lists:map(fun(C0) ->
12491233
case C0 of
12501234
#consumer{pid = Pid, subscription_id = SubId} ->
1251-
C0#consumer{status = NewStatus};
1235+
csr_status(C0, NewStatus);
12521236
C -> C
12531237
end
12541238
end,
@@ -1257,7 +1241,7 @@ update_consumer_state_in_group(#group{consumers = Consumers0} = G,
12571241

12581242
update_connected_consumers(#group{consumers = Consumers0} = G, NewStatus) ->
12591243
Consumers1 = lists:map(fun(#consumer{status = {?CONNECTED, _}} = C) ->
1260-
C#consumer{status = NewStatus};
1244+
csr_status(C, NewStatus);
12611245
(C) ->
12621246
C
12631247
end, Consumers0),
@@ -1300,3 +1284,21 @@ compute_node_pid_group_dependencies(Node, Groups) ->
13001284
end, Acc, Consumers)
13011285
end, #{}, Groups).
13021286

1287+
-spec csr(pid(), subscription_id(), owner(),
1288+
{consumer_connectivity(), consumer_status()}) ->
1289+
consumer().
1290+
csr(Pid, Id, Owner, Status) ->
1291+
#consumer{pid = Pid,
1292+
subscription_id = Id,
1293+
owner = Owner,
1294+
status = Status,
1295+
ts = ts()}.
1296+
1297+
-spec csr_status(consumer(), {consumer_connectivity(), consumer_status()}) ->
1298+
consumer().
1299+
csr_status(C, Status) ->
1300+
C#consumer{status = Status, ts = ts()}.
1301+
1302+
ts() ->
1303+
erlang:system_time(millisecond).
1304+

deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
-type consumer_status() :: active | waiting | deactivating.
2626
-type consumer_connectivity() :: connected | disconnected | forgotten.
2727
-type conf() :: map().
28+
-type timestamp() :: integer().
2829

2930
-record(consumer,
3031
{pid :: pid(),
3132
subscription_id :: subscription_id(),
3233
owner :: owner(), %% just a label
33-
status :: {consumer_connectivity(), consumer_status()}}).
34+
status :: {consumer_connectivity(), consumer_status()},
35+
ts :: timestamp()}).
3436
-record(group,
3537
{consumers :: [#consumer{}], partition_index :: integer()}).
3638
-record(rabbit_stream_sac_coordinator,
@@ -41,6 +43,7 @@
4143
reserved_1,
4244
reserved_2}).
4345

46+
-type consumer() :: #consumer{}.
4447
-type group() :: #group{}.
4548
-type groups() :: #{group_id() => group()}.
4649
%% inner map acts as a set

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,7 +1511,7 @@ list_nodes_test(_) ->
15111511
list_nodes(#{Id0 => grp([csr(P1), csr(P2), csr(P2)]),
15121512
Id1 => grp([csr(P1), csr(P1), csr(P2)]),
15131513
Id2 => grp([csr(P2), csr(P2), csr(P2)])})),
1514-
?assertEqual([N0, N1, N2],
1514+
?assertEqual(lists:sort([N0, N1, N2]),
15151515
list_nodes(#{Id0 => grp([csr(P0), csr(P1), csr(P2)])})),
15161516
assertEmpty(list_nodes(#{})),
15171517

@@ -1569,9 +1569,20 @@ assertSize(Expected, List) when is_list(List) ->
15691569
assertEmpty(Data) ->
15701570
assertSize(0, Data).
15711571

1572-
assertHasGroup(GroupId, Group, Groups) ->
1573-
G = maps:get(GroupId, Groups),
1574-
?assertEqual(Group, G).
1572+
assertHasGroup(GroupId,
1573+
#group{partition_index = ExpectedPI, consumers = ExpectedCs},
1574+
Groups) ->
1575+
#{GroupId := #group{partition_index = CurrentPI, consumers = CurrentCs}} = Groups,
1576+
?assertEqual(ExpectedPI, CurrentPI),
1577+
assertSize(length(ExpectedCs), CurrentCs),
1578+
lists:foreach(fun(N) ->
1579+
Expected = lists:nth(N, ExpectedCs),
1580+
Current = lists:nth(N, CurrentCs),
1581+
assertCsrEqual(Expected, Current)
1582+
end, lists:seq(1, length(ExpectedCs))).
1583+
1584+
assertCsrEqual(Expected, Current) ->
1585+
?assertEqual(Expected#consumer{ts = 0}, Current#consumer{ts = 0}).
15751586

15761587
csr(Pid) ->
15771588
csr(Pid, {connected, waiting}).
@@ -1583,7 +1594,8 @@ csr(Pid, SubId, {Connectivity, Status}) ->
15831594
#consumer{pid = Pid,
15841595
subscription_id = SubId,
15851596
owner = <<"owning connection label">>,
1586-
status = {Connectivity, Status}};
1597+
status = {Connectivity, Status},
1598+
ts = erlang:system_time(millisecond)};
15871599
csr(Pid, SubId, Status) ->
15881600
csr(Pid, SubId, {connected, Status}).
15891601

0 commit comments

Comments
 (0)