Skip to content

Commit dca0723

Browse files
committed
khepri_tx:api_version()
1 parent 1253b9c commit dca0723

File tree

3 files changed

+39
-16
lines changed

3 files changed

+39
-16
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,13 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411
rabbit_khepri:transaction(
412412
fun () ->
413413
Path = khepri_queue_path(QueueName),
414+
TxApiVersion = rabbit_khepri:tx_api_version(),
414415
case khepri_tx_adv:delete(Path) of
415-
{ok, #{Path := #{data := _}}} ->
416+
{ok, #{Path := #{data := _}}} when TxApiVersion >= 1 ->
417+
%% we want to execute some things, as decided by rabbit_exchange,
418+
%% after the transaction.
419+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
420+
{ok, #{data := _}} when TxApiVersion =:= 0 ->
416421
%% we want to execute some things, as decided by rabbit_exchange,
417422
%% after the transaction.
418423
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
@@ -1139,20 +1144,26 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11391144
do_delete_transient_queues_in_khepri_tx([], Acc) ->
11401145
{ok, Acc};
11411146
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
1142-
%% Also see `delete_in_khepri/2'.
1143-
VersionedPath = khepri_path:combine_with_conditions(
1144-
Path, [#if_payload_version{version = Vsn}]),
1145-
case khepri_tx_adv:delete(VersionedPath) of
1146-
{ok, #{Path := #{data := _}}} ->
1147-
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1148-
QName, false),
1149-
Acc1 = [{QName, Deletions} | Acc],
1150-
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1151-
{ok, _} ->
1152-
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
1153-
{error, _} = Error ->
1154-
Error
1155-
end.
1147+
%% Also see `delete_in_khepri/2'.
1148+
VersionedPath = khepri_path:combine_with_conditions(
1149+
Path, [#if_payload_version{version = Vsn}]),
1150+
TxApiVersion = rabbit_khepri:tx_api_version(),
1151+
case khepri_tx_adv:delete(VersionedPath) of
1152+
{ok, #{Path := #{data := _}}} when TxApiVersion >= 1 ->
1153+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1154+
QName, false),
1155+
Acc1 = [{QName, Deletions} | Acc],
1156+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1157+
{ok, #{data := _}} when TxApiVersion =:= 0 ->
1158+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1159+
QName, false),
1160+
Acc1 = [{QName, Deletions} | Acc],
1161+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1162+
{ok, _} ->
1163+
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
1164+
{error, _} = Error ->
1165+
Error
1166+
end.
11561167

11571168
%% -------------------------------------------------------------------
11581169
%% foreach_transient().

deps/rabbit/src/rabbit_db_rtparams.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ set_in_khepri_tx(Key, Term) ->
113113
Path = khepri_rp_path(Key),
114114
Record = #runtime_parameters{key = Key,
115115
value = Term},
116+
TxApiVersion = rabbit_khepri:tx_api_version(),
116117
case khepri_tx_adv:put(Path, Record) of
117-
{ok, #{Path := #{data := Params}}} ->
118+
{ok, #{Path := #{data := Params}}} when TxApiVersion >= 1 ->
119+
{old, Params#runtime_parameters.value};
120+
{ok, #{data := Params}} when TxApiVersion =:= 0 ->
118121
{old, Params#runtime_parameters.value};
119122
{ok, _} ->
120123
new

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@
145145
transaction/1,
146146
transaction/2,
147147
transaction/3,
148+
tx_api_version/0,
148149

149150
clear_store/0,
150151

@@ -1057,6 +1058,14 @@ transaction(Fun, ReadWrite, Options) ->
10571058
{error, Reason} -> throw({error, Reason})
10581059
end.
10591060

1061+
tx_api_version() ->
1062+
try
1063+
khepri_tx:api_version()
1064+
catch
1065+
error:undef ->
1066+
0
1067+
end.
1068+
10601069
clear_store() ->
10611070
khepri:delete_many(?STORE_ID, "*", ?DEFAULT_COMMAND_OPTIONS).
10621071

0 commit comments

Comments
 (0)