Skip to content

Commit de17a77

Browse files
committed
Queues with plugins - Enable adding queues with plugins, Management UI
1 parent 5fd3bdd commit de17a77

22 files changed

+662
-419
lines changed

deps/rabbitmq_management/priv/www/js/global.js

Lines changed: 241 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ var HELP = {
223223
'Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message\'s original routing key will be used.<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/dlx.html">x-dead-letter-routing-key</a>" argument.)',
224224

225225
'queue-dead-letter-strategy':
226-
'Valid values are <code>at-most-once</code> or <code>at-least-once</code>. It defaults to <code>at-most-once</code>. This setting is understood only by quorum queues. If <code>at-least-once</code> is set, <code>Overflow behaviour</code> must be set to <code>reject-publish</code>. Otherwise, dead letter strategy will fall back to <code>at-most-once</code>.',
226+
'Valid values are <code>at-most-once</code> or <code>at-least-once</code>. It defaults to <code>at-most-once</code>. If <code>at-least-once</code> is set, <code>Overflow behaviour</code> must be set to <code>reject-publish</code>. Otherwise, dead letter strategy will fall back to <code>at-most-once</code>.',
227227

228228
'queue-single-active-consumer':
229229
'If set, makes sure only one consumer at a time consumes from the queue and fails over to another registered consumer in case the active one is cancelled or dies.<br/>(Sets the "<a target="_blank" href="https://rabbitmq.com/consumers.html#single-active-consumer">x-single-active-consumer</a>" argument.)',
@@ -235,7 +235,10 @@ var HELP = {
235235
'Sets the data retention for stream queues in time units </br>(Y=Years, M=Months, D=Days, h=hours, m=minutes, s=seconds).<br/>E.g. "1h" configures the stream to only keep the last 1 hour of received messages.</br></br>(Sets the x-max-age argument.)',
236236

237237
'queue-overflow':
238-
'Sets the <a target="_blank" href="https://www.rabbitmq.com/maxlength.html#overflow-behaviour">queue overflow behaviour</a>. This determines what happens to messages when the maximum length of a queue is reached. Valid values are <code>drop-head</code>, <code>reject-publish</code> or <code>reject-publish-dlx</code>. The quorum queue type only supports <code>drop-head</code> and <code>reject-publish</code>.',
238+
'Sets the <a target="_blank" href="https://www.rabbitmq.com/maxlength.html#overflow-behaviour">queue overflow behaviour</a>. This determines what happens to messages when the maximum length of a queue is reached. Valid values are <code>drop-head</code>, <code>reject-publish</code> or <code>reject-publish-dlx</code>',
239+
240+
'quorum-queue-overflow':
241+
'Sets the <a target="_blank" href="https://www.rabbitmq.com/maxlength.html#overflow-behaviour">queue overflow behaviour</a>. This determines what happens to messages when the maximum length of a queue is reached. Valid values for quorum queues are <code>drop-head</code> and <code>reject-publish</code>.',
239242

240243
'queue-master-locator':
241244
'Deprecated: please use `queue-leader-locator` instead. <a target="_blank" href="https://www.rabbitmq.com/docs/clustering#replica-placement">Controls which node the queue will be running on.</a>',
@@ -887,3 +890,239 @@ var chart_data = {};
887890
var last_page_out_of_range_error = 0;
888891

889892
var oauth;
893+
894+
895+
///////////////////////////////////////////////////////////////////////////
896+
// //
897+
// Queue types //
898+
// //
899+
///////////////////////////////////////////////////////////////////////////
900+
901+
/// this queue types are very well known to the server, at the very least
902+
/// this collection must be validated in terms of matching server queue
903+
/// types registry. I hope I will have time for this.
904+
905+
/// this one defaults to classic, How can a queue be without type?
906+
var QUEUE_TYPE = function (queue) {
907+
if (queue["arguments"]) {
908+
if (queue["arguments"]["x-queue-type"]) {
909+
return QUEUE_TYPE[queue["arguments"]["x-queue-type"]];
910+
} else {
911+
/// I observed that streams do not have
912+
/// (at least always) x-queue-type
913+
/// but all queues seems to be having
914+
/// type field.
915+
/// curiosuly is_[type] functions in main.js
916+
/// rely on x-queue-type. is_stream might be
917+
/// broken here.
918+
if (queue.hasOwnProperty("type")) {
919+
return QUEUE_TYPE[queue.type];
920+
}
921+
else {
922+
return QUEUE_TYPE["classic"];
923+
}
924+
}
925+
} else {
926+
return QUEUE_TYPE["classic"];
927+
}
928+
}
929+
// TODO: while this allows for custom queues
930+
// the proper way is to follow single source of truth
931+
// and generate most of this on the server from queue type metadata
932+
// including replacing tmpl's with data-driven generators
933+
// For example server knows policy_apply_to for each queue
934+
// and it knows what extra agruments each queue type accepts.
935+
// So for the latter case we dont' need a template that lists
936+
// queue args. We need iterator over server-supplied object.
937+
QUEUE_TYPE["default"] = {
938+
label: "Default",
939+
params: {},
940+
policy_apply_to: "classic_queue",
941+
actions: {
942+
get_message: true,
943+
purge: true
944+
},
945+
tmpl: {
946+
"arguments" : "classic-queue-arguments",
947+
// TODO: this must be generated from js objects of course.
948+
// and then those objects must be rendered by the server
949+
"user_policy_arguments": "classic-queue-user-policy-arguments",
950+
"operator_policy_arguments": "classic-queue-operator-policy-arguments",
951+
"list" : "classic-queue-list",
952+
"stats" : "classic-queue-stats",
953+
"node_details" : "classic-queue-node-details"
954+
}
955+
};
956+
957+
QUEUE_TYPE["classic"] = {
958+
label: "Classic",
959+
params: {},
960+
policy_apply_to: "classic_queue",
961+
actions: {
962+
get_message: true,
963+
purge: true
964+
},
965+
tmpl: {
966+
"arguments" : "classic-queue-arguments",
967+
"user_policy_arguments": "classic-queue-user-policy-arguments",
968+
"operator_policy_arguments": "classic-queue-operator-policy-arguments",
969+
"list" : "classic-queue-list",
970+
"stats" : "classic-queue-stats",
971+
"node_details" : "classic-queue-node-details"
972+
}
973+
};
974+
975+
QUEUE_TYPE["quorum"] = {
976+
label: "Quorum",
977+
params: {
978+
'durable': true,
979+
'auto_delete': false
980+
},
981+
policy_apply_to: "quorum_queues",
982+
actions: {
983+
get_message: true,
984+
purge: true
985+
},
986+
tmpl: {
987+
"arguments" : "quorum-queue-arguments",
988+
"user_policy_arguments": "quorum-queue-user-policy-arguments",
989+
"operator_policy_arguments": "quorum-queue-operator-policy-arguments",
990+
"list" : "quorum-queue-list",
991+
"stats": "quorum-queue-stats",
992+
"node_details" : "quorum-queue-node-details"
993+
}
994+
};
995+
996+
QUEUE_TYPE["stream"] = {
997+
label: "Stream",
998+
params: {
999+
'durable': true,
1000+
'auto_delete': false
1001+
},
1002+
policy_apply_to: "streams",
1003+
actions: {
1004+
get_message: false,
1005+
purge: false
1006+
},
1007+
tmpl: {
1008+
"arguments" : "stream-queue-arguments",
1009+
"user_policy_arguments": "quorum-queue-user-policy-arguments",
1010+
"operator_policy_arguments": "stream-queue-operator-policy-arguments",
1011+
"list" : "stream-queue-list",
1012+
"stats" : "stream-queue-stats",
1013+
"node_details" : "stream-queue-node-details"
1014+
}
1015+
};
1016+
1017+
// here I'll shortcut for now and let it be like that
1018+
// other queue types can inject themlves where they want.
1019+
// since the 'sections' object will likely keep key insertion
1020+
// order custom keys for queue type will be coming last.
1021+
1022+
// maybe add helper functions?
1023+
var MEMORY_STATISTICS = {
1024+
sections: {'queue_procs' : ['classic', 'Classic queues'],
1025+
'quorum_queue_procs' : ['quorum', 'Quorum queues'],
1026+
'quorum_queue_dlx_procs' : ['quorum', 'Dead letter workers'],
1027+
'stream_queue_procs' : ['stream', 'Stream queues'],
1028+
'stream_queue_replica_reader_procs' : ['stream', 'Stream queues (replica reader)'],
1029+
'stream_queue_coordinator_procs' : ['stream', 'Stream queues (coordinator)'],
1030+
'binary' : ['binary', 'Binaries'],
1031+
'connection_readers' : ['conn', 'Connection readers'],
1032+
'connection_writers' : ['conn', 'Connection writers'],
1033+
'connection_channels' : ['conn', 'Connection channels'],
1034+
'connection_other' : ['conn', 'Connections (other)'],
1035+
'mnesia' : ['table', 'Mnesia'],
1036+
'msg_index' : ['table', 'Message store index'],
1037+
'mgmt_db' : ['table', 'Management database'],
1038+
'quorum_ets' : ['table', 'Quorum queue ETS tables'],
1039+
'other_ets' : ['table', 'Other ETS tables'],
1040+
'plugins' : ['proc', 'Plugins'],
1041+
'other_proc' : ['proc', 'Other process memory'],
1042+
'code' : ['system', 'Code'],
1043+
'atom' : ['system', 'Atoms'],
1044+
'other_system' : ['system', 'Other system'],
1045+
'allocated_unused' : ['unused', 'Allocated unused'],
1046+
'reserved_unallocated': ['unused', 'Unallocated reserved by the OS']},
1047+
keys: [[{name: 'Classic Queues', colour: 'classic',
1048+
keys: [['queue_procs', 'queues']]},
1049+
{name: 'Quorum Queues', colour: 'quorum',
1050+
keys: [['quorum_queue_procs','quorum'],
1051+
['quorum_queue_dlx_procs', 'dead letter workers']]},
1052+
{name: 'Streams', colour: 'stream',
1053+
keys: [['stream_queue_procs', 'stream'],
1054+
['stream_queue_replica_reader_procs', 'stream replica reader'],
1055+
['stream_queue_coordinator_procs', 'stream coordinator']]},
1056+
{name: 'Binaries', colour: 'binary',
1057+
keys: [['binary', '']]}],
1058+
1059+
[{name: 'Connections', colour: 'conn',
1060+
keys: [['connection_readers', 'readers'],
1061+
['connection_writers', 'writers'],
1062+
['connection_channels', 'channels'],
1063+
['connection_other', 'other']]}],
1064+
1065+
[{name: 'Tables', colour: 'table',
1066+
keys: [['mnesia', 'internal database tables'],
1067+
['msg_index', 'message store index'],
1068+
['mgmt_db', 'management database'],
1069+
['quorum_ets', 'quorum queue tables'],
1070+
['other_ets', 'other']]}],
1071+
1072+
[{name: 'Processes', colour: 'proc',
1073+
keys: [['plugins', 'plugins'],
1074+
['metadata_store', 'metadata store'],
1075+
['other_proc', 'other']]},
1076+
{name: 'System', colour: 'system',
1077+
keys: [['code', 'code'],
1078+
['atom', 'atoms'],
1079+
['other_system', 'other']
1080+
]}],
1081+
1082+
[{name: 'Preallocated memory', colour: 'unused',
1083+
keys: [['allocated_unused', 'preallocated by runtime, unused'],
1084+
['reserved_unallocated', 'unallocated, reserved by the OS']]}]]
1085+
}
1086+
1087+
var BINARY_STATISTICS = {
1088+
sections: {'queue_procs' : ['classic', 'Classic queues'],
1089+
'quorum_queue_procs' : ['quorum', 'Quorum queues'],
1090+
'quorum_queue_dlx_procs' : ['quorum', 'Dead letter workers'],
1091+
'stream_queue_procs' : ['stream', 'Stream queues'],
1092+
'stream_queue_replica_reader_procs' : ['stream', 'Stream queues (replica reader)'],
1093+
'stream_queue_coordinator_procs' : ['stream', 'Stream queues (coordinator)'],
1094+
'connection_readers' : ['conn', 'Connection readers'],
1095+
'connection_writers' : ['conn', 'Connection writers'],
1096+
'connection_channels' : ['conn', 'Connection channels'],
1097+
'connection_other' : ['conn', 'Connections (other)'],
1098+
'msg_index' : ['table', 'Message store index'],
1099+
'mgmt_db' : ['table', 'Management database'],
1100+
'plugins' : ['proc', 'Plugins'],
1101+
'metadata_store' : ['metadata_store', 'Metadata store'],
1102+
'other' : ['system', 'Other binary references']},
1103+
key: [[{name: 'Classic Queues', colour: 'classic',
1104+
keys: [['queue_procs', 'queues']]},
1105+
{name: 'Quorum Queues', colour: 'quorum',
1106+
keys: [['quorum_queue_procs', 'quorum'],
1107+
['quorum_queue_dlx_procs', 'dead letter workers']]},
1108+
{name: 'Streams', colour: 'stream',
1109+
keys: [['stream_queue_procs', 'stream'],
1110+
['stream_queue_replica_reader_procs', 'stream replica reader'],
1111+
['stream_queue_coordinator_procs', 'stream coordinator']]}],
1112+
1113+
[{name: 'Connections', colour: 'conn',
1114+
keys: [['connection_readers', 'readers'],
1115+
['connection_writers', 'writers'],
1116+
['connection_channels', 'channels'],
1117+
['connection_other', 'other']]}],
1118+
1119+
[{name: 'Tables', colour: 'table',
1120+
keys: [['msg_index', 'message store index'],
1121+
['mgmt_db', 'management database']]}],
1122+
1123+
[{name: 'Processes', colour: 'proc',
1124+
keys: [['plugins', 'plugins'],
1125+
['metadata_store', 'metadata store']]},
1126+
{name: 'System', colour: 'system',
1127+
keys: [['other', 'other']]}]]
1128+
};

deps/rabbitmq_management/priv/www/js/main.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,11 +1578,8 @@ function collapse_multifields(params0) {
15781578
if (queue_type != 'default') {
15791579
params['arguments']['x-queue-type'] = queue_type;
15801580
}
1581-
if (queue_type == 'quorum' ||
1582-
queue_type == 'stream') {
1583-
params['durable'] = true;
1584-
params['auto_delete'] = false;
1585-
}
1581+
1582+
params = Object.assign(params, QUEUE_TYPE[queue_type].params)
15861583
}
15871584
return params;
15881585
}

deps/rabbitmq_management/priv/www/js/tmpl/binary.ejs

Lines changed: 8 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,56 +5,14 @@
55
Binary statistics not available.
66
</p>
77
<% } else { %>
8-
<%
9-
var sections = {'queue_procs' : ['classic', 'Classic queues'],
10-
'quorum_queue_procs' : ['quorum', 'Quorum queues'],
11-
'quorum_queue_dlx_procs' : ['quorum', 'Dead letter workers'],
12-
'stream_queue_procs' : ['stream', 'Stream queues'],
13-
'stream_queue_replica_reader_procs' : ['stream', 'Stream queues (replica reader)'],
14-
'stream_queue_coordinator_procs' : ['stream', 'Stream queues (coordinator)'],
15-
'connection_readers' : ['conn', 'Connection readers'],
16-
'connection_writers' : ['conn', 'Connection writers'],
17-
'connection_channels' : ['conn', 'Connection channels'],
18-
'connection_other' : ['conn', 'Connections (other)'],
19-
'msg_index' : ['table', 'Message store index'],
20-
'mgmt_db' : ['table', 'Management database'],
21-
'plugins' : ['proc', 'Plugins'],
22-
'metadata_store' : ['metadata_store', 'Metadata store'],
23-
'other' : ['system', 'Other binary references']};
24-
var total_out = [];
25-
%>
26-
<%= format('memory-bar', {sections: sections, memory: binary, total_out: total_out}) %>
27-
<span class="clear">&nbsp;</span>
28-
<div class="box">
29-
<%
30-
var key = [[{name: 'Classic Queues', colour: 'classic',
31-
keys: [['queue_procs', 'queues']]},
32-
{name: 'Quorum Queues', colour: 'quorum',
33-
keys: [['quorum_queue_procs', 'quorum'],
34-
['quorum_queue_dlx_procs', 'dead letter workers']]},
35-
{name: 'Streams', colour: 'stream',
36-
keys: [['stream_queue_procs', 'stream'],
37-
['stream_queue_replica_reader_procs', 'stream replica reader'],
38-
['stream_queue_coordinator_procs', 'stream coordinator']]}],
39-
40-
[{name: 'Connections', colour: 'conn',
41-
keys: [['connection_readers', 'readers'],
42-
['connection_writers', 'writers'],
43-
['connection_channels', 'channels'],
44-
['connection_other', 'other']]}],
45-
46-
[{name: 'Tables', colour: 'table',
47-
keys: [['msg_index', 'message store index'],
48-
['mgmt_db', 'management database']]}],
49-
50-
[{name: 'Processes', colour: 'proc',
51-
keys: [['plugins', 'plugins'],
52-
['metadata_store', 'metadata store']]},
53-
{name: 'System', colour: 'system',
54-
keys: [['other', 'other']]}]];
55-
%>
56-
<%= format('memory-table', {key: key, memory: binary}) %>
57-
</div>
8+
<%
9+
var total_out = [];
10+
%>
11+
<%= format('memory-bar', {sections: BINARY_STATISTICS.sections, memory: binary, total_out: total_out}) %>
12+
<span class="clear">&nbsp;</span>
13+
<div class="box">
14+
<%= format('memory-table', {key: BINARY_STATISTICS. key, memory: binary}) %>
15+
</div>
5816
5917
<div class="memory-info">
6018
Last updated: <b><%= fmt_date(new Date()) %></b>.<br/>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<span class="argument-link" field="arguments" key="x-expires" type="number">Auto expire</span> <span class="help" id="queue-expires"></span> |
2+
<span class="argument-link" field="arguments" key="x-message-ttl" type="number">Message TTL</span> <span class="help" id="queue-message-ttl"></span> |
3+
<span class="argument-link" field="arguments" key="x-overflow" type="string">Overflow behaviour</span> <span class="help" id="queue-overflow"></span><br/>
4+
<span class="argument-link" field="arguments" key="x-single-active-consumer" type="boolean">Single active consumer</span> <span class="help" id="queue-single-active-consumer"></span> |
5+
<span class="argument-link" field="arguments" key="x-dead-letter-exchange" type="string">Dead letter exchange</span> <span class="help" id="queue-dead-letter-exchange"></span> |
6+
<span class="argument-link" field="arguments" key="x-dead-letter-routing-key" type="string">Dead letter routing key</span> <span class="help" id="queue-dead-letter-routing-key"></span><br/>
7+
<span class="argument-link" field="arguments" key="x-max-length" type="number">Max length</span> <span class="help" id="queue-max-length"></span> |
8+
<span class="argument-link" field="arguments" key="x-max-length-bytes" type="number">Max length bytes</span> <span class="help" id="queue-max-length-bytes"></span>
9+
| <span class="argument-link" field="arguments" key="x-max-priority" type="number">Maximum priority</span> <span class="help" id="queue-max-priority"></span>
10+
| <span class="argument-link" field="arguments" key="x-queue-leader-locator" type="string">Leader locator</span><span class="help" id="queue-leader-locator"></span>
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
<tr>
2+
<th>Node</th>
3+
<td><%= fmt_node(queue.node) %></td>
4+
</tr>

0 commit comments

Comments
 (0)