|
60 | 60 | %% executed. If the migration runs concurrently, whether it started before or |
61 | 61 | %% during the execution of the Mnesia-specific anonymous function, {@link |
62 | 62 | %% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia |
63 | | -%% and will retry the Mnesia functino or run the Khepri function accordingly. |
| 63 | +%% and will retry the Mnesia function or run the Khepri function accordingly. |
64 | 64 | %% The Mnesia function must be idempotent because it can be executed multiple |
65 | 65 | %% times. |
66 | 66 | %% |
@@ -1415,34 +1415,28 @@ register_rabbit_bindings_projection() -> |
1415 | 1415 | khepri:register_projection(?STORE_ID, PathPattern, Projection). |
1416 | 1416 |
|
1417 | 1417 | register_rabbit_index_route_projection() -> |
1418 | | - MapFun = fun(Path, _) -> |
1419 | | - { |
1420 | | - VHost, |
1421 | | - ExchangeName, |
1422 | | - Kind, |
1423 | | - DstName, |
1424 | | - RoutingKey |
1425 | | - } = rabbit_db_binding:khepri_route_path_to_args(Path), |
1426 | | - Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), |
1427 | | - Destination = rabbit_misc:r(VHost, Kind, DstName), |
1428 | | - SourceKey = {Exchange, RoutingKey}, |
1429 | | - #index_route{source_key = SourceKey, |
1430 | | - destination = Destination} |
| 1418 | + MapFun = fun(_Path, #binding{source = Source, |
| 1419 | + key = Key, |
| 1420 | + destination = Destination, |
| 1421 | + args = Args}) -> |
| 1422 | + #index_route{source_key = {Source, Key}, |
| 1423 | + destination = Destination, |
| 1424 | + args = Args} |
1431 | 1425 | end, |
1432 | 1426 | ProjectionFun = projection_fun_for_sets(MapFun), |
1433 | 1427 | Options = #{type => bag, |
1434 | 1428 | keypos => #index_route.source_key, |
1435 | 1429 | read_concurrency => true}, |
1436 | 1430 | Projection = khepri_projection:new( |
1437 | 1431 | rabbit_khepri_index_route, ProjectionFun, Options), |
1438 | | - DirectOrFanout = #if_data_matches{ |
1439 | | - pattern = #exchange{type = '$1', _ = '_'}, |
1440 | | - conditions = [{'andalso', |
1441 | | - {'=/=', '$1', headers}, |
1442 | | - {'=/=', '$1', topic}}]}, |
| 1432 | + IgnoreHeadersAndTopic = #if_data_matches{ |
| 1433 | + pattern = #exchange{type = '$1', _ = '_'}, |
| 1434 | + conditions = [{'andalso', |
| 1435 | + {'=/=', '$1', headers}, |
| 1436 | + {'=/=', '$1', topic}}]}, |
1443 | 1437 | PathPattern = rabbit_db_binding:khepri_route_path( |
1444 | 1438 | _VHost = ?KHEPRI_WILDCARD_STAR, |
1445 | | - _Exchange = DirectOrFanout, |
| 1439 | + _Exchange = IgnoreHeadersAndTopic, |
1446 | 1440 | _Kind = ?KHEPRI_WILDCARD_STAR, |
1447 | 1441 | _DstName = ?KHEPRI_WILDCARD_STAR, |
1448 | 1442 | _RoutingKey = ?KHEPRI_WILDCARD_STAR), |
|
0 commit comments