diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index ba6460a9c064..7dda47984c5a 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -1344,7 +1344,7 @@ register_rabbit_exchange_projection() -> _VHost = ?KHEPRI_WILDCARD_STAR, _Name = ?KHEPRI_WILDCARD_STAR), KeyPos = #exchange.name, - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, true). register_rabbit_queue_projection() -> Name = rabbit_khepri_queue, @@ -1352,28 +1352,28 @@ register_rabbit_queue_projection() -> _VHost = ?KHEPRI_WILDCARD_STAR, _Name = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #amqqueue.name - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, true). register_rabbit_vhost_projection() -> Name = rabbit_khepri_vhost, PathPattern = rabbit_db_vhost:khepri_vhost_path( _VHost = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #vhost.virtual_host - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, false). register_rabbit_users_projection() -> Name = rabbit_khepri_user, PathPattern = rabbit_db_user:khepri_user_path( _UserName = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #internal_user.username - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, false). register_rabbit_global_runtime_parameters_projection() -> Name = rabbit_khepri_global_rtparam, PathPattern = rabbit_db_rtparams:khepri_global_rp_path( _Key = ?KHEPRI_WILDCARD_STAR_STAR), KeyPos = #runtime_parameters.key, - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, false). register_rabbit_per_vhost_runtime_parameters_projection() -> Name = rabbit_khepri_per_vhost_rtparam, @@ -1382,7 +1382,7 @@ register_rabbit_per_vhost_runtime_parameters_projection() -> _Component = ?KHEPRI_WILDCARD_STAR_STAR, _Name = ?KHEPRI_WILDCARD_STAR_STAR), KeyPos = #runtime_parameters.key, - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, false). register_rabbit_user_permissions_projection() -> Name = rabbit_khepri_user_permission, @@ -1390,10 +1390,11 @@ register_rabbit_user_permissions_projection() -> _UserName = ?KHEPRI_WILDCARD_STAR, _VHost = ?KHEPRI_WILDCARD_STAR), KeyPos = #user_permission.user_vhost, - register_simple_projection(Name, PathPattern, KeyPos). + register_simple_projection(Name, PathPattern, KeyPos, false). -register_simple_projection(Name, PathPattern, KeyPos) -> - Options = #{keypos => KeyPos}, +register_simple_projection(Name, PathPattern, KeyPos, ReadConcurrency) -> + Options = #{keypos => KeyPos, + read_concurrency => ReadConcurrency}, Projection = khepri_projection:new(Name, copy, Options), khepri:register_projection(?STORE_ID, PathPattern, Projection). @@ -1429,7 +1430,9 @@ register_rabbit_index_route_projection() -> destination = Destination} end, ProjectionFun = projection_fun_for_sets(MapFun), - Options = #{type => bag, keypos => #index_route.source_key}, + Options = #{type => bag, + keypos => #index_route.source_key, + read_concurrency => true}, Projection = khepri_projection:new( rabbit_khepri_index_route, ProjectionFun, Options), DirectOrFanout = #if_data_matches{ @@ -1497,7 +1500,8 @@ register_rabbit_topic_graph_projection() -> end, Options = #{keypos => #topic_trie_edge.trie_edge, standalone_fun_options => - #{should_process_function => ShouldProcessFun}}, + #{should_process_function => ShouldProcessFun}, + read_concurrency => true}, ProjectionFun = fun(Table, Path, OldProps, NewProps) -> {