Skip to content

Commit 537fe75

Browse files
Merge pull request #12416 from rabbitmq/md/binding-deletions-map-refactor
2 parents b3bcf97 + 4aa68ca commit 537fe75

File tree

7 files changed

+217
-131
lines changed

7 files changed

+217
-131
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1818,8 +1818,8 @@ internal_delete(Queue, ActingUser, Reason) ->
18181818
{error, timeout} = Err ->
18191819
Err;
18201820
Deletions ->
1821-
_ = rabbit_binding:process_deletions(Deletions),
1822-
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
1821+
ok = rabbit_binding:process_deletions(Deletions),
1822+
ok = rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
18231823
rabbit_core_metrics:queue_deleted(QueueName),
18241824
ok = rabbit_event:notify(queue_deleted,
18251825
[{name, QueueName},
@@ -1942,14 +1942,14 @@ filter_transient_queues_to_delete(Node) ->
19421942
end.
19431943

19441944
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
1945-
Deletions = rabbit_binding:process_deletions(
1946-
lists:foldl(fun rabbit_binding:combine_deletions/2,
1947-
rabbit_binding:new_deletions(),
1948-
QueueDeletions)),
1945+
Deletions = lists:foldl(
1946+
fun rabbit_binding:combine_deletions/2,
1947+
rabbit_binding:new_deletions(), QueueDeletions),
1948+
ok = rabbit_binding:process_deletions(Deletions),
19491949
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER);
19501950
notify_queue_binding_deletions(QueueDeletions) ->
1951-
Deletions = rabbit_binding:process_deletions(QueueDeletions),
1952-
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
1951+
ok = rabbit_binding:process_deletions(QueueDeletions),
1952+
rabbit_binding:notify_deletions(QueueDeletions, ?INTERNAL_USER).
19531953

19541954
notify_transient_queues_deleted(QueueDeletions) ->
19551955
lists:foreach(

deps/rabbit/src/rabbit_binding.erl

Lines changed: 158 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
-export([list/1, list_for_source/1, list_for_destination/1,
1414
list_for_source_and_destination/2, list_for_source_and_destination/3,
1515
list_explicit/0]).
16-
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
16+
-export([new_deletions/0, combine_deletions/2, add_deletion/5,
1717
process_deletions/1, notify_deletions/2, group_bindings_fold/3]).
1818
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
1919

@@ -22,6 +22,9 @@
2222
-export([reverse_route/1, index_route/1]).
2323
-export([binding_type/2]).
2424

25+
%% For testing only
26+
-export([fetch_deletion/2]).
27+
2528
-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
2629
kind = exchange,
2730
name = <<>>}).
@@ -50,9 +53,12 @@
5053
rabbit_types:ok_or_error(rabbit_types:amqp_error())).
5154
-type bindings() :: [rabbit_types:binding()].
5255

53-
%% TODO this should really be opaque but that seems to confuse 17.1's
54-
%% dialyzer into objecting to everything that uses it.
55-
-type deletions() :: dict:dict().
56+
-record(deletion, {exchange :: rabbit_types:exchange(),
57+
%% Whether the exchange was deleted.
58+
deleted :: boolean(),
59+
bindings :: sets:set(rabbit_types:binding())}).
60+
61+
-opaque deletions() :: #{XName :: rabbit_exchange:name() => #deletion{}}.
5662

5763
%%----------------------------------------------------------------------------
5864

@@ -159,6 +165,19 @@ binding_type0(false, true) ->
159165
binding_type0(_, _) ->
160166
transient.
161167

168+
binding_checks(Binding, InnerFun) ->
169+
fun(Src, Dst) ->
170+
case rabbit_exchange:validate_binding(Src, Binding) of
171+
ok ->
172+
%% this argument is used to check queue exclusivity;
173+
%% in general, we want to fail on that in preference to
174+
%% anything else
175+
InnerFun(Src, Dst);
176+
Err ->
177+
Err
178+
end
179+
end.
180+
162181
-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
163182
remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
164183

