Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,6 @@
{'rabbitmq_4.2.0',
#{desc => "Allows rolling upgrades to 4.2.x",
stability => stable,
depends_on => ['rabbitmq_4.1.0']
depends_on => ['rabbitmq_4.1.0'],
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
}}).
16 changes: 5 additions & 11 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key).
-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source).

%% -------------------------------------------------------------------
%% exists().
Expand Down Expand Up @@ -707,21 +708,14 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
end.

match_routing_key_in_khepri(Src, ['_']) ->
try
MatchHead = #index_route{source_key = {Src, '_'},
destination = '$1',
_ = '_'},
ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}])
catch
error:badarg ->
[]
end;
ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION,
Src, #route_by_source.destination, []);
match_routing_key_in_khepri(Src, RoutingKeys) ->
lists:foldl(
fun(RK, Acc) ->
try
Dst = ets:lookup_element(
?KHEPRI_INDEX_ROUTE_PROJECTION,
?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION,
{Src, RK},
#index_route.destination),
Dst ++ Acc
Expand Down
157 changes: 114 additions & 43 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
%% executed. If the migration runs concurrently, whether it started before or
%% during the execution of the Mnesia-specific anonymous function, {@link
%% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia
%% and will retry the Mnesia functino or run the Khepri function accordingly.
%% and will retry the Mnesia function or run the Khepri function accordingly.
%% The Mnesia function must be idempotent because it can be executed multiple
%% times.
%%
Expand Down Expand Up @@ -170,6 +170,7 @@
%% equivalent cluster
-export([khepri_db_migration_enable/1,
khepri_db_migration_post_enable/1,
enable_feature_flag/1,
is_enabled/0, is_enabled/1,
get_feature_state/0, get_feature_state/1,
handle_fallback/1]).
Expand Down Expand Up @@ -332,8 +333,10 @@ init(IsVirgin) ->
"up to the Raft cluster leader", [],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= case IsVirgin of
true -> register_projections();
false -> ok
true ->
register_projections();
false ->
register_4_2_0_projections()
end,
%% Delete transient queues on init.
%% Note that we also do this in the
Expand Down Expand Up @@ -1318,25 +1321,34 @@ register_projections() ->
fun register_rabbit_per_vhost_runtime_parameters_projection/0,
fun register_rabbit_user_permissions_projection/0,
fun register_rabbit_bindings_projection/0,
fun register_rabbit_index_route_projection/0,
fun register_rabbit_route_by_source_key_projection/0,
fun register_rabbit_route_by_source_projection/0,
fun register_rabbit_topic_graph_projection/0],
rabbit_misc:for_each_while_ok(
fun(RegisterFun) ->
case RegisterFun() of
ok ->
ok;
%% Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
%% would return `{error, exists}` for projections which
%% already exist.
{error, exists} ->
ok;
%% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
{error, {khepri, projection_already_exists, _Info}} ->
ok;
{error, _} = Error ->
Error
end
end, RegFuns).
rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns).

%% This function registers projections introduced in 4.2.0. In a mixed version
%% cluster, these new projections will appear but won't be used on older nodes.
%% This function can be deleted after feature flag rabbitmq_4.2.0 becomes required.
register_4_2_0_projections() ->
RegFuns = [fun register_rabbit_route_by_source_key_projection/0,
fun register_rabbit_route_by_source_projection/0],
rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns).

register_projection(RegisterFun) ->
case RegisterFun() of
ok ->
ok;
%% Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
%% would return `{error, exists}` for projections which
%% already exist.
{error, exists} ->
ok;
%% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
{error, {khepri, projection_already_exists, _Info}} ->
ok;
{error, _} = Error ->
Error
end.

register_rabbit_exchange_projection() ->
Name = rabbit_khepri_exchange,
Expand Down Expand Up @@ -1414,35 +1426,78 @@ register_rabbit_bindings_projection() ->
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_index_route_projection() ->
MapFun = fun(Path, _) ->
{
VHost,
ExchangeName,
Kind,
DstName,
RoutingKey
} = rabbit_db_binding:khepri_route_path_to_args(Path),
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
Destination = rabbit_misc:r(VHost, Kind, DstName),
SourceKey = {Exchange, RoutingKey},
#index_route{source_key = SourceKey,
destination = Destination}
register_rabbit_route_by_source_key_projection() ->
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#index_route{source_key = {Source, Key},
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #index_route.source_key,
read_concurrency => true},
Projection = khepri_projection:new(
rabbit_khepri_index_route, ProjectionFun, Options),
DirectOrFanout = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
Projection = khepri_projection:new(rabbit_khepri_route_by_source_key,
ProjectionFun,
Options),
Exchange = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic},
{'=/=', '$1', fanout},
{'=/=', '$1', 'x-jms-topic'},
{'=/=', '$1', 'x-random'}
}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = DirectOrFanout,
Exchange,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_route_by_source_projection() ->
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#route_by_source{source = Source,
key = Key,
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #route_by_source.source,
read_concurrency => true},
Projection = khepri_projection:new(rabbit_khepri_route_by_source,
ProjectionFun,
Options),
%% For some exchange types we know that they won't use this projection.
%% So we exclude such bindings for two reasons:
%% 1. Lower overall ETS memory usage
%% 2. "Avoid inserting an extensive amount of objects with the same key.
%% It will hurt insert and lookup performance as well as real time characteristics
%% of the runtime environment (hash bucket linear search do not yield)."
%% Example: same source direct exchange with 100k different binding keys.
%% In future, rather than exchange types exclusion as done here, a nicer approach
%% would be that each exchange requiring routing lookup by only source exchange
%% advertises this access pattern, e.g. as a boolean flag in the #exchange.options field.
Exchange = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic},
{'=/=', '$1', direct},
{'=/=', '$1', 'x-local-random'},
{'=/=', '$1', 'x-jms-topic'}
}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
Exchange,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
Expand Down Expand Up @@ -1761,6 +1816,22 @@ khepri_db_migration_post_enable(
_ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID),
ok.

enable_feature_flag(#{command := enable,
feature_name := 'rabbitmq_4.2.0' = FeatureName}) ->
%% We unregister this projection because it's superseded by
%% rabbit_khepri_route_by_source_key introduced in 4.2.0
ProjectionName = rabbit_khepri_index_route,
Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName])
catch _:Reason -> Reason
end,
?LOG_DEBUG(
"enabling feature flag ~s unregisters projection ~s: ~tp",
[FeatureName, ProjectionName, Result],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok;
enable_feature_flag(_) ->
ok.

-spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when
FeatureName :: rabbit_feature_flags:feature_name(),
Ret :: ok | {error, Reason},
Expand Down
Loading
Loading