@@ -66,87 +66,42 @@ start_link() ->
6666% % Note that the next supervisor down, rabbit_federation_link_sup, is common
6767% % between exchanges and queues.
6868start_child (X ) ->
69- Result =
70- case child_exists (X ) orelse
71- mirrored_supervisor :start_child (
72- ? SUPERVISOR ,
73- {id (X ), {rabbit_federation_link_sup , start_link , [X ]},
74- transient , ? SUPERVISOR_WAIT , supervisor ,
75- [rabbit_federation_link_sup ]}) of
76- true ->
77- already_started ;
78- {ok , _Pid } ->
79- ok ;
80- {error , {already_started , _Pid }} ->
81- already_started ;
82- % % A link returned {stop, gone}, the link_sup shut down, that's OK.
83- {error , {shutdown , _ }} ->
84- ok
85- end ,
86- case Result of
87- ok ->
88- ok ;
89- already_started ->
90- # exchange {name = ExchangeName } = X ,
91- rabbit_log_federation :debug (" Federation link for exchange ~tp was already started" ,
92- [rabbit_misc :rs (ExchangeName )]),
93- ok
69+ case mirrored_supervisor :start_child (
70+ ? SUPERVISOR ,
71+ {id (X ), {rabbit_federation_link_sup , start_link , [X ]},
72+ transient , ? SUPERVISOR_WAIT , supervisor ,
73+ [rabbit_federation_link_sup ]}) of
74+ {ok , _Pid } -> ok ;
75+ {error , {already_started , _Pid }} ->
76+ # exchange {name = ExchangeName } = X ,
77+ rabbit_log_federation :debug (" Federation link for exchange ~tp was already started" ,
78+ [rabbit_misc :rs (ExchangeName )]),
79+ ok ;
80+ % % A link returned {stop, gone}, the link_sup shut down, that's OK.
81+ {error , {shutdown , _ }} -> ok
9482 end .
9583
96-
97- child_exists (Name ) ->
98- Id = id (Name ),
99- % % older format, pre-3.13.0
100- OldId = old_id (Name ),
101- lists :any (fun ({ChildId , _ , _ , _ }) ->
102- ChildId =:= Id orelse ChildId =:= OldId
103- end ,
104- mirrored_supervisor :which_children (? SUPERVISOR )).
105-
10684adjust ({clear_upstream , VHost , UpstreamName }) ->
107- _ = [rabbit_federation_link_sup :adjust (Pid , exchange_record_from_child_id (Id ), {clear_upstream , UpstreamName }) ||
108- {Id , Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR ),
109- virtual_host_name_from_child_id (Id ) =:= VHost
110- ],
85+ _ = [rabbit_federation_link_sup :adjust (Pid , X , {clear_upstream , UpstreamName }) ||
86+ {{_ , # exchange {name = Name } = X }, Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR ),
87+ Name # resource .virtual_host == VHost ],
11188 ok ;
11289adjust (Reason ) ->
113- _ = [rabbit_federation_link_sup :adjust (Pid , exchange_record_from_child_id (Id ), Reason ) ||
114- {Id , Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR )
115- ],
90+ _ = [rabbit_federation_link_sup :adjust (Pid , X , Reason ) ||
91+ {{_ , X }, Pid , _ , _ } <- mirrored_supervisor :which_children (? SUPERVISOR )],
11692 ok .
11793
11894stop_child (X ) ->
119- Result =
120- case stop_and_delete_child (id (X )) of
121- ok -> ok ;
122- {error , not_found } = Error ->
123- case rabbit_khepri :is_enabled () of
124- true ->
125- % % Old id format is not supported by Khepri and cannot exist there
126- Error ;
127- false ->
128- % % try old format, pre-3.13.0
129- stop_and_delete_child (old_id (X ))
130- end
131- end ,
132- case Result of
133- ok ->
134- ok ;
135- {error , Err } ->
136- # exchange {name = ExchangeName } = X ,
137- rabbit_log_federation :warning (
138- " Attempt to stop a federation link for exchange ~tp failed: ~tp " ,
139- [rabbit_misc :rs (ExchangeName ), Err ]),
140- ok
141- end .
142-
143- stop_and_delete_child (Id ) ->
144- case mirrored_supervisor :terminate_child (? SUPERVISOR , Id ) of
145- ok ->
146- ok = mirrored_supervisor :delete_child (? SUPERVISOR , Id );
147- {error , not_found } = Error ->
148- Error
149- end .
95+ case mirrored_supervisor :terminate_child (? SUPERVISOR , id (X )) of
96+ ok -> ok ;
97+ {error , Err } ->
98+ # exchange {name = ExchangeName } = X ,
99+ rabbit_log_federation :warning (
100+ " Attempt to stop a federation link for exchange ~tp failed: ~tp " ,
101+ [rabbit_misc :rs (ExchangeName ), Err ]),
102+ ok
103+ end ,
104+ ok = mirrored_supervisor :delete_child (? SUPERVISOR , id (X )).
150105
151106% %----------------------------------------------------------------------------
152107
@@ -160,20 +115,3 @@ id(X = #exchange{policy = Policy}) ->
160115
161116simple_id (# exchange {name = # resource {virtual_host = VHost , name = Name }}) ->
162117 [exchange , VHost , Name ].
163-
164- % % Old child id format, pre 3.13.0
165- old_id (X = # exchange {policy = Policy }) ->
166- X1 = rabbit_exchange :immutable (X ),
167- X1 # exchange {policy = Policy }.
168-
169- % % New child id format, introduced in 3.13.0 for Khepri
170- exchange_record_from_child_id ({_ , # exchange {} = XR }) ->
171- XR ;
172- % % Old child id format, pre-3.13.0
173- exchange_record_from_child_id (# exchange {} = XR ) ->
174- XR .
175-
176- virtual_host_name_from_child_id ({_ , # exchange {name = Res }}) ->
177- Res # resource .virtual_host ;
178- virtual_host_name_from_child_id (# exchange {name = Res }) ->
179- Res # resource .virtual_host .
0 commit comments