Skip to content

Commit 23c6730

Browse files
committed
Display AMQP filters in Management UI
## What? This commit displays effective filters of AMQP receivers in the Management UI. There is a new column `Filters` for outgoing links. Solves #13429 ## Why? This allows validating if the desired filters set by the receiver are actually in place by the server. In addition, it's convenient for a developer to check any filter values including SQL filter expressions. ## How? The session process stores the the formatted and effective filters in its state. The Management UI displays a box containing the filter name. This way the table for the outgoing links is kept concise. Hovering with the mouse over a box will show additionally the descriptor and the actual filter-value/definition.
1 parent 93db480 commit 23c6730

File tree

4 files changed

+171
-53
lines changed

4 files changed

+171
-53
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@
213213
dynamic :: boolean(),
214214
send_settled :: boolean(),
215215
max_message_size :: unlimited | pos_integer(),
216+
filter :: list(),
216217

217218
%% When feature flag rabbitmq_4.0.0 becomes required,
218219
%% the following 2 fields should be deleted.
@@ -1487,6 +1488,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
14871488
dynamic = default(Source#'v1_0.source'.dynamic, false),
14881489
send_settled = SndSettled,
14891490
max_message_size = MaxMessageSize,
1491+
filter = format_filter(EffectiveFilter),
14901492
credit_api_version = CreditApiVsn,
14911493
delivery_count = DeliveryCount,
14921494
client_flow_ctl = ClientFlowCtl,
@@ -3984,7 +3986,7 @@ info_incoming_link(Handle, LinkName, SndSettleMode, TargetAddress,
39843986

39853987
info_outgoing_management_links(Links) ->
39863988
[info_outgoing_link(Handle, Name, ?MANAGEMENT_NODE_ADDRESS, <<>>,
3987-
true, MaxMessageSize, DeliveryCount, Credit)
3989+
true, MaxMessageSize, [], DeliveryCount, Credit)
39883990
|| Handle := #management_link{
39893991
name = Name,
39903992
max_message_size = MaxMessageSize,
@@ -4001,28 +4003,49 @@ info_outgoing_links(Links) ->
40014003
{'', ''}
40024004
end,
40034005
info_outgoing_link(Handle, Name, SourceAddress, QueueName#resource.name,
4004-
SendSettled, MaxMessageSize, DeliveryCount, Credit)
4006+
SendSettled, MaxMessageSize, Filter, DeliveryCount, Credit)
40054007

40064008
end
40074009
|| Handle := #outgoing_link{
40084010
name = Name,
40094011
source_address = SourceAddress,
40104012
queue_name = QueueName,
4011-
max_message_size = MaxMessageSize,
40124013
send_settled = SendSettled,
4014+
max_message_size = MaxMessageSize,
4015+
filter = Filter,
40134016
client_flow_ctl = ClientFlowCtl} <- Links].
40144017

40154018
info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled,
4016-
MaxMessageSize, DeliveryCount, Credit) ->
4019+
MaxMessageSize, Filter, DeliveryCount, Credit) ->
40174020
[{handle, Handle},
40184021
{link_name, LinkName},
40194022
{source_address, SourceAddress},
40204023
{queue_name, QueueNameBin},
40214024
{send_settled, SendSettled},
40224025
{max_message_size, MaxMessageSize},
4026+
{filter, Filter},
40234027
{delivery_count, DeliveryCount},
40244028
{credit, Credit}].
40254029

4030+
format_filter(undefined) ->
4031+
[];
4032+
format_filter({map, KVList}) ->
4033+
[[{name, Name},
4034+
{descriptor, Descriptor},
4035+
{value, format_filter_value(Value)}]
4036+
|| {{symbol, Name}, {described, {_Type, Descriptor}, Value}} <- KVList].
4037+
4038+
format_filter_value({list, List}) ->
4039+
lists:map(fun format_filter_value/1, List);
4040+
format_filter_value({map, KVList}) ->
4041+
[[{key, Key},
4042+
{value, format_filter_value(Val)}]
4043+
|| {{_T, Key}, Val} <- KVList, is_binary(Key)];
4044+
format_filter_value({_Type, Val}) ->
4045+
Val;
4046+
format_filter_value(Val) ->
4047+
Val.
4048+
40264049
unwrap_simple_type(V = {list, _}) ->
40274050
V;
40284051
unwrap_simple_type(V = {map, _}) ->

