Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-make-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
- parallel-ct-set-2
- parallel-ct-set-3
- parallel-ct-set-4
- parallel-ct-set-5
- ct-amqp_client
- ct-clustering_management
- eunit ct-dead_lettering
Expand Down
16 changes: 11 additions & 5 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@
-type max_message_size() :: undefined | non_neg_integer().
-type footer_opt() :: crc32 | adler32.

-type attach_args() :: #{name => binary(),
role => attach_role(),
-type attach_args() :: #{name := binary(),
role := attach_role(),
snd_settle_mode => snd_settle_mode(),
rcv_settle_mode => rcv_settle_mode(),
filter => filter(),
Expand Down Expand Up @@ -739,13 +739,19 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->

make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
make_source(#{role := {receiver, Source, _Pid},
filter := Filter}) ->
make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
ExpiryPolicy = case Source of
#{expiry_policy := Policy} when is_binary(Policy) ->
{symbol, Policy};
_ ->
undefined
end,
Dynamic = maps:get(dynamic, Source, false),
TranslatedFilter = translate_filters(Filter),
TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})),
#'v1_0.source'{address = make_address(Source),
durable = {uint, Durable},
expiry_policy = ExpiryPolicy,
dynamic = Dynamic,
filter = TranslatedFilter,
capabilities = make_capabilities(Source)}.
Expand Down
16 changes: 13 additions & 3 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,19 @@ define ct_master.erl
{ok, Pid2, _} = peer:start(StartOpts#{name => "rabbit_shard2"}),
{ok, Pid3, _} = peer:start(StartOpts#{name => "rabbit_shard3"}),
{ok, Pid4, _} = peer:start(StartOpts#{name => "rabbit_shard4"}),
{ok, Pid5, _} = peer:start(StartOpts#{name => "rabbit_shard5"}),
peer:call(Pid1, net_kernel, set_net_ticktime, [5]),
peer:call(Pid2, net_kernel, set_net_ticktime, [5]),
peer:call(Pid3, net_kernel, set_net_ticktime, [5]),
peer:call(Pid4, net_kernel, set_net_ticktime, [5]),
peer:call(Pid5, net_kernel, set_net_ticktime, [5]),
peer:call(Pid1, persistent_term, put, [rabbit_ct_tcp_port_base, 16000]),
peer:call(Pid2, persistent_term, put, [rabbit_ct_tcp_port_base, 20000]),
peer:call(Pid3, persistent_term, put, [rabbit_ct_tcp_port_base, 24000]),
peer:call(Pid4, persistent_term, put, [rabbit_ct_tcp_port_base, 28000]),
peer:call(Pid5, persistent_term, put, [rabbit_ct_tcp_port_base, 32000]),
[{[_], {ok, Results}}] = ct_master_fork:run("$1"),
peer:stop(Pid5),
peer:stop(Pid4),
peer:stop(Pid3),
peer:stop(Pid2),
Expand All @@ -258,7 +262,7 @@ endef

PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
Expand All @@ -276,13 +280,16 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue

PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop direct_reply_to_amqpl direct_reply_to_amqp

PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))
PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D))
PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D))
PARALLEL_CT_SET_5 = $(PARALLEL_CT_SET_5_A)

SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue rabbit_fifo_prop
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4)
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) $(PARALLEL_CT_SET_5)

ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),)
parallel-ct-sanity-check:
Expand All @@ -308,16 +315,19 @@ define tpl_parallel_ct_test_spec
{node, shard2, 'rabbit_shard2@localhost'}.
{node, shard3, 'rabbit_shard3@localhost'}.
{node, shard4, 'rabbit_shard4@localhost'}.
{node, shard5, 'rabbit_shard5@localhost'}.

{define, 'Set1', [$(call comma_list,$(addsuffix _SUITE,$1))]}.
{define, 'Set2', [$(call comma_list,$(addsuffix _SUITE,$2))]}.
{define, 'Set3', [$(call comma_list,$(addsuffix _SUITE,$3))]}.
{define, 'Set4', [$(call comma_list,$(addsuffix _SUITE,$4))]}.
{define, 'Set5', [$(call comma_list,$(addsuffix _SUITE,$5))]}.

