8787
8888-module (rabbit_khepri ).
8989
90+ -feature (maybe_expr , enable ).
91+
9092-include_lib (" kernel/include/logger.hrl" ).
9193-include_lib (" stdlib/include/assert.hrl" ).
9294
98100
99101-export ([setup /0 ,
100102 setup /1 ,
101- init /0 ,
103+ register_projections /0 ,
104+ init /1 ,
102105 can_join_cluster /1 ,
103106 add_member /2 ,
104107 remove_member /1 ,
@@ -267,7 +270,6 @@ setup(_) ->
267270 RetryTimeout = retry_timeout (),
268271 case khepri_cluster :wait_for_leader (? STORE_ID , RetryTimeout ) of
269272 ok ->
270- wait_for_register_projections (),
271273 ? LOG_DEBUG (
272274 " Khepri-based " ? RA_FRIENDLY_NAME " ready" ,
273275 #{domain => ? RMQLOG_DOMAIN_GLOBAL }),
@@ -287,49 +289,46 @@ retry_timeout() ->
287289 undefined -> 30000
288290 end .
289291
290- retry_limit () ->
291- case application :get_env (rabbit , khepri_leader_wait_retry_limit ) of
292- {ok , T } -> T ;
293- undefined -> 10
294- end .
295-
296- wait_for_register_projections () ->
297- wait_for_register_projections (retry_timeout (), retry_limit ()).
298-
299- wait_for_register_projections (_Timeout , 0 ) ->
300- exit (timeout_waiting_for_khepri_projections );
301- wait_for_register_projections (Timeout , Retries ) ->
302- rabbit_log :info (" Waiting for Khepri projections for ~tp ms, ~tp retries left" ,
303- [Timeout , Retries - 1 ]),
304- try
305- register_projections ()
306- catch
307- throw : timeout ->
308- wait_for_register_projections (Timeout , Retries - 1 )
309- end .
310-
311292% % @private
312293
313- -spec init () -> Ret when
294+ -spec init (IsVirgin ) -> Ret when
295+ IsVirgin :: boolean (),
314296 Ret :: ok | timeout_error ().
315297
316- init () ->
298+ init (IsVirgin ) ->
317299 case members () of
318300 [] ->
319301 timer :sleep (1000 ),
320- init ();
302+ init (IsVirgin );
321303 Members ->
322304 ? LOG_NOTICE (
323305 " Found the following metadata store members: ~p " , [Members ],
324306 #{domain => ? RMQLOG_DOMAIN_DB }),
325- % % Delete transient queues on init.
326- % % Note that we also do this in the
327- % % `rabbit_amqqueue:on_node_down/1' callback. We must try this
328- % % deletion during init because the cluster may have been in a
329- % % minority when this node went down. We wait for a majority while
330- % % booting (via `rabbit_khepri:setup/0') though so this deletion is
331- % % likely to succeed.
332- rabbit_amqqueue :delete_transient_queues_on_node (node ())
307+ maybe
308+ ? LOG_DEBUG (
309+ " Khepri-based " ? RA_FRIENDLY_NAME " catching up on "
310+ " replication to the Raft cluster leader" , [],
311+ #{domain => ? RMQLOG_DOMAIN_DB }),
312+ ok ?= fence (retry_timeout ()),
313+ ? LOG_DEBUG (
314+ " local Khepri-based " ? RA_FRIENDLY_NAME " member is caught "
315+ " up to the Raft cluster leader" , [],
316+ #{domain => ? RMQLOG_DOMAIN_DB }),
317+ ok ?= case IsVirgin of
318+ true ->
319+ register_projections ();
320+ false ->
321+ ok
322+ end ,
323+ % % Delete transient queues on init.
324+ % % Note that we also do this in the
325+ % % `rabbit_amqqueue:on_node_down/1' callback. We must try this
326+ % % deletion during init because the cluster may have been in a
327+ % % minority when this node went down. We wait for a majority
328+ % % while registering projections above though so this deletion
329+ % % is likely to succeed.
330+ rabbit_amqqueue :delete_transient_queues_on_node (node ())
331+ end
333332 end .
334333
335334% % @private
@@ -1063,6 +1062,9 @@ info() ->
10631062handle_async_ret (RaEvent ) ->
10641063 khepri :handle_async_ret (? STORE_ID , RaEvent ).
10651064
1065+ fence (Timeout ) ->
1066+ khepri :fence (? STORE_ID , Timeout ).
1067+
10661068% % -------------------------------------------------------------------
10671069% % collect_payloads().
10681070% % -------------------------------------------------------------------
@@ -1105,6 +1107,27 @@ collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) ->
11051107 Acc
11061108 end , Acc0 , Props ).
11071109
1110+ -spec unregister_all_projections () -> Ret when
1111+ Ret :: ok | timeout_error ().
1112+
1113+ unregister_all_projections () ->
1114+ % % Note that we don't use `all' since `khepri_mnesia_migration' also
1115+ % % creates a projection table which we don't want to unregister. Instead
1116+ % % we list all of the currently used projection names:
1117+ Names = [
1118+ rabbit_khepri_exchange ,
1119+ rabbit_khepri_queue ,
1120+ rabbit_khepri_vhost ,
1121+ rabbit_khepri_users ,
1122+ rabbit_khepri_global_rtparams ,
1123+ rabbit_khepri_per_vhost_rtparams ,
1124+ rabbit_khepri_user_permissions ,
1125+ rabbit_khepri_bindings ,
1126+ rabbit_khepri_index_route ,
1127+ rabbit_khepri_topic_trie
1128+ ],
1129+ khepri :unregister_projections (? STORE_ID , Names ).
1130+
11081131register_projections () ->
11091132 RegFuns = [fun register_rabbit_exchange_projection /0 ,
11101133 fun register_rabbit_queue_projection /0 ,
@@ -1116,20 +1139,23 @@ register_projections() ->
11161139 fun register_rabbit_bindings_projection /0 ,
11171140 fun register_rabbit_index_route_projection /0 ,
11181141 fun register_rabbit_topic_graph_projection /0 ],
1119- [case RegisterFun () of
1120- ok ->
1121- ok ;
1122- % % Before Khepri v0.13.0, `khepri:register_projection/1,2,3` would
1123- % % return `{error, exists}` for projections which already exist.
1124- {error , exists } ->
1125- ok ;
1126- % % In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
1127- {error , {khepri , projection_already_exists , _Info }} ->
1128- ok ;
1129- {error , Error } ->
1130- throw (Error )
1131- end || RegisterFun <- RegFuns ],
1132- ok .
1142+ rabbit_misc :for_each_while_ok (
1143+ fun (RegisterFun ) ->
1144+ case RegisterFun () of
1145+ ok ->
1146+ ok ;
1147+ % % Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
1148+ % % would return `{error, exists}` for projections which
1149+ % % already exist.
1150+ {error , exists } ->
1151+ ok ;
1152+ % % In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
1153+ {error , {khepri , projection_already_exists , _Info }} ->
1154+ ok ;
1155+ {error , _ } = Error ->
1156+ Error
1157+ end
1158+ end , RegFuns ).
11331159
11341160register_rabbit_exchange_projection () ->
11351161 Name = rabbit_khepri_exchange ,
@@ -1188,7 +1214,7 @@ register_rabbit_user_permissions_projection() ->
11881214register_simple_projection (Name , PathPattern , KeyPos ) ->
11891215 Options = #{keypos => KeyPos },
11901216 Projection = khepri_projection :new (Name , copy , Options ),
1191- khepri :register_projection (? RA_CLUSTER_NAME , PathPattern , Projection ).
1217+ khepri :register_projection (? STORE_ID , PathPattern , Projection ).
11921218
11931219register_rabbit_bindings_projection () ->
11941220 MapFun = fun (_Path , Binding ) ->
@@ -1204,7 +1230,7 @@ register_rabbit_bindings_projection() ->
12041230 _Kind = ? KHEPRI_WILDCARD_STAR ,
12051231 _DstName = ? KHEPRI_WILDCARD_STAR ,
12061232 _RoutingKey = ? KHEPRI_WILDCARD_STAR ),
1207- khepri :register_projection (? RA_CLUSTER_NAME , PathPattern , Projection ).
1233+ khepri :register_projection (? STORE_ID , PathPattern , Projection ).
12081234
12091235register_rabbit_index_route_projection () ->
12101236 MapFun = fun (Path , _ ) ->
@@ -1236,7 +1262,7 @@ register_rabbit_index_route_projection() ->
12361262 _Kind = ? KHEPRI_WILDCARD_STAR ,
12371263 _DstName = ? KHEPRI_WILDCARD_STAR ,
12381264 _RoutingKey = ? KHEPRI_WILDCARD_STAR ),
1239- khepri :register_projection (? RA_CLUSTER_NAME , PathPattern , Projection ).
1265+ khepri :register_projection (? STORE_ID , PathPattern , Projection ).
12401266
12411267% % Routing information is stored in the Khepri store as a `set'.
12421268% % In order to turn these bindings into records in an ETS `bag', we use a
@@ -1337,7 +1363,7 @@ register_rabbit_topic_graph_projection() ->
13371363 _Kind = ? KHEPRI_WILDCARD_STAR ,
13381364 _DstName = ? KHEPRI_WILDCARD_STAR ,
13391365 _RoutingKey = ? KHEPRI_WILDCARD_STAR ),
1340- khepri :register_projection (? RA_CLUSTER_NAME , PathPattern , Projection ).
1366+ khepri :register_projection (? STORE_ID , PathPattern , Projection ).
13411367
13421368- spec follow_down_update (Table , Exchange , Words , UpdateFn ) -> Ret when
13431369 Table :: ets :tid (),
@@ -1515,9 +1541,11 @@ get_feature_state(Node) ->
15151541% % @private
15161542
15171543khepri_db_migration_enable (#{feature_name := FeatureName }) ->
1518- case sync_cluster_membership_from_mnesia (FeatureName ) of
1519- ok -> migrate_mnesia_tables (FeatureName );
1520- Error -> Error
1544+ maybe
1545+ ok ?= sync_cluster_membership_from_mnesia (FeatureName ),
1546+ ok ?= unregister_all_projections (),
1547+ ok ?= register_projections (),
1548+ migrate_mnesia_tables (FeatureName )
15211549 end .
15221550
15231551% % @private
0 commit comments