Skip to content

Commit d04cb67

Browse files
Merge pull request #855 from rabbitmq/rabbitmq-server-839
Use separate index to store dying clients file offsets
2 parents aeb5c6b + 730ffa5 commit d04cb67

File tree

1 file changed

+32
-8
lines changed

1 file changed

+32
-8
lines changed

src/rabbit_msg_store.erl

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@
9191
flying_ets,
9292
%% set of dying clients
9393
dying_clients,
94+
%% index of file positions for client death messages
95+
dying_client_index,
9496
%% map of references of all registered clients
9597
%% to callbacks
9698
clients,
@@ -131,6 +133,12 @@
131133
msg_store
132134
}).
133135

136+
-record(dying_client,
137+
{ client_ref,
138+
file,
139+
offset
140+
}).
141+
134142
%%----------------------------------------------------------------------------
135143

136144
-export_type([gc_state/0, file_num/0]).
@@ -416,6 +424,10 @@
416424
%% performance with many healthy clients and few, if any, dying
417425
%% clients, which is the typical case.
418426
%%
427+
%% Client termination messages are stored in a separate ets index to
428+
%% avoid filling primary message store index and message files with
429+
%% client termination messages.
430+
%%
419431
%% When the msg_store has a backlog (i.e. it has unprocessed messages
420432
%% in its mailbox / gen_server priority queue), a further optimisation
421433
%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
@@ -687,7 +699,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
687699
end.
688700

689701
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
690-
dying_clients = DyingClients }) ->
702+
dying_clients = DyingClients,
703+
dying_client_index = DyingIndex }) ->
704+
ets:delete(DyingIndex, CRef),
691705
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
692706
dying_clients = sets:del_element(CRef, DyingClients) }.
693707

@@ -741,6 +755,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
741755
[ordered_set, public]),
742756
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
743757
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
758+
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
759+
[set, public, {keypos, #dying_client.client_ref}]),
744760

745761
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
746762

@@ -772,6 +788,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
772788
cur_file_cache_ets = CurFileCacheEts,
773789
flying_ets = FlyingEts,
774790
dying_clients = sets:new(),
791+
dying_client_index = DyingIndex,
775792
clients = Clients,
776793
successfully_recovered = CleanShutdown,
777794
file_size_limit = FileSizeLimit,
@@ -848,15 +865,21 @@ handle_call({contains, MsgId}, From, State) ->
848865
noreply(State1).
849866

850867
handle_cast({client_dying, CRef},
851-
State = #msstate { dying_clients = DyingClients }) ->
868+
State = #msstate { dying_clients = DyingClients,
869+
dying_client_index = DyingIndex,
870+
current_file_handle = CurHdl,
871+
current_file = CurFile }) ->
852872
DyingClients1 = sets:add_element(CRef, DyingClients),
853-
noreply(write_message(CRef, <<>>,
854-
State #msstate { dying_clients = DyingClients1 }));
873+
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
874+
true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
875+
file = CurFile,
876+
offset = CurOffset}),
877+
noreply(State #msstate { dying_clients = DyingClients1 });
855878

856879
handle_cast({client_delete, CRef},
857880
State = #msstate { clients = Clients }) ->
858881
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
859-
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
882+
noreply(clear_client(CRef, State1));
860883

861884
handle_cast({write, CRef, MsgId, Flow},
862885
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
@@ -1334,16 +1357,17 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
13341357
%% msg and thus should be ignored. Note that this (correctly) returns
13351358
%% false when testing to remove the death msg itself.
13361359
should_mask_action(CRef, MsgId,
1337-
State = #msstate { dying_clients = DyingClients }) ->
1360+
State = #msstate { dying_clients = DyingClients,
1361+
dying_client_index = DyingIndex }) ->
13381362
case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
13391363
{false, Location} ->
13401364
{false, Location};
13411365
{true, not_found} ->
13421366
{true, not_found};
13431367
{true, #msg_location { file = File, offset = Offset,
13441368
ref_count = RefCount } = Location} ->
1345-
#msg_location { file = DeathFile, offset = DeathOffset } =
1346-
index_lookup(CRef, State),
1369+
[#dying_client { file = DeathFile, offset = DeathOffset }] =
1370+
ets:lookup(DyingIndex, CRef),
13471371
{case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
13481372
{true, _} -> true;
13491373
{false, 0} -> false_if_increment;

0 commit comments

Comments
 (0)