Skip to content

Commit 5b5c8a4

Browse files
committed
Fix consumer offset and offset lag calculation
The offset lag for a consumer is the difference between the last committed offset (offset of the first message in the last chunk confirmed by a quorum of stream members) and the current offset of the consumer (offset of the first message in the last chunk dispatched to the consumer). The calculation is simple for most cases, but it needs to be refined by using more context for edge cases (subscription to a stream that has not messages yet, subscription at the very end of a quiet stream). Example: subscription at "next" (waiting for new messages) in a quiet stream (no messages published). The previous implementation would return consumer offset = 0 and offset lag = last committed offset, where we would expect to get consumer offset = next offset and offset lag = 0. This commit fixes the calculation for these edge cases.
1 parent c57e790 commit 5b5c8a4

File tree

3 files changed

+175
-10
lines changed

3 files changed

+175
-10
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3908,16 +3908,21 @@ consumer_i(messages_consumed,
39083908
#consumer_configuration{counters = Counters}}) ->
39093909
messages_consumed(Counters);
39103910
consumer_i(offset,
3911-
#consumer{configuration =
3912-
#consumer_configuration{counters = Counters}}) ->
3913-
consumer_offset(Counters);
3911+
#consumer{configuration = #consumer_configuration{counters = Counters},
3912+
last_listener_offset = LLO}) ->
3913+
rabbit_stream_utils:consumer_offset(consumer_offset(Counters),
3914+
messages_consumed(Counters),
3915+
LLO);
39143916
consumer_i(offset_lag, #consumer{log = undefined}) ->
39153917
0;
39163918
consumer_i(offset_lag,
3917-
#consumer{configuration =
3918-
#consumer_configuration{counters = Counters},
3919+
#consumer{configuration = #consumer_configuration{counters = Counters},
3920+
last_listener_offset = LLO,
39193921
log = Log}) ->
3920-
stream_stored_offset(Log) - consumer_offset(Counters);
3922+
rabbit_stream_utils:offset_lag(stream_stored_offset(Log),
3923+
consumer_offset(Counters),
3924+
messages_consumed(Counters),
3925+
LLO);
39213926
consumer_i(connection_pid, _) ->
39223927
self();
39233928
consumer_i(node, _) ->

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
filter_spec/1,
3636
command_versions/0,
3737
filtering_supported/0,
38-
check_super_stream_management_permitted/4]).
38+
check_super_stream_management_permitted/4,
39+
offset_lag/4,
40+
consumer_offset/3]).
3941

4042
-include_lib("rabbit_common/include/rabbit.hrl").
4143
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
@@ -330,3 +332,26 @@ q(VirtualHost, Name) ->
330332

331333
e(VirtualHost, Name) ->
332334
rabbit_misc:r(VirtualHost, exchange, Name).
335+
336+
-spec consumer_offset(ConsumerOffsetFromCounter :: integer(),
337+
MessageConsumed :: non_neg_integer(),
338+
LastListenerOffset :: integer() | undefined) -> integer().
339+
consumer_offset(0, 0, undefined) ->
340+
0;
341+
consumer_offset(0, 0, LastListenerOffset) when LastListenerOffset > 0 ->
342+
%% consumer at "next" waiting for messages most likely
343+
LastListenerOffset;
344+
consumer_offset(ConsumerOffsetFromCounter, _, _) ->
345+
ConsumerOffsetFromCounter.
346+
347+
-spec offset_lag(CommittedOffset :: integer(),
348+
ConsumerOffsetFromCounter :: integer(),
349+
MessageConsumed :: non_neg_integer(),
350+
LastListenerOffset :: integer() | undefined) -> integer().
351+
offset_lag(-1, _, _, _) ->
352+
0;
353+
offset_lag(_, 0, 0, LastListenerOffset) when LastListenerOffset > 0 ->
354+
%% consumer waiting for messages at the end of the stream, most likely
355+
0;
356+
offset_lag(CommittedOffset, ConsumerOffset, _, _) ->
357+
CommittedOffset - ConsumerOffset.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ groups() ->
6060
vhost_queue_limit,
6161
connection_should_be_closed_on_token_expiry,
6262
should_receive_metadata_update_after_update_secret,
63-
store_offset_requires_read_access
63+
store_offset_requires_read_access,
64+
offset_lag_calculation
6465
]},
6566
%% Run `test_global_counters` on its own so the global metrics are
6667
%% initialised to 0 for each testcase
@@ -814,6 +815,116 @@ store_offset_requires_read_access(Config) ->
814815
closed = wait_for_socket_close(T, S, 10),
815816
ok.
816817

