From 98b9af078dbd90b41d6f55b51b52afb999612031 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 16 Dec 2024 09:55:09 +0100 Subject: [PATCH] Disallow transient entities in RabbitMQ AMQP 1.0 Erlang client Transient (i.e. `durable=false`) exchanges and queues are deprecated. Khepri will store all entities durably. (Even exclusive queues will be stored durably. Exclusive queues are still deleted when the declaring connection is closed.) Similar to how the RabbitMQ AMQP 1.0 Java client already disallows the creation of transient exchanges and queues, this commit will prohibit the declaration of transient exchanges and queues in the RabbitMQ AMQP 1.0 Erlang client starting with RabbitMQ 4.1. --- .../src/rabbitmq_amqp_client.erl | 10 ++-------- .../test/management_SUITE.erl | 17 ++++++----------- ...bbit_exchange_type_consistent_hash_SUITE.erl | 1 - 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index ef385b6162e3..27c3159b22af 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -50,13 +50,11 @@ replicas => [binary()], leader => binary()}. --type queue_properties() :: #{durable => boolean(), - exclusive => boolean(), +-type queue_properties() :: #{exclusive => boolean(), auto_delete => boolean(), arguments => arguments()}. -type exchange_properties() :: #{type => binary(), - durable => boolean(), auto_delete => boolean(), internal => boolean(), arguments => arguments()}. @@ -161,9 +159,7 @@ get_queue(LinkPair, QueueName) -> {ok, queue_info()} | {error, term()}. declare_queue(LinkPair, QueueName, QueueProperties) -> Body0 = maps:fold( - fun(durable, V, L) when is_boolean(V) -> - [{{utf8, <<"durable">>}, {boolean, V}} | L]; - (exclusive, V, L) when is_boolean(V) -> + fun(exclusive, V, L) when is_boolean(V) -> [{{utf8, <<"exclusive">>}, {boolean, V}} | L]; (auto_delete, V, L) when is_boolean(V) -> [{{utf8, <<"auto_delete">>}, {boolean, V}} | L]; @@ -341,8 +337,6 @@ declare_exchange(LinkPair, ExchangeName, ExchangeProperties) -> Body0 = maps:fold( fun(type, V, L) when is_binary(V) -> [{{utf8, <<"type">>}, {utf8, V}} | L]; - (durable, V, L) when is_boolean(V) -> - [{{utf8, <<"durable">>}, {boolean, V}} | L]; (auto_delete, V, L) when is_boolean(V) -> [{{utf8, <<"auto_delete">>}, {boolean, V}} | L]; (internal, V, L) when is_boolean(V) -> diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index 3431ddecd8aa..55fb0dac0573 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -127,8 +127,7 @@ all_management_operations(Config) -> Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), QName = <<"my 🐇"/utf8>>, - QProps = #{durable => true, - exclusive => false, + QProps = #{exclusive => false, auto_delete => false, arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}, {ok, QInfo} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), @@ -193,7 +192,6 @@ all_management_operations(Config) -> XName = <<"my fanout exchange 🥳"/utf8>>, XProps = #{type => <<"fanout">>, - durable => false, auto_delete => true, internal => false, arguments => #{<<"x-📥"/utf8>> => {utf8, <<"📮"/utf8>>}}}, @@ -202,7 +200,7 @@ all_management_operations(Config) -> {ok, Exchange} = rpc(Config, rabbit_exchange, lookup, [rabbit_misc:r(<<"/">>, exchange, XName)]), ?assertMatch(#exchange{type = fanout, - durable = false, + durable = true, auto_delete = true, internal = false, arguments = [{<<"x-📥"/utf8>>, longstr, <<"📮"/utf8>>}]}, @@ -281,13 +279,12 @@ queue_defaults(Config) -> queue_properties(Config) -> Init = {_, LinkPair} = init(Config), QName = atom_to_binary(?FUNCTION_NAME), - {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{durable => false, - exclusive => true, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{exclusive => true, auto_delete => true}), [Q] = rpc(Config, rabbit_amqqueue, list, []), - ?assertNot(rpc(Config, amqqueue, is_durable, [Q])), ?assert(rpc(Config, amqqueue, is_exclusive, [Q])), ?assert(rpc(Config, amqqueue, is_auto_delete, [Q])), + ?assert(rpc(Config, amqqueue, is_durable, [Q])), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = cleanup(Init). @@ -310,8 +307,7 @@ exchange_defaults(Config) -> queue_binding_args(Config) -> Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), QName = <<"my queue ~!@#$%^&*()_+🙈`-=[]\;',./"/utf8>>, - Q = #{durable => false, - exclusive => true, + Q = #{exclusive => true, auto_delete => false, arguments => #{<<"x-queue-type">> => {utf8, <<"classic">>}}}, {ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, Q), @@ -589,8 +585,7 @@ declare_exchange_inequivalent_fields(Config) -> classic_queue_stopped(Config) -> Init2 = {_, LinkPair2} = init(Config, 2), QName = <<"👌"/utf8>>, - {ok, #{durable := true, - type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(LinkPair2, QName, #{}), + {ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(LinkPair2, QName, #{}), ok = cleanup(Init2), ok = rabbit_ct_broker_helpers:stop_node(Config, 2), %% Classic queue is now stopped. diff --git a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl index 85a76358df5e..15ee469cca67 100644 --- a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl +++ b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl @@ -204,7 +204,6 @@ amqp_dead_letter(Config) -> ok = rabbitmq_amqp_client:declare_exchange( LinkPair, XName, #{type => <<"x-consistent-hash">>, - durable => true, auto_delete => true, arguments => #{<<"hash-property">> => {utf8, <<"correlation_id">>}}}), {ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(