Skip to content

Commit 5681510

Browse files
HoloRinmichaelklishin
authored andcommitted
Merge pull request #3177 from rabbitmq/stream-commit-offset-becomes-store-offset
Use "store" instead of "commit" for offset tracking (cherry picked from commit 860333a)
1 parent 5f19831 commit 5681510

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ messages_confirmed(Counters) ->
500500
messages_errored(Counters) ->
501501
atomics:get(Counters, 3).
502502

503-
stream_committed_offset(Log) ->
503+
stream_stored_offset(Log) ->
504504
osiris_log:committed_offset(Log).
505505

506506
augment_infos_with_user_provided_connection_name(Infos,
@@ -1802,7 +1802,7 @@ handle_frame_post_auth(_Transport,
18021802
user = User} =
18031803
Connection,
18041804
State,
1805-
{commit_offset, Reference, Stream, Offset}) ->
1805+
{store_offset, Reference, Stream, Offset}) ->
18061806
case rabbit_stream_utils:check_write_permitted(#resource{name =
18071807
Stream,
18081808
kind = queue,
@@ -1813,17 +1813,17 @@ handle_frame_post_auth(_Transport,
18131813
ok ->
18141814
case lookup_leader(Stream, Connection) of
18151815
cluster_not_found ->
1816-
rabbit_log:warning("Could not find leader to commit offset on ~p",
1816+
rabbit_log:warning("Could not find leader to store offset on ~p",
18171817
[Stream]),
1818-
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
1818+
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
18191819
{Connection, State};
18201820
{ClusterLeader, Connection1} ->
18211821
osiris:write_tracking(ClusterLeader, Reference, Offset),
18221822
{Connection1, State}
18231823
end;
18241824
error ->
1825-
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
1826-
rabbit_log:info("Not authorized to commit offset on ~p", [Stream]),
1825+
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
1826+
rabbit_log:info("Not authorized to store offset on ~p", [Stream]),
18271827
{Connection, State}
18281828
end;
18291829
handle_frame_post_auth(Transport,
@@ -2629,7 +2629,7 @@ consumer_i(offset, #consumer{counters = Counters}) ->
26292629
consumer_offset(Counters);
26302630
consumer_i(offset_lag,
26312631
#consumer{counters = Counters, segment = Log}) ->
2632-
stream_committed_offset(Log) - consumer_offset(Counters);
2632+
stream_stored_offset(Log) - consumer_offset(Counters);
26332633
consumer_i(connection_pid, _) ->
26342634
self();
26352635
consumer_i(properties, #consumer{properties = Properties}) ->

deps/rabbitmq_stream_common/include/rabbit_stream.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
-define(COMMAND_SUBSCRIBE, 7).
88
-define(COMMAND_DELIVER, 8).
99
-define(COMMAND_CREDIT, 9).
10-
-define(COMMAND_COMMIT_OFFSET, 10).
10+
-define(COMMAND_STORE_OFFSET, 10).
1111
-define(COMMAND_QUERY_OFFSET, 11).
1212
-define(COMMAND_UNSUBSCRIBE, 12).
1313
-define(COMMAND_CREATE_STREAM, 13).

deps/rabbitmq_stream_common/src/rabbit_stream_core.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
{deliver, subscription_id(), Chunk :: binary()} |
7373
{credit, subscription_id(), Credit :: non_neg_integer()} |
7474
{metadata_update, stream_name(), response_code()} |
75-
{commit_offset, offset_ref(), stream_name(), osiris:offset()} |
75+
{store_offset, offset_ref(), stream_name(), osiris:offset()} |
7676
heartbeat |
7777
{tune, FrameMax :: non_neg_integer(),
7878
HeartBeat :: non_neg_integer()} |
@@ -236,11 +236,11 @@ frame({metadata_update, Stream, ResponseCode}) ->
236236
ResponseCode:16,
237237
StreamSize:16,
238238
Stream/binary>>);
239-
frame({commit_offset, Reference, Stream, Offset}) ->
239+
frame({store_offset, Reference, Stream, Offset}) ->
240240
ReferenceSize = byte_size(Reference),
241241
StreamSize = byte_size(Stream),
242242
wrap_in_frame(<<?REQUEST:1,
243-
?COMMAND_COMMIT_OFFSET:15,
243+
?COMMAND_STORE_OFFSET:15,
244244
?VERSION_1:16,
245245
ReferenceSize:16,
246246
Reference/binary,
@@ -473,7 +473,7 @@ request_body({subscribe = Tag,
473473
end,
474474
{Tag,
475475
[<<SubscriptionId:8, ?STRING(Stream), Data/binary>> | PropertiesBin]};
476-
request_body({commit_offset = Tag, OffsetRef, Stream, Offset}) ->
476+
request_body({store_offset = Tag, OffsetRef, Stream, Offset}) ->
477477
{Tag, <<?STRING(OffsetRef), ?STRING(Stream), Offset:64>>};
478478
request_body({query_offset = Tag, OffsetRef, Stream}) ->
479479
{Tag, <<?STRING(OffsetRef), ?STRING(Stream)>>};
@@ -575,12 +575,12 @@ parse_request(<<?REQUEST:1,
575575
Stream:StreamSize/binary>>) ->
576576
{metadata_update, Stream, ResponseCode};
577577
parse_request(<<?REQUEST:1,
578-
?COMMAND_COMMIT_OFFSET:15,
578+
?COMMAND_STORE_OFFSET:15,
579579
?VERSION_1:16,
580580
?STRING(RefSize, OffsetRef),
581581
?STRING(SSize, Stream),
582582
Offset:64>>) ->
583-
{commit_offset, OffsetRef, Stream, Offset};
583+
{store_offset, OffsetRef, Stream, Offset};
584584
parse_request(<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>) ->
585585
heartbeat;
586586
parse_request(<<?REQUEST:1,
@@ -909,8 +909,8 @@ command_id(deliver) ->
909909
?COMMAND_DELIVER;
910910
command_id(credit) ->
911911
?COMMAND_CREDIT;
912-
command_id(commit_offset) ->
913-
?COMMAND_COMMIT_OFFSET;
912+
command_id(store_offset) ->
913+
?COMMAND_STORE_OFFSET;
914914
command_id(query_offset) ->
915915
?COMMAND_QUERY_OFFSET;
916916
command_id(unsubscribe) ->
@@ -960,8 +960,8 @@ parse_command_id(?COMMAND_DELIVER) ->
960960
deliver;
961961
parse_command_id(?COMMAND_CREDIT) ->
962962
credit;
963-
parse_command_id(?COMMAND_COMMIT_OFFSET) ->
964-
commit_offset;
963+
parse_command_id(?COMMAND_STORE_OFFSET) ->
964+
store_offset;
965965
parse_command_id(?COMMAND_QUERY_OFFSET) ->
966966
query_offset;
967967
parse_command_id(?COMMAND_UNSUBSCRIBE) ->

deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ roundtrip(_Config) ->
6060
test_roundtrip({credit, 53, 12}),
6161
test_roundtrip({metadata_update, <<"stream1">>,
6262
?RESPONSE_VHOST_ACCESS_FAILURE}),
63-
test_roundtrip({commit_offset, <<"offset_ref">>, <<"stream">>, 12}),
63+
test_roundtrip({store_offset, <<"offset_ref">>, <<"stream">>, 12}),
6464
test_roundtrip(heartbeat),
6565
test_roundtrip({tune, 53, 12}),
6666
%% REQUESTS

0 commit comments

Comments
 (0)