deps/rabbitmq_management/priv/www/js/global.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,10 @@ var HELP = {
617617
'"true" if the sender sends all deliveries settled to the receiver. "false" if the sender sends all deliveries initially unsettled to the receiver.',
618618

619619
'outgoing-unsettled-deliveries':
620-
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.'
620+
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.',
621+
622+
'amqp-filter':
623+
'Filters are predicates that define which messages RabbitMQ sends to the receiver. Each filter in the <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-filter-set">Filter Set</a> has a name displayed in the boxes below. Hovering over a box will display the filter descriptor and the filter definition.'
621624
};
622625

623626
///////////////////////////////////////////////////////////////////////////

deps/rabbitmq_management/priv/www/js/tmpl/sessions-list.ejs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,46 @@ function getAddressClass(address) {
66
function getCreditClass(credit) {
77
return credit === 0 || credit === '0' ? 'yellow-background' : '';
88
}
9+
10+
function fmt_amqp_filter(filters) {
11+
if (!filters || filters.length === 0) {
12+
return '';
13+
}
14+
15+
var entries = [];
16+
for (var i = 0; i < filters.length; i++) {
17+
var filter = filters[i];
18+
var formatted_value = fmt_filter_value(filter.value);
19+
var entry = '<abbr title="(descriptor: ' + fmt_escape_html(filter.descriptor) + ') ' +
20+
fmt_escape_html(formatted_value) + '">' +
21+
fmt_escape_html(filter.name) + '</abbr>';
22+
entries.push(entry);
23+
}
24+
return entries.join(' ');
25+
}
26+
27+
function fmt_filter_value(value) {
28+
if (typeof value === 'string') {
29+
return value;
30+
} else if (Array.isArray(value)) {
31+
if (value.length === 0) return '[]';
32+
33+
if (value[0] && value[0].key !== undefined) {
34+
// array of key-value pairs
35+
var props = value.map(function(kv) {
36+
return kv.key + '=' + fmt_filter_value(kv.value);
37+
}).join(', ');
38+
return '{' + props + '}';
39+
} else {
40+
// regular array
41+
return '[' + value.map(fmt_filter_value).join(', ') + ']';
42+
}
43+
} else if (typeof value === 'object' && value !== null) {
44+
return JSON.stringify(value);
45+
} else {
46+
return String(value);
47+
}
48+
}
949
%>
1050

1151
<% if (sessions.length > 0) { %>
@@ -91,6 +131,7 @@ function getCreditClass(credit) {
91131
<th>max-message-size (bytes)</th>
92132
<th>delivery-count</th>
93133
<th>link-credit</th>
134+
<th>Filters<span class="help" id="amqp-filter"></span></th>
94135
</tr>
95136
</thead>
96137
<tbody>
@@ -107,6 +148,7 @@ function getCreditClass(credit) {
107148
<td class="c"><%= fmt_string(out_link.max_message_size) %></td>
108149
<td class="c"><%= fmt_string(out_link.delivery_count) %></td>
109150
<td class="c"><%= fmt_string(out_link.credit) %></td>
151+
<td class="c"><%= fmt_amqp_filter(out_link.filter) %></td>
110152
</tr>
111153
<% } %>
112154
</tbody>

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 98 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-module(rabbit_mgmt_http_SUITE).
99

1010
-include_lib("amqp_client/include/amqp_client.hrl").
11+
-include_lib("amqp10_common/include/amqp10_filter.hrl").
12+
-include_lib("amqp10_client/include/amqp10_client.hrl").
1113
-include_lib("common_test/include/ct.hrl").
1214
-include_lib("eunit/include/eunit.hrl").
1315
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
@@ -1127,16 +1129,34 @@ amqp_sessions(Config) ->
11271129
{ok, Session1} = amqp10_client:begin_session_sync(C),
11281130
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(
11291131
Session1, <<"my link pair">>),
1130-
QName = <<"my queue">>,
1131-
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
1132+
QName = <<"my stream">>,
1133+
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}},
1134+
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
11321135
{ok, Sender} = amqp10_client:attach_sender_link_sync(
1133-
Session1,
1134-
<<"my sender">>,
1136+
Session1, <<"my sender">>,
11351137
rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"my key">>)),
1138+
1139+
Filter = #{<<"ts filter">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>,
1140+
value = {timestamp, 1751023462000}},
1141+
<<"bloom filter">> => #filter{descriptor = <<"rabbitmq:stream-filter">>,
1142+
value = {list, [{utf8, <<"complaint">>},
1143+
{utf8, <<"user1">>}]}},
1144+
<<"match filter">> => #filter{descriptor = <<"rabbitmq:stream-match-unfiltered">>,
1145+
value = {boolean, true}},
1146+
<<"prop filter">> => #filter{descriptor = ?DESCRIPTOR_CODE_PROPERTIES_FILTER,
1147+
value = {map, [{{symbol, <<"subject">>},
1148+
{utf8, <<"complaint">>}},
1149+
{{symbol, <<"user-id">>},
1150+
{binary, <<"user1">>}}
1151+
]}},
1152+
<<"app prop filter">> => #filter{descriptor = ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER,
1153+
value = {map, [{{utf8, <<"k1">>}, {int, -4}},
1154+
{{utf8, <<"☀️"/utf8>>}, {utf8, <<"🙂"/utf8>>}}
1155+
]}}},
11361156
{ok, Receiver} = amqp10_client:attach_receiver_link(
1137-
Session1,
1138-
<<"my receiver">>,
1139-
rabbitmq_amqp_address:queue(QName)),
1157+
Session1, <<"my receiver">>,
1158+
rabbitmq_amqp_address:queue(QName),
1159+
settled, none, Filter),
11401160
receive {amqp10_event, {link, Receiver, attached}} -> ok
11411161
after 5000 -> ct:fail({missing_event, ?LINE})
11421162
end,
@@ -1155,54 +1175,84 @@ amqp_sessions(Config) ->
11551175
next_outgoing_id := NextOutgoingId,
11561176
remote_incoming_window := RemoteIncomingWindow,
11571177
remote_outgoing_window := RemoteOutgoingWindow,
1158-
outgoing_unsettled_deliveries := 0,
1159-
incoming_links := [#{handle := 0,
1160-
link_name := <<"my link pair">>,
1161-
target_address := <<"/management">>,
1162-
delivery_count := DeliveryCount1,
1163-
credit := Credit1,
1164-
snd_settle_mode := <<"settled">>,
1165-
max_message_size := IncomingMaxMsgSize,
1166-
unconfirmed_messages := 0},
1167-
#{handle := 2,
1168-
link_name := <<"my sender">>,
1169-
target_address := <<"/exchanges/amq.direct/my%20key">>,
1170-
delivery_count := DeliveryCount2,
1171-
credit := Credit2,
1172-
snd_settle_mode := <<"mixed">>,
1173-
max_message_size := IncomingMaxMsgSize,
1174-
unconfirmed_messages := 0}],
1175-
outgoing_links := [#{handle := 1,
1176-
link_name := <<"my link pair">>,
1177-
source_address := <<"/management">>,
1178-
queue_name := <<>>,
1179-
delivery_count := DeliveryCount3,
1180-
credit := 0,
1181-
max_message_size := <<"unlimited">>,
1182-
send_settled := true},
1183-
#{handle := 3,
1184-
link_name := <<"my receiver">>,
1185-
source_address := <<"/queues/my%20queue">>,
1186-
queue_name := <<"my queue">>,
1187-
delivery_count := DeliveryCount4,
1188-
credit := 5000,
1189-
max_message_size := <<"unlimited">>,
1190-
send_settled := true}]
1178+
outgoing_unsettled_deliveries := 0
11911179
} when is_integer(HandleMax) andalso
11921180
is_integer(NextIncomingId) andalso
11931181
is_integer(IncomingWindow) andalso
11941182
is_integer(NextOutgoingId) andalso
11951183
is_integer(RemoteIncomingWindow) andalso
1196-
is_integer(RemoteOutgoingWindow) andalso
1197-
is_integer(Credit1) andalso
1198-
is_integer(Credit2) andalso
1199-
is_integer(IncomingMaxMsgSize) andalso
1200-
is_integer(DeliveryCount1) andalso
1201-
is_integer(DeliveryCount2) andalso
1202-
is_integer(DeliveryCount3) andalso
1203-
is_integer(DeliveryCount4),
1184+
is_integer(RemoteOutgoingWindow),
12041185
Session),
12051186

