Skip to content

Commit e1a31e0

Browse files
Merge pull request #10538 from rabbitmq/fix-offset-lag-calculation
Fix consumer offset and offset lag calculation
2 parents c57e790 + 5b5c8a4 commit e1a31e0

File tree

3 files changed

+175
-10
lines changed

3 files changed

+175
-10
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3908,16 +3908,21 @@ consumer_i(messages_consumed,
39083908
#consumer_configuration{counters = Counters}}) ->
39093909
messages_consumed(Counters);
39103910
consumer_i(offset,
3911-
#consumer{configuration =
3912-
#consumer_configuration{counters = Counters}}) ->
3913-
consumer_offset(Counters);
3911+
#consumer{configuration = #consumer_configuration{counters = Counters},
3912+
last_listener_offset = LLO}) ->
3913+
rabbit_stream_utils:consumer_offset(consumer_offset(Counters),
3914+
messages_consumed(Counters),
3915+
LLO);
39143916
consumer_i(offset_lag, #consumer{log = undefined}) ->
39153917
0;
39163918
consumer_i(offset_lag,
3917-
#consumer{configuration =
3918-
#consumer_configuration{counters = Counters},
3919+
#consumer{configuration = #consumer_configuration{counters = Counters},
3920+
last_listener_offset = LLO,
39193921
log = Log}) ->
3920-
stream_stored_offset(Log) - consumer_offset(Counters);
3922+
rabbit_stream_utils:offset_lag(stream_stored_offset(Log),
3923+
consumer_offset(Counters),
3924+
messages_consumed(Counters),
3925+
LLO);
39213926
consumer_i(connection_pid, _) ->
39223927
self();
39233928
consumer_i(node, _) ->

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
filter_spec/1,
3636
command_versions/0,
3737
filtering_supported/0,
38-
check_super_stream_management_permitted/4]).
38+
check_super_stream_management_permitted/4,
39+
offset_lag/4,
40+
consumer_offset/3]).
3941

4042
-include_lib("rabbit_common/include/rabbit.hrl").
4143
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
@@ -330,3 +332,26 @@ q(VirtualHost, Name) ->
330332

331333
e(VirtualHost, Name) ->
332334
rabbit_misc:r(VirtualHost, exchange, Name).
335+
336+
-spec consumer_offset(ConsumerOffsetFromCounter :: integer(),
337+
MessageConsumed :: non_neg_integer(),
338+
LastListenerOffset :: integer() | undefined) -> integer().
339+
consumer_offset(0, 0, undefined) ->
340+
0;
341+
consumer_offset(0, 0, LastListenerOffset) when LastListenerOffset > 0 ->
342+
%% consumer at "next" waiting for messages most likely
343+
LastListenerOffset;
344+
consumer_offset(ConsumerOffsetFromCounter, _, _) ->
345+
ConsumerOffsetFromCounter.
346+
347+
-spec offset_lag(CommittedOffset :: integer(),
348+
ConsumerOffsetFromCounter :: integer(),
349+
MessageConsumed :: non_neg_integer(),
350+
LastListenerOffset :: integer() | undefined) -> integer().
351+
offset_lag(-1, _, _, _) ->
352+
0;
353+
offset_lag(_, 0, 0, LastListenerOffset) when LastListenerOffset > 0 ->
354+
%% consumer waiting for messages at the end of the stream, most likely
355+
0;
356+
offset_lag(CommittedOffset, ConsumerOffset, _, _) ->
357+
CommittedOffset - ConsumerOffset.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ groups() ->
6060
vhost_queue_limit,
6161
connection_should_be_closed_on_token_expiry,
6262
should_receive_metadata_update_after_update_secret,
63-
store_offset_requires_read_access
63+
store_offset_requires_read_access,
64+
offset_lag_calculation
6465
]},
6566
%% Run `test_global_counters` on its own so the global metrics are
6667
%% initialised to 0 for each testcase
@@ -814,6 +815,116 @@ store_offset_requires_read_access(Config) ->
814815
closed = wait_for_socket_close(T, S, 10),
815816
ok.
816817

