Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,13 @@ translate_terminus_durability(configuration) -> 1;
translate_terminus_durability(unsettled_state) -> 2.

translate_filters(Filters)
when is_map(Filters) andalso
map_size(Filters) == 0 ->
when map_size(Filters) =:= 0 ->
undefined;
translate_filters(Filters)
when is_map(Filters) ->
translate_filters(Filters) ->
{map,
maps:fold(
fun
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
%% special case conversion
Key = sym(K),
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
Expand Down
5 changes: 4 additions & 1 deletion deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,10 @@ wrap_ap_value(V) when is_integer(V) ->
case V < 0 of
true -> {int, V};
false -> {uint, V}
end.
end;
wrap_ap_value(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V}.

%% LOCAL
header_value(durable, undefined) -> false;
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_common/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def all_srcs(name = "all_srcs"):
)
filegroup(
name = "public_hdrs",
srcs = ["include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
srcs = ["include/amqp10_filtex.hrl", "include/amqp10_framing.hrl", "include/amqp10_types.hrl"],
)
filegroup(
name = "private_hdrs",
Expand Down
15 changes: 15 additions & 0 deletions deps/amqp10_common/include/amqp10_filtex.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.


%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227

-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).

-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).
16 changes: 16 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,7 @@ rabbitmq_integration_suite(
name = "amqp_client_SUITE",
size = "large",
additional_beam = [
":test_amqp_utils_beam",
":test_event_recorder_beam",
],
shard_count = 3,
Expand All @@ -1215,6 +1216,16 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "amqp_filtex_SUITE",
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
name = "amqp_proxy_protocol_SUITE",
size = "medium",
Expand All @@ -1235,6 +1246,7 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "amqp_auth_SUITE",
additional_beam = [
":test_amqp_utils_beam",
":test_event_recorder_beam",
],
shard_count = 2,
Expand All @@ -1246,6 +1258,9 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "amqp_address_SUITE",
shard_count = 2,
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
Expand Down Expand Up @@ -1358,6 +1373,7 @@ eunit(
":test_clustering_utils_beam",
":test_event_recorder_beam",
":test_rabbit_ct_hook_beam",
":test_amqp_utils_beam",
],
target = ":test_erlang_app",
test_env = {
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ define ct_master.erl
endef

PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_system signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

Expand Down
20 changes: 20 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -302,6 +303,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -578,6 +580,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqp1_0.erl",
"src/rabbit_amqp_filtex.erl",
"src/rabbit_amqp_management.erl",
"src/rabbit_amqp_reader.erl",
"src/rabbit_amqp_session.erl",
Expand Down Expand Up @@ -2195,3 +2198,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqp_filtex_SUITE_beam_files",
testonly = True,
srcs = ["test/amqp_filtex_SUITE.erl"],
outs = ["test/amqp_filtex_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp10_common:erlang_app"],
)
erlang_bytecode(
name = "test_amqp_utils_beam",
testonly = True,
srcs = ["test/amqp_utils.erl"],
outs = ["test/amqp_utils.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
1 change: 1 addition & 0 deletions deps/rabbit/ct.test.spec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
, amqp_auth_SUITE
, amqp_client_SUITE
, amqp_credit_api_v2_SUITE
, amqp_filtex_SUITE
, amqp_proxy_protocol_SUITE
, amqp_system_SUITE
, amqpl_consumer_ack_SUITE
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ message_id(BasicMsg) ->
mc_compat:message_id(BasicMsg).

-spec property(atom(), state()) ->
{utf8, binary()} | undefined.
tagged_value().
property(Property, #?MODULE{protocol = Proto,
data = Data}) ->
Proto:property(Property, Data);
Expand Down
42 changes: 29 additions & 13 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

-define(MESSAGE_ANNOTATIONS_GUESS_SIZE, 100).

-define(SIMPLE_VALUE(V),
-define(IS_SIMPLE_VALUE(V),
is_binary(V) orelse
is_number(V) orelse
is_boolean(V)).
Expand Down Expand Up @@ -145,16 +145,32 @@ property(Prop, #v1{bare_and_footer = Bin,
Props = amqp10_framing:decode(PropsDescribed),
property0(Prop, Props).

property0(correlation_id, #'v1_0.properties'{correlation_id = Corr}) ->
Corr;
property0(message_id, #'v1_0.properties'{message_id = MsgId}) ->
MsgId;
property0(user_id, #'v1_0.properties'{user_id = UserId}) ->
UserId;
property0(subject, #'v1_0.properties'{subject = Subject}) ->
Subject;
property0(to, #'v1_0.properties'{to = To}) ->
To;
property0(message_id, #'v1_0.properties'{message_id = Val}) ->
Val;
property0(user_id, #'v1_0.properties'{user_id = Val}) ->
Val;
property0(to, #'v1_0.properties'{to = Val}) ->
Val;
property0(subject, #'v1_0.properties'{subject = Val}) ->
Val;
property0(reply_to, #'v1_0.properties'{reply_to = Val}) ->
Val;
property0(correlation_id, #'v1_0.properties'{correlation_id = Val}) ->
Val;
property0(content_type, #'v1_0.properties'{content_type = Val}) ->
Val;
property0(content_encoding, #'v1_0.properties'{content_encoding = Val}) ->
Val;
property0(absolute_expiry_time, #'v1_0.properties'{absolute_expiry_time = Val}) ->
Val;
property0(creation_time, #'v1_0.properties'{creation_time = Val}) ->
Val;
property0(group_id, #'v1_0.properties'{group_id = Val}) ->
Val;
property0(group_sequence, #'v1_0.properties'{group_sequence = Val}) ->
Val;
property0(reply_to_group_id, #'v1_0.properties'{reply_to_group_id = Val}) ->
Val;
property0(_Prop, #'v1_0.properties'{}) ->
undefined.

Expand Down Expand Up @@ -454,7 +470,7 @@ message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content) ->
%% the section record format really is terrible
lists:filtermap(fun({{symbol, K}, {_T, V}})
when ?SIMPLE_VALUE(V) ->
when ?IS_SIMPLE_VALUE(V) ->
{true, {K, V}};
(_) ->
false
Expand All @@ -480,7 +496,7 @@ application_properties_as_simple_map(
application_properties_as_simple_map0(Content, L) ->
%% the section record format really is terrible
lists:foldl(fun({{utf8, K}, {_T, V}}, Acc)
when ?SIMPLE_VALUE(V) ->
when ?IS_SIMPLE_VALUE(V) ->
[{K, V} | Acc];
({{utf8, K}, V}, Acc)
when V =:= undefined orelse is_boolean(V) ->
Expand Down
Loading