818+
offset_lag_calculation(Config) ->
819+
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
820+
T = gen_tcp,
821+
Port = get_port(T, Config),
822+
Opts = get_opts(T),
823+
{ok, S} = T:connect("localhost", Port, Opts),
824+
C = rabbit_stream_core:init(0),
825+
ConnectionName = FunctionName,
826+
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
827+
test_authenticate(T, S, C),
828+
829+
Stream = FunctionName,
830+
test_create_stream(T, S, Stream, C),
831+
832+
SubId = 1,
833+
TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
834+
lists:foreach(fun(OffsetSpec) ->
835+
test_subscribe(T, S, SubId, Stream,
836+
OffsetSpec, 10, #{},
837+
?RESPONSE_CODE_OK, C),
838+
ConsumerInfo = consumer_offset_info(Config, ConnectionName),
839+
?assertEqual({0, 0}, ConsumerInfo),
840+
test_unsubscribe(T, S, SubId, C)
841+
end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
842+
843+
844+
PublisherId = 1,
845+
test_declare_publisher(T, S, PublisherId, Stream, C),
846+
MessageCount = 10,
847+
Body = <<"hello">>,
848+
lists:foreach(fun(_) ->
849+
test_publish_confirm(T, S, PublisherId, Body, C)
850+
end, lists:seq(1, MessageCount - 1)),
851+
%% to make sure to have 2 chunks
852+
timer:sleep(200),
853+
test_publish_confirm(T, S, PublisherId, Body, C),
854+
test_delete_publisher(T, S, PublisherId, C),
855+
856+
NextOffset = MessageCount,
857+
lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
858+
test_subscribe(T, S, SubId, Stream,
859+
OffsetSpec, 1, #{},
860+
?RESPONSE_CODE_OK, C),
861+
case ReceiveDeliver of
862+
true ->
863+
{{deliver, SubId, _}, _} = receive_commands(T, S, C);
864+
_ ->
865+
ok
866+
end,
867+
{Offset, Lag} = consumer_offset_info(Config, ConnectionName),
868+
CheckFun(Offset, Lag),
869+
test_unsubscribe(T, S, SubId, C)
870+
end, [{first, true,
871+
fun(Offset, Lag) ->
872+
?assert(Offset >= 0, "first, at least one chunk consumed"),
873+
?assert(Lag > 0, "first, not all messages consumed")
874+
end},
875+
{last, true,
876+
fun(Offset, _Lag) ->
877+
?assert(Offset > 0, "offset expected for last")
878+
end},
879+
{next, false,
880+
fun(Offset, Lag) ->
881+
?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
882+
?assert(Lag =:= 0, "next, offset lag should be 0")
883+
end},
884+
{0, true,
885+
fun(Offset, Lag) ->
886+
?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
887+
?assert(Lag > 0, "offset spec = 0, not all messages consumed")
888+
end},
889+
{1_000, false,
890+
fun(Offset, Lag) ->
891+
?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
892+
?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
893+
end},
894+
{{timestamp, TheFuture}, false,
895+
fun(Offset, Lag) ->
896+
?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
897+
?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
898+
end}]),
899+
900+
test_delete_stream(T, S, Stream, C, false),
901+
test_close(T, S, C),
902+
903+
ok.
904+
905+
consumer_offset_info(Config, ConnectionName) ->
906+
[[{offset, Offset},
907+
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,
908+
list_consumer_info, [ConnectionName, [offset, offset_lag]]),
909+
{Offset, Lag}.
910+
911+
list_consumer_info(ConnectionName, Infos) ->
912+
Pids = rabbit_stream:list(<<"/">>),
913+
[ConnPid] = lists:filter(fun(ConnectionPid) ->
914+
ConnectionPid ! {infos, self()},
915+
receive
916+
{ConnectionPid,
917+
#{<<"connection_name">> := ConnectionName}} ->
918+
true;
919+
{ConnectionPid, _ClientProperties} ->
920+
false
921+
after 1000 ->
922+
false
923+
end
924+
end,
925+
Pids),
926+
rabbit_stream_reader:consumers_info(ConnPid, Infos).
927+
817928
store_offset(Transport, S, Reference, Stream, Offset) ->
818929
StoreFrame = rabbit_stream_core:frame({store_offset, Reference, Stream, Offset}),
819930
ok = Transport:send(S, StoreFrame).
@@ -983,7 +1094,10 @@ test_server(Transport, Stream, Config) ->
9831094
ok.
9841095

9851096
test_peer_properties(Transport, S, C0) ->
986-
PeerPropertiesFrame = request({peer_properties, #{}}),
1097+
test_peer_properties(Transport, S, #{}, C0).
1098+
1099+
test_peer_properties(Transport, S, Properties, C0) ->
1100+
PeerPropertiesFrame = request({peer_properties, Properties}),
9871101
ok = Transport:send(S, PeerPropertiesFrame),
9881102
{Cmd, C} = receive_commands(Transport, S, C0),
9891103
?assertMatch({response, 1, {peer_properties, ?RESPONSE_CODE_OK, _}},
@@ -1133,6 +1247,13 @@ test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId,
11331247
end,
11341248
C.
11351249

1250+
test_delete_publisher(Transport, Socket, PublisherId, C0) ->
1251+
Frame = request({delete_publisher, PublisherId}),
1252+
ok = Transport:send(Socket, Frame),
1253+
{Cmd, C} = receive_commands(Transport, Socket, C0),
1254+
?assertMatch({response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, Cmd),
1255+
C.
1256+
11361257
test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
11371258
test_subscribe(Transport,
11381259
S,
@@ -1149,7 +1270,21 @@ test_subscribe(Transport,
11491270
SubscriptionProperties,
11501271
ExpectedResponseCode,
11511272
C0) ->
1152-
SubscribeFrame = request({subscribe, SubscriptionId, Stream, 0, 10, SubscriptionProperties}),
1273+
test_subscribe(Transport, S, SubscriptionId, Stream, 0, 10,
1274+
SubscriptionProperties,
1275+
ExpectedResponseCode, C0).
1276+
1277+
test_subscribe(Transport,
1278+
S,
1279+
SubscriptionId,
1280+
Stream,
1281+
OffsetSpec,
1282+
Credit,
1283+
SubscriptionProperties,
1284+
ExpectedResponseCode,
1285+
C0) ->
1286+
SubscribeFrame = request({subscribe, SubscriptionId, Stream,
1287+
OffsetSpec, Credit, SubscriptionProperties}),
11531288
ok = Transport:send(S, SubscribeFrame),
11541289
{Cmd, C} = receive_commands(Transport, S, C0),
11551290
?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd),

0 commit comments

Comments
 (0)