818+
offset_lag_calculation(Config) ->
819+
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
820+
T = gen_tcp,
821+
Port = get_port(T, Config),
822+
Opts = get_opts(T),
823+
{ok, S} = T:connect("localhost", Port, Opts),
824+
C = rabbit_stream_core:init(0),
825+
ConnectionName = FunctionName,
826+
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
827+
test_authenticate(T, S, C),
828+
829+
Stream = FunctionName,
830+
test_create_stream(T, S, Stream, C),
831+
832+
SubId = 1,
833+
TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
834+
lists:foreach(fun(OffsetSpec) ->
835+
test_subscribe(T, S, SubId, Stream,
836+
OffsetSpec, 10, #{},
837+
?RESPONSE_CODE_OK, C),
838+
ConsumerInfo = consumer_offset_info(Config, ConnectionName),
839+
?assertEqual({0, 0}, ConsumerInfo),
840+
test_unsubscribe(T, S, SubId, C)
841+
end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
842+
843+
844+
PublisherId = 1,
845+
test_declare_publisher(T, S, PublisherId, Stream, C),
846+
MessageCount = 10,
847+
Body = <<"hello">>,
848+
lists:foreach(fun(_) ->
849+
test_publish_confirm(T, S, PublisherId, Body, C)
850+
end, lists:seq(1, MessageCount - 1)),
851+
%% to make sure to have 2 chunks
852+
timer:sleep(200),
853+
test_publish_confirm(T, S, PublisherId, Body, C),
854+
test_delete_publisher(T, S, PublisherId, C),
855+
856+
NextOffset = MessageCount,
857+
lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
858+
test_subscribe(T, S, SubId, Stream,
859+
OffsetSpec, 1, #{},
860+
?RESPONSE_CODE_OK, C),
861+
case ReceiveDeliver of
862+
true ->
863+
{{deliver, SubId, _}, _} = receive_commands(T, S, C);
864+
_ ->
865+
ok
866+
end,
867+
{Offset, Lag} = consumer_offset_info(Config, ConnectionName),
868+
CheckFun(Offset, Lag),
869+
test_unsubscribe(T, S, SubId, C)
870+
end, [{first, true,
871+
fun(Offset, Lag) ->
872+
?assert(Offset >= 0, "first, at least one chunk consumed"),
873+
?assert(Lag > 0, "first, not all messages consumed")
874+
end},
875+
{last, true,
876+
fun(Offset, _Lag) ->
877+
?assert(Offset > 0, "offset expected for last")
878+
end},
879+
{next, false,
880+
fun(Offset, Lag) ->
881+
?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
882+
?assert(Lag =:= 0, "next, offset lag should be 0")
883+
end},
884+
{0, true,
885+
fun(Offset, Lag) ->
886+
?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
887+
?assert(Lag > 0, "offset spec = 0, not all messages consumed")
888+
end},
889+
{1_000, false,
890+
fun(Offset, Lag) ->
891+
?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
892+
?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
893+
end},
894+
{{timestamp, TheFuture}, false,
895+
fun(Offset, Lag) ->
896+
?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
897+
?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
898+
end}]),
899+
900+
test_delete_stream(T, S, Stream, C, false),
901+
test_close(T, S, C),
902+
903+
ok.
904+
905+
consumer_offset_info(Config, ConnectionName) ->
906+
[[{offset, Offset},
907+
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,
908+
list_consumer_info, [ConnectionName, [offset, offset_lag]]),
909+
{Offset, Lag}.
910+
911+
list_consumer_info(ConnectionName, Infos) ->
912+
Pids = rabbit_stream:list(<<"/">>),
913+
[ConnPid] = lists:filter(fun(ConnectionPid) ->
914+
ConnectionPid ! {infos, self()},
915+
receive
916+
{ConnectionPid,
917+
#{<<"connection_name">> := ConnectionName}} ->
918+
true;
919+
{ConnectionPid, _ClientProperties} ->
920+
false
921+
after 1000 ->
922+
false
923+
end
924+
end,
925+
Pids),
926+
rabbit_stream_reader:consumers_info(ConnPid, Infos).
927+
817928
store_offset(Transport, S, Reference, Stream, Offset) ->
818929
StoreFrame = rabbit_stream_core:frame({store_offset, Reference, Stream, Offset}),
819930
ok = Transport:send(S, StoreFrame).
@@ -983,7 +1094,10 @@ test_server(Transport, Stream, Config) ->
9831094
ok.
9841095

9851096
test_peer_properties(Transport, S, C0) ->
986-
PeerPropertiesFrame = request({peer_properties, #{}}),
1097+
test_peer_properties(Transport, S, #{}, C0).
1098+
1099+
test_peer_properties(Transport, S, Properties, C0) ->
1100+
PeerPropertiesFrame = request({peer_properties, Properties}),
9871101
ok = Transport:send(S, PeerPropertiesFrame),
9881102
{Cmd, C} = receive_commands(Transport, S, C0),
9891103
?assertMatch({response, 1, {peer_properties, ?RESPONSE_CODE_OK, _}},
@@ -1133,6 +1247,13 @@ test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId,
11331247
end,
11341248
C.
11351249

1250+
test_delete_publisher(Transport, Socket, PublisherId, C0) ->
1251+
Frame = request({delete_publisher, PublisherId}),
1252+
ok = Transport:send(Socket, Frame),
1253+
{Cmd, C} = receive_commands(Transport, Socket, C0),
1254+
?assertMatch({response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, Cmd),
1255+
C.
1256+
11361257
test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
11371258
test_subscribe(Transport,
11381259
S,
@@ -1149,7 +1270,21 @@ test_subscribe(Transport,
11491270
SubscriptionProperties,
11501271
ExpectedResponseCode,
11511272
C0) ->
1152-
SubscribeFrame = request({subscribe, SubscriptionId, Stream, 0, 10, SubscriptionProperties}),
1273+
test_subscribe(Transport, S, SubscriptionId, Stream, 0, 10,
1274+
SubscriptionProperties,
1275+
ExpectedResponseCode, C0).
1276+
1277+
test_subscribe(Transport,
1278+
S,
1279+
SubscriptionId,
1280+
Stream,
1281+
OffsetSpec,
1282+
Credit,
1283+
SubscriptionProperties,
1284+
ExpectedResponseCode,
1285+
C0) ->
1286+
SubscribeFrame = request({subscribe, SubscriptionId, Stream,
1287+
OffsetSpec, Credit, SubscriptionProperties}),
11531288
ok = Transport:send(S, SubscribeFrame),
11541289
{Cmd, C} = receive_commands(Transport, S, C0),
11551290
?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd),

0 commit comments

Comments
 (0)