1187+
{ok, IncomingLinks} = maps:find(incoming_links, Session),
1188+
{ok, OutgoingLinks} = maps:find(outgoing_links, Session),
1189+
?assertEqual(2, length(IncomingLinks)),
1190+
?assertEqual(2, length(OutgoingLinks)),
1191+
1192+
?assertMatch([#{handle := 0,
1193+
link_name := <<"my link pair">>,
1194+
target_address := <<"/management">>,
1195+
delivery_count := DeliveryCount1,
1196+
credit := Credit1,
1197+
snd_settle_mode := <<"settled">>,
1198+
max_message_size := IncomingMaxMsgSize,
1199+
unconfirmed_messages := 0},
1200+
#{handle := 2,
1201+
link_name := <<"my sender">>,
1202+
target_address := <<"/exchanges/amq.direct/my%20key">>,
1203+
delivery_count := DeliveryCount2,
1204+
credit := Credit2,
1205+
snd_settle_mode := <<"mixed">>,
1206+
max_message_size := IncomingMaxMsgSize,
1207+
unconfirmed_messages := 0}]
1208+
when is_integer(Credit1) andalso
1209+
is_integer(Credit2) andalso
1210+
is_integer(IncomingMaxMsgSize) andalso
1211+
is_integer(DeliveryCount1) andalso
1212+
is_integer(DeliveryCount2),
1213+
IncomingLinks),
1214+
1215+
[OutLink1, OutLink2] = OutgoingLinks,
1216+
?assertMatch(#{handle := 1,
1217+
link_name := <<"my link pair">>,
1218+
source_address := <<"/management">>,
1219+
queue_name := <<>>,
1220+
delivery_count := DeliveryCount3,
1221+
credit := 0,
1222+
max_message_size := <<"unlimited">>,
1223+
send_settled := true}
1224+
when is_integer(DeliveryCount3),
1225+
OutLink1),
1226+
#{handle := 3,
1227+
link_name := <<"my receiver">>,
1228+
source_address := <<"/queues/my%20stream">>,
1229+
queue_name := <<"my stream">>,
1230+
delivery_count := DeliveryCount4,
1231+
credit := 5000,
1232+
max_message_size := <<"unlimited">>,
1233+
send_settled := true,
1234+
filter := ActualFilter} = OutLink2,
1235+
?assert(is_integer(DeliveryCount4)),
1236+
ExpectedFilter = [#{name => <<"ts filter">>,
1237+
descriptor => <<"rabbitmq:stream-offset-spec">>,
1238+
value => 1751023462000},
1239+
#{name => <<"bloom filter">>,
1240+
descriptor => <<"rabbitmq:stream-filter">>,
1241+
value => [<<"complaint">>, <<"user1">>]},
1242+
#{name => <<"match filter">>,
1243+
descriptor => <<"rabbitmq:stream-match-unfiltered">>,
1244+
value => true},
1245+
#{name => <<"prop filter">>,
1246+
descriptor => ?DESCRIPTOR_CODE_PROPERTIES_FILTER,
1247+
value => [#{key => <<"subject">>, value => <<"complaint">>},
1248+
#{key => <<"user-id">>, value => <<"user1">>}]},
1249+
#{name => <<"app prop filter">>,
1250+
descriptor => ?DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER,
1251+
value => [#{key => <<"k1">>, value => -4},
1252+
#{key => <<"☀️"/utf8>>, value => <<"🙂"/utf8>>}]}],
1253+
?assertEqual(lists:sort(ExpectedFilter),
1254+
lists:sort(ActualFilter)),
1255+
12061256
{ok, _Session2} = amqp10_client:begin_session_sync(C),
12071257
Sessions = http_get(Config, Path),
12081258
?assertEqual(2, length(Sessions)),

0 commit comments

Comments
 (0)