{suites, shard1, "test/", 'Set1'}.
{suites, shard2, "test/", 'Set2'}.
{suites, shard3, "test/", 'Set3'}.
{suites, shard4, "test/", 'Set4'}.
{suites, shard5, "test/", 'Set5'}.
endef

define parallel_ct_set_target
Expand All @@ -330,7 +340,7 @@ parallel-ct-set-$(1): test-build
$$(call erlang,$$(call ct_master.erl,ct.set-$(1).spec),-sname parallel_ct_$(PROJECT)@localhost -hidden -kernel net_ticktime 5)
endef

$(foreach set,1 2 3 4,$(eval $(call parallel_ct_set_target,$(set))))
$(foreach set,1 2 3 4 5,$(eval $(call parallel_ct_set_target,$(set))))

# --------------------------------------------------------------------
# Compilation.
Expand Down
32 changes: 23 additions & 9 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,14 @@ convert_from(mc_amqp, Sections, Env) ->
%% drop it, what else can we do?
undefined
end,

ReplyTo = case unwrap_shortstr(ReplyTo0) of
<<"/queues/", Queue/binary>> ->
try cow_uri:urldecode(Queue)
catch error:_ -> undefined
end;
Other ->
Other
end,
BP = #'P_basic'{message_id = MsgId091,
delivery_mode = DelMode,
expiration = Expiration,
Expand All @@ -237,7 +244,7 @@ convert_from(mc_amqp, Sections, Env) ->
[] -> undefined;
AllHeaders -> AllHeaders
end,
reply_to = unwrap_shortstr(ReplyTo0),
reply_to = ReplyTo,
type = Type,
app_id = unwrap_shortstr(GroupId),
priority = Priority,
Expand Down Expand Up @@ -349,7 +356,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
delivery_mode = DelMode,
headers = Headers0,
user_id = UserId,
reply_to = ReplyTo,
reply_to = ReplyTo0,
type = Type,
priority = Priority,
app_id = AppId,
Expand Down Expand Up @@ -382,25 +389,32 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
ttl = wrap(uint, Ttl),
%% TODO: check Priority is a ubyte?
priority = wrap(ubyte, Priority)},
ReplyTo = case ReplyTo0 of
undefined ->
undefined;
_ ->
Queue = uri_string:quote(ReplyTo0),
{utf8, <<"/queues/", Queue/binary>>}
end,
CorrId = case mc_util:urn_string_to_uuid(CorrId0) of
{ok, CorrUUID} ->
{uuid, CorrUUID};
_ ->
wrap(utf8, CorrId0)
end,
MsgId = case mc_util:urn_string_to_uuid(MsgId0) of
{ok, MsgUUID} ->
{uuid, MsgUUID};
_ ->
wrap(utf8, MsgId0)
end,
{ok, MsgUUID} ->
{uuid, MsgUUID};
_ ->
wrap(utf8, MsgId0)
end,
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
undefined ->
#'v1_0.properties'{message_id = MsgId,
user_id = wrap(binary, UserId),
to = undefined,
% subject = wrap(utf8, RKey),
reply_to = wrap(utf8, ReplyTo),
reply_to = ReplyTo,
correlation_id = CorrId,
content_type = wrap(symbol, ContentType),
content_encoding = wrap(symbol, ContentEncoding),
Expand Down
60 changes: 0 additions & 60 deletions deps/rabbit/src/pid_recomposition.erl

This file was deleted.

1 change: 0 additions & 1 deletion deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,6 @@ pg_local_amqp_connection() ->
pg_local_scope(Prefix) ->
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).


-spec update_cluster_tags() -> 'ok'.

update_cluster_tags() ->
Expand Down
10 changes: 8 additions & 2 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ handle_http_req(<<"GET">>,
QName,
fun(Q) ->
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
{ok, {<<"200">>, RespPayload, PermCaches}}
case rabbit_volatile_queue:is(QNameBin) andalso
not rabbit_volatile_queue:exists(QName) of
true ->
{error, not_found};
false ->
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
{ok, {<<"200">>, RespPayload, PermCaches}}
end
end) of
{ok, Result} ->
Result;
Expand Down
Loading
Loading