Skip to content

Commit c85ba2a

Browse files
Merge pull request #14530 from rabbitmq/khepri-read-concurrency
Optimise Khepri for concurrent reads
2 parents 5b9f98a + d29cac3 commit c85ba2a

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,36 +1344,36 @@ register_rabbit_exchange_projection() ->
13441344
_VHost = ?KHEPRI_WILDCARD_STAR,
13451345
_Name = ?KHEPRI_WILDCARD_STAR),
13461346
KeyPos = #exchange.name,
1347-
register_simple_projection(Name, PathPattern, KeyPos).
1347+
register_simple_projection(Name, PathPattern, KeyPos, true).
13481348

13491349
register_rabbit_queue_projection() ->
13501350
Name = rabbit_khepri_queue,
13511351
PathPattern = rabbit_db_queue:khepri_queue_path(
13521352
_VHost = ?KHEPRI_WILDCARD_STAR,
13531353
_Name = ?KHEPRI_WILDCARD_STAR),
13541354
KeyPos = 2, %% #amqqueue.name
1355-
register_simple_projection(Name, PathPattern, KeyPos).
1355+
register_simple_projection(Name, PathPattern, KeyPos, true).
13561356

13571357
register_rabbit_vhost_projection() ->
13581358
Name = rabbit_khepri_vhost,
13591359
PathPattern = rabbit_db_vhost:khepri_vhost_path(
13601360
_VHost = ?KHEPRI_WILDCARD_STAR),
13611361
KeyPos = 2, %% #vhost.virtual_host
1362-
register_simple_projection(Name, PathPattern, KeyPos).
1362+
register_simple_projection(Name, PathPattern, KeyPos, false).
13631363

13641364
register_rabbit_users_projection() ->
13651365
Name = rabbit_khepri_user,
13661366
PathPattern = rabbit_db_user:khepri_user_path(
13671367
_UserName = ?KHEPRI_WILDCARD_STAR),
13681368
KeyPos = 2, %% #internal_user.username
1369-
register_simple_projection(Name, PathPattern, KeyPos).
1369+
register_simple_projection(Name, PathPattern, KeyPos, false).
13701370

13711371
register_rabbit_global_runtime_parameters_projection() ->
13721372
Name = rabbit_khepri_global_rtparam,
13731373
PathPattern = rabbit_db_rtparams:khepri_global_rp_path(
13741374
_Key = ?KHEPRI_WILDCARD_STAR_STAR),
13751375
KeyPos = #runtime_parameters.key,
1376-
register_simple_projection(Name, PathPattern, KeyPos).
1376+
register_simple_projection(Name, PathPattern, KeyPos, false).
13771377

13781378
register_rabbit_per_vhost_runtime_parameters_projection() ->
13791379
Name = rabbit_khepri_per_vhost_rtparam,
@@ -1382,18 +1382,19 @@ register_rabbit_per_vhost_runtime_parameters_projection() ->
13821382
_Component = ?KHEPRI_WILDCARD_STAR_STAR,
13831383
_Name = ?KHEPRI_WILDCARD_STAR_STAR),
13841384
KeyPos = #runtime_parameters.key,
1385-
register_simple_projection(Name, PathPattern, KeyPos).
1385+
register_simple_projection(Name, PathPattern, KeyPos, false).
13861386

13871387
register_rabbit_user_permissions_projection() ->
13881388
Name = rabbit_khepri_user_permission,
13891389
PathPattern = rabbit_db_user:khepri_user_permission_path(
13901390
_UserName = ?KHEPRI_WILDCARD_STAR,
13911391
_VHost = ?KHEPRI_WILDCARD_STAR),
13921392
KeyPos = #user_permission.user_vhost,
1393-
register_simple_projection(Name, PathPattern, KeyPos).
1393+
register_simple_projection(Name, PathPattern, KeyPos, false).
13941394

1395-
register_simple_projection(Name, PathPattern, KeyPos) ->
1396-
Options = #{keypos => KeyPos},
1395+
register_simple_projection(Name, PathPattern, KeyPos, ReadConcurrency) ->
1396+
Options = #{keypos => KeyPos,
1397+
read_concurrency => ReadConcurrency},
13971398
Projection = khepri_projection:new(Name, copy, Options),
13981399
khepri:register_projection(?STORE_ID, PathPattern, Projection).
13991400

@@ -1429,7 +1430,9 @@ register_rabbit_index_route_projection() ->
14291430
destination = Destination}
14301431
end,
14311432
ProjectionFun = projection_fun_for_sets(MapFun),
1432-
Options = #{type => bag, keypos => #index_route.source_key},
1433+
Options = #{type => bag,
1434+
keypos => #index_route.source_key,
1435+
read_concurrency => true},
14331436
Projection = khepri_projection:new(
14341437
rabbit_khepri_index_route, ProjectionFun, Options),
14351438
DirectOrFanout = #if_data_matches{
@@ -1497,7 +1500,8 @@ register_rabbit_topic_graph_projection() ->
14971500
end,
14981501
Options = #{keypos => #topic_trie_edge.trie_edge,
14991502
standalone_fun_options =>
1500-
#{should_process_function => ShouldProcessFun}},
1503+
#{should_process_function => ShouldProcessFun},
1504+
read_concurrency => true},
15011505
ProjectionFun =
15021506
fun(Table, Path, OldProps, NewProps) ->
15031507
{

0 commit comments

Comments
 (0)