@@ -360,93 +379,155 @@ index_route(#route{binding = #binding{source = Source,
360379
%% ----------------------------------------------------------------------------
361380
%% Binding / exchange deletion abstraction API
362381
%% ----------------------------------------------------------------------------
363-
364-
anything_but( NotThis, NotThis, NotThis) -> NotThis;
365-
anything_but( NotThis, NotThis, This) -> This;
366-
anything_but( NotThis, This, NotThis) -> This;
367-
anything_but(_NotThis, This, This) -> This.
382+
%%
383+
%% `deletions()' describe a set of removals of bindings and/or exchanges from
384+
%% the metadata store.
385+
%%
386+
%% This deletion collection is used for two purposes:
387+
%%
388+
%% <ul>
389+
%% <li>"<em>Processing</em>" of deletions. Processing here means that the
390+
%% exchanges and bindings are passed into the {@link rabbit_exchange}
391+
%% callbacks. When an exchange is deleted the `rabbit_exchange:delete/1'
392+
%% callback is invoked and when the exchange is not deleted but some bindings
393+
%% are deleted the `rabbit_exchange:remove_bindings/2' is invoked.</li>
394+
%% <li><em>Notification</em> of metadata deletion. Like other internal
395+
%% notifications, {@link rabbit_binding:notify_deletions()} uses {@link
396+
%% rabbit_event} to notify any interested consumers of a resource deletion.
397+
%% An example consumer of {@link rabbit_event} is the `rabbitmq_event_exchange'
398+
%% plugin which publishes these notifications as messages.</li>
399+
%% </ul>
400+
%%
401+
%% The point of collecting deletions into this opaque type is to be able to
402+
%% collect all bindings deleted for a given exchange into a list. This allows
403+
%% us to invoke the `rabbit_exchange:remove_bindings/2' callback with all
404+
%% deleted bindings at once rather than passing each deleted binding
405+
%% individually.
368406

369407
-spec new_deletions() -> deletions().
370408

371-
new_deletions() -> dict:new().
372-
373-
-spec add_deletion
374-
(rabbit_exchange:name(),
375-
{'undefined' | rabbit_types:exchange(),
376-
'deleted' | 'not_deleted',
377-
bindings()},
378-
deletions()) ->
379-
deletions().
380-
381-
add_deletion(XName, Entry, Deletions) ->
382-
dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
383-
Entry, Deletions).
409+
new_deletions() -> #{}.
410+
411+
-spec add_deletion(XName, X, XDeleted, Bindings, Deletions) -> Deletions1
412+
when
413+
XName :: rabbit_exchange:name(),
414+
X :: rabbit_types:exchange(),
415+
XDeleted :: deleted | not_deleted,
416+
Bindings :: bindings(),
417+
Deletions :: deletions(),
418+
Deletions1 :: deletions().
419+
420+
add_deletion(XName, X, WasDeleted, Bindings, Deletions)
421+
when (WasDeleted =:= deleted orelse WasDeleted =:= not_deleted) andalso
422+
is_list(Bindings) andalso is_map(Deletions) ->
423+
WasDeleted1 = case WasDeleted of
424+
deleted -> true;
425+
not_deleted -> false
426+
end,
427+
Bindings1 = sets:from_list(Bindings, [{version, 2}]),
428+
Deletion = #deletion{exchange = X,
429+
deleted = WasDeleted1,
430+
bindings = Bindings1},
431+
maps:update_with(
432+
XName,
433+
fun(Deletion1) ->
434+
merge_deletion(Deletion1, Deletion)
435+
end, Deletion, Deletions).
384436

385437
-spec combine_deletions(deletions(), deletions()) -> deletions().
386438

387-
combine_deletions(Deletions1, Deletions2) ->
388-
dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
389-
Deletions1, Deletions2).
390-
391-
merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
392-
{anything_but(undefined, X1, X2),
393-
anything_but(not_deleted, Deleted1, Deleted2),
394-
Bindings1 ++ Bindings2};
395-
merge_entry({X1, Deleted1, Bindings1, none}, {X2, Deleted2, Bindings2, none}) ->
396-
{anything_but(undefined, X1, X2),
397-
anything_but(not_deleted, Deleted1, Deleted2),
398-
Bindings1 ++ Bindings2, none}.
399-
400-
notify_deletions({error, not_found}, _) ->
401-
ok;
402-
notify_deletions(Deletions, ActingUser) ->
403-
dict:fold(fun (XName, {_X, deleted, Bs, _}, ok) ->
404-
notify_exchange_deletion(XName, ActingUser),
405-
notify_bindings_deletion(Bs, ActingUser);
406-
(_XName, {_X, not_deleted, Bs, _}, ok) ->
407-
notify_bindings_deletion(Bs, ActingUser);
408-
(XName, {_X, deleted, Bs}, ok) ->
439+
combine_deletions(Deletions1, Deletions2)
440+
when is_map(Deletions1) andalso is_map(Deletions2) ->
441+
maps:merge_with(
442+
fun (_XName, Deletion1, Deletion2) ->
443+
merge_deletion(Deletion1, Deletion2)
444+
end, Deletions1, Deletions2).
445+
446+
merge_deletion(
447+
#deletion{deleted = Deleted1, bindings = Bindings1},
448+
#deletion{exchange = X2, deleted = Deleted2, bindings = Bindings2}) ->
449+
%% Assume that X2 is more up to date than X1.
450+
X = X2,
451+
Deleted = Deleted1 orelse Deleted2,
452+
Bindings = sets:union(Bindings1, Bindings2),
453+
#deletion{exchange = X,
454+
deleted = Deleted,
455+
bindings = Bindings}.
456+
457+
-spec notify_deletions(Deletions, ActingUser) -> ok when
458+
Deletions :: rabbit_binding:deletions(),
459+
ActingUser :: rabbit_types:username().
460+
461+
notify_deletions(Deletions, ActingUser) when is_map(Deletions) ->
462+
maps:foreach(
463+
fun (XName, #deletion{deleted = XDeleted, bindings = Bindings}) ->
464+
case XDeleted of
465+
true ->
409466
notify_exchange_deletion(XName, ActingUser),
410-
notify_bindings_deletion(Bs, ActingUser);
411-
(_XName, {_X, not_deleted, Bs}, ok) ->
412-
notify_bindings_deletion(Bs, ActingUser)
413-
end, ok, Deletions).
467+
notify_bindings_deletion(Bindings, ActingUser);
468+
false ->
469+
notify_bindings_deletion(Bindings, ActingUser)
470+
end
471+
end, Deletions).
414472

415473
notify_exchange_deletion(XName, ActingUser) ->
416474
ok = rabbit_event:notify(
417475
exchange_deleted,
418476
[{name, XName},
419477
{user_who_performed_action, ActingUser}]).
420478

421-
notify_bindings_deletion(Bs, ActingUser) ->
422-
[rabbit_event:notify(binding_deleted,
423-
info(B) ++ [{user_who_performed_action, ActingUser}])
424-
|| B <- Bs],
425-
ok.
479+
notify_bindings_deletion(Bindings, ActingUser) ->
480+
sets:fold(
481+
fun(Binding, ok) ->
482+
rabbit_event:notify(
483+
binding_deleted,
484+
info(Binding) ++ [{user_who_performed_action, ActingUser}]),
485+
ok
486+
end, ok, Bindings).
426487

427-
-spec process_deletions(deletions()) -> deletions().
488+
-spec process_deletions(deletions()) -> ok.
428489
process_deletions(Deletions) ->
429-
dict:map(fun (_XName, {X, deleted, Bindings}) ->
430-
Bs = lists:flatten(Bindings),
431-
Serial = rabbit_exchange:serial(X),
432-
rabbit_exchange:callback(X, delete, Serial, [X]),
433-
{X, deleted, Bs, none};
434-
(_XName, {X, not_deleted, Bindings}) ->
435-
Bs = lists:flatten(Bindings),
436-
Serial = rabbit_exchange:serial(X),
437-
rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs]),
438-
{X, not_deleted, Bs, none}
439-
end, Deletions).
440-
441-
binding_checks(Binding, InnerFun) ->
442-
fun(Src, Dst) ->
443-
case rabbit_exchange:validate_binding(Src, Binding) of
444-
ok ->
445-
%% this argument is used to check queue exclusivity;
446-
%% in general, we want to fail on that in preference to
447-
%% anything else
448-
InnerFun(Src, Dst);
449-
Err ->
450-
Err
451-
end
490+
maps:foreach(
491+
fun (_XName, #deletion{exchange = X,
492+
deleted = XDeleted,
493+
bindings = Bindings}) ->
494+
Serial = rabbit_exchange:serial(X),
495+
case XDeleted of
496+
true ->
497+
rabbit_exchange:callback(X, delete, Serial, [X]);
498+
false ->
499+
Bindings1 = sets:to_list(Bindings),
500+
rabbit_exchange:callback(
501+
X, remove_bindings, Serial, [X, Bindings1])
502+
end
503+
end, Deletions).
504+
505+
-spec fetch_deletion(XName, Deletions) -> Ret when
506+
XName :: rabbit_exchange:name(),
507+
Deletions :: deletions(),
508+
Ret :: {X, WasDeleted, Bindings},
509+
X :: rabbit_types:exchange(),
510+
WasDeleted :: deleted | not_deleted,
511+
Bindings :: bindings().
512+
%% @doc Fetches the deletions for the given exchange name.
513+
%%
514+
%% This function is only intended for use in tests.
515+
%%
516+
%% @private
517+
518+
fetch_deletion(XName, Deletions) ->
519+
case maps:find(XName, Deletions) of
520+
{ok, #deletion{exchange = X,
521+
deleted = Deleted,
522+
bindings = Bindings}} ->
523+
WasDeleted = case Deleted of
524+
true ->
525+
deleted;
526+
false ->
527+
not_deleted
528+
end,
529+
Bindings1 = sets:to_list(Bindings),
530+
{X, WasDeleted, Bindings1};
531+
error ->
532+
error
452533
end.

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,10 @@ delete_in_mnesia(Src, Dst, B) ->
302302
should_index_table(Src), fun delete/3),
303303
Deletions0 = maybe_auto_delete_exchange_in_mnesia(
304304
B#binding.source, [B], rabbit_binding:new_deletions(), false),
305-
fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end.
305+
fun() ->
306+
ok = rabbit_binding:process_deletions(Deletions0),
307+
{ok, Deletions0}
308+
end.
306309

307310
absent_errs_only_in_mnesia(Names) ->
308311
Errs = [E || Name <- Names,
@@ -352,7 +355,8 @@ delete_in_khepri(#binding{source = SrcName,
352355
{error, _} = Err ->
353356
Err;
354357
Deletions ->
355-
{ok, rabbit_binding:process_deletions(Deletions)}
358+
ok = rabbit_binding:process_deletions(Deletions),
359+
{ok, Deletions}
356360
end.
357361

358362
exists_in_khepri(Path, Binding) ->
@@ -379,15 +383,18 @@ delete_in_khepri(Binding) ->
379383
end.
380384

381385
maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
382-
{Entry, Deletions1} =
383-
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
384-
{not_deleted, X} ->
385-
{{X, not_deleted, Bindings}, Deletions};
386-
{deleted, X, Deletions2} ->
387-
{{X, deleted, Bindings},
388-
rabbit_binding:combine_deletions(Deletions, Deletions2)}
389-
end,
390-
rabbit_binding:add_deletion(XName, Entry, Deletions1).
386+
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
387+
{not_deleted, undefined} ->
388+
Deletions;
389+
{not_deleted, X} ->
390+
rabbit_binding:add_deletion(
391+
XName, X, not_deleted, Bindings, Deletions);
392+
{deleted, X, Deletions1} ->
393+
Deletions2 = rabbit_binding:combine_deletions(
394+
Deletions, Deletions1),
395+
rabbit_binding:add_deletion(
396+
XName, X, deleted, Bindings, Deletions2)
397+
end.
391398

392399
%% -------------------------------------------------------------------
393400
%% get_all().
@@ -1152,15 +1159,18 @@ sync_index_route(_, _, _) ->
11521159
OnlyDurable :: boolean(),
11531160
Ret :: rabbit_binding:deletions().
11541161
maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) ->
1155-
{Entry, Deletions1} =
1156-
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
1157-
{not_deleted, X} ->
1158-
{{X, not_deleted, Bindings}, Deletions};
1159-
{deleted, X, Deletions2} ->
1160-
{{X, deleted, Bindings},
1161-
rabbit_binding:combine_deletions(Deletions, Deletions2)}
1162-
end,
1163-
rabbit_binding:add_deletion(XName, Entry, Deletions1).
1162+
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
1163+
{not_deleted, undefined} ->
1164+
Deletions;
1165+
{not_deleted, X} ->
1166+
rabbit_binding:add_deletion(
1167+
XName, X, not_deleted, Bindings, Deletions);
1168+
{deleted, X, Deletions1} ->
1169+
Deletions2 = rabbit_binding:combine_deletions(
1170+
Deletions, Deletions1),
1171+
rabbit_binding:add_deletion(
1172+
XName, X, deleted, Bindings, Deletions2)
1173+
end.
11641174

11651175
%% Instead of locking entire table on remove operations we can lock the
11661176
%% affected resource only.

0 commit comments

Comments
 (0)