@@ -778,6 +778,7 @@ declare_args() ->
778778 {<<" x-message-ttl" >>, fun check_message_ttl_arg /2 },
779779 {<<" x-dead-letter-exchange" >>, fun check_dlxname_arg /2 },
780780 {<<" x-dead-letter-routing-key" >>, fun check_dlxrk_arg /2 },
781+ {<<" x-dead-letter-strategy" >>, fun check_dlxstrategy_arg /2 },
781782 {<<" x-max-length" >>, fun check_non_neg_int_arg /2 },
782783 {<<" x-max-length-bytes" >>, fun check_non_neg_int_arg /2 },
783784 {<<" x-max-in-memory-length" >>, fun check_non_neg_int_arg /2 },
@@ -946,6 +947,22 @@ check_dlxrk_arg(Val, Args) when is_binary(Val) ->
946947check_dlxrk_arg (_Val , _Args ) ->
947948 {error , {unacceptable_type , " expected a string" }}.
948949
950+ - define (KNOWN_DLX_STRATEGIES , [<<" at-most-once" >>, <<" at-least-once" >>]).
951+ check_dlxstrategy_arg ({longstr , Val }, _Args ) ->
952+ case lists :member (Val , ? KNOWN_DLX_STRATEGIES ) of
953+ true -> ok ;
954+ false -> {error , invalid_dlx_strategy }
955+ end ;
956+ check_dlxstrategy_arg ({Type , _ }, _Args ) ->
957+ {error , {unacceptable_type , Type }};
958+ check_dlxstrategy_arg (Val , _Args ) when is_binary (Val ) ->
959+ case lists :member (Val , ? KNOWN_DLX_STRATEGIES ) of
960+ true -> ok ;
961+ false -> {error , invalid_dlx_strategy }
962+ end ;
963+ check_dlxstrategy_arg (_Val , _Args ) ->
964+ {error , invalid_dlx_strategy }.
965+
949966- define (KNOWN_OVERFLOW_MODES , [<<" drop-head" >>, <<" reject-publish" >>, <<" reject-publish-dlx" >>]).
950967check_overflow ({longstr , Val }, _Args ) ->
951968 case lists :member (Val , ? KNOWN_OVERFLOW_MODES ) of
@@ -1503,7 +1520,8 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
15031520 [lists :zip (ConsumerInfoKeys ,
15041521 [amqqueue :get_name (Q ), ChPid , CTag ,
15051522 AckRequired , Prefetch , Active , ActivityStatus , Args ]) ||
1506- {ChPid , CTag , AckRequired , Prefetch , Active , ActivityStatus , Args , _ } <- consumers (Q )].
1523+ {ChPid , CTag , AckRequired , Prefetch , Active , ActivityStatus , Args , _ }
1524+ <- consumers (Q )].
15071525
15081526- spec stat (amqqueue :amqqueue ()) ->
15091527 {'ok' , non_neg_integer (), non_neg_integer ()}.
@@ -1657,8 +1675,8 @@ credit(Q, CTag, Credit, Drain, QStates) ->
16571675 {'ok' , non_neg_integer (), qmsg (), rabbit_queue_type :state ()} |
16581676 {'empty' , rabbit_queue_type :state ()} |
16591677 {protocol_error , Type :: atom (), Reason :: string (), Args :: term ()}.
1660- basic_get (Q , NoAck , LimiterPid , CTag , QStates0 ) ->
1661- rabbit_queue_type :dequeue (Q , NoAck , LimiterPid , CTag , QStates0 ).
1678+ basic_get (Q , NoAck , LimiterPid , CTag , QStates ) ->
1679+ rabbit_queue_type :dequeue (Q , NoAck , LimiterPid , CTag , QStates ).
16621680
16631681
16641682- spec basic_consume (amqqueue :amqqueue (), boolean (), pid (), pid (), boolean (),
@@ -1670,7 +1688,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
16701688 {protocol_error , Type :: atom (), Reason :: string (), Args :: term ()}.
16711689basic_consume (Q , NoAck , ChPid , LimiterPid ,
16721690 LimiterActive , ConsumerPrefetchCount , ConsumerTag ,
1673- ExclusiveConsume , Args , OkMsg , ActingUser , Contexts ) ->
1691+ ExclusiveConsume , Args , OkMsg , ActingUser , QStates ) ->
16741692
16751693 QName = amqqueue :get_name (Q ),
16761694 % % first phase argument validation
@@ -1686,7 +1704,7 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
16861704 args => Args ,
16871705 ok_msg => OkMsg ,
16881706 acting_user => ActingUser },
1689- rabbit_queue_type :consume (Q , Spec , Contexts ).
1707+ rabbit_queue_type :consume (Q , Spec , QStates ).
16901708
16911709- spec basic_cancel (amqqueue :amqqueue (), rabbit_types :ctag (), any (),
16921710 rabbit_types :username (),
0 commit comments