@@ -1344,36 +1344,36 @@ register_rabbit_exchange_projection() ->
13441344 _VHost = ? KHEPRI_WILDCARD_STAR ,
13451345 _Name = ? KHEPRI_WILDCARD_STAR ),
13461346 KeyPos = # exchange .name ,
1347- register_simple_projection (Name , PathPattern , KeyPos ).
1347+ register_simple_projection (Name , PathPattern , KeyPos , true ).
13481348
13491349register_rabbit_queue_projection () ->
13501350 Name = rabbit_khepri_queue ,
13511351 PathPattern = rabbit_db_queue :khepri_queue_path (
13521352 _VHost = ? KHEPRI_WILDCARD_STAR ,
13531353 _Name = ? KHEPRI_WILDCARD_STAR ),
13541354 KeyPos = 2 , % % #amqqueue.name
1355- register_simple_projection (Name , PathPattern , KeyPos ).
1355+ register_simple_projection (Name , PathPattern , KeyPos , true ).
13561356
13571357register_rabbit_vhost_projection () ->
13581358 Name = rabbit_khepri_vhost ,
13591359 PathPattern = rabbit_db_vhost :khepri_vhost_path (
13601360 _VHost = ? KHEPRI_WILDCARD_STAR ),
13611361 KeyPos = 2 , % % #vhost.virtual_host
1362- register_simple_projection (Name , PathPattern , KeyPos ).
1362+ register_simple_projection (Name , PathPattern , KeyPos , false ).
13631363
13641364register_rabbit_users_projection () ->
13651365 Name = rabbit_khepri_user ,
13661366 PathPattern = rabbit_db_user :khepri_user_path (
13671367 _UserName = ? KHEPRI_WILDCARD_STAR ),
13681368 KeyPos = 2 , % % #internal_user.username
1369- register_simple_projection (Name , PathPattern , KeyPos ).
1369+ register_simple_projection (Name , PathPattern , KeyPos , false ).
13701370
13711371register_rabbit_global_runtime_parameters_projection () ->
13721372 Name = rabbit_khepri_global_rtparam ,
13731373 PathPattern = rabbit_db_rtparams :khepri_global_rp_path (
13741374 _Key = ? KHEPRI_WILDCARD_STAR_STAR ),
13751375 KeyPos = # runtime_parameters .key ,
1376- register_simple_projection (Name , PathPattern , KeyPos ).
1376+ register_simple_projection (Name , PathPattern , KeyPos , false ).
13771377
13781378register_rabbit_per_vhost_runtime_parameters_projection () ->
13791379 Name = rabbit_khepri_per_vhost_rtparam ,
@@ -1382,18 +1382,19 @@ register_rabbit_per_vhost_runtime_parameters_projection() ->
13821382 _Component = ? KHEPRI_WILDCARD_STAR_STAR ,
13831383 _Name = ? KHEPRI_WILDCARD_STAR_STAR ),
13841384 KeyPos = # runtime_parameters .key ,
1385- register_simple_projection (Name , PathPattern , KeyPos ).
1385+ register_simple_projection (Name , PathPattern , KeyPos , false ).
13861386
13871387register_rabbit_user_permissions_projection () ->
13881388 Name = rabbit_khepri_user_permission ,
13891389 PathPattern = rabbit_db_user :khepri_user_permission_path (
13901390 _UserName = ? KHEPRI_WILDCARD_STAR ,
13911391 _VHost = ? KHEPRI_WILDCARD_STAR ),
13921392 KeyPos = # user_permission .user_vhost ,
1393- register_simple_projection (Name , PathPattern , KeyPos ).
1393+ register_simple_projection (Name , PathPattern , KeyPos , false ).
13941394
1395- register_simple_projection (Name , PathPattern , KeyPos ) ->
1396- Options = #{keypos => KeyPos },
1395+ register_simple_projection (Name , PathPattern , KeyPos , ReadConcurrency ) ->
1396+ Options = #{keypos => KeyPos ,
1397+ read_concurrency => ReadConcurrency },
13971398 Projection = khepri_projection :new (Name , copy , Options ),
13981399 khepri :register_projection (? STORE_ID , PathPattern , Projection ).
13991400
@@ -1429,7 +1430,9 @@ register_rabbit_index_route_projection() ->
14291430 destination = Destination }
14301431 end ,
14311432 ProjectionFun = projection_fun_for_sets (MapFun ),
1432- Options = #{type => bag , keypos => # index_route .source_key },
1433+ Options = #{type => bag ,
1434+ keypos => # index_route .source_key ,
1435+ read_concurrency => true },
14331436 Projection = khepri_projection :new (
14341437 rabbit_khepri_index_route , ProjectionFun , Options ),
14351438 DirectOrFanout = # if_data_matches {
@@ -1497,7 +1500,8 @@ register_rabbit_topic_graph_projection() ->
14971500 end ,
14981501 Options = #{keypos => # topic_trie_edge .trie_edge ,
14991502 standalone_fun_options =>
1500- #{should_process_function => ShouldProcessFun }},
1503+ #{should_process_function => ShouldProcessFun },
1504+ read_concurrency => true },
15011505 ProjectionFun =
15021506 fun (Table , Path , OldProps , NewProps ) ->
15031507 {
0 commit comments