Skip to content

Commit dcc7718

Browse files
Merge branch 'main' into rabbitmq-server-8323
2 parents 68eabb1 + c36670c commit dcc7718

File tree

17 files changed

+83
-156
lines changed

17 files changed

+83
-156
lines changed

MODULE.bazel

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ erlang_package.hex_package(
179179
erlang_package.hex_package(
180180
name = "csv",
181181
build_file = "@rabbitmq-server//bazel:BUILD.csv",
182-
sha256 = "54508938ac67e27966b10ef49606e3ad5995d665d7fc2688efb3eab1307c9079",
183-
version = "2.4.1",
182+
sha256 = "cbbe5455c93df5f3f2943e995e28b7a8808361ba34cf3e44267d77a01eaf1609",
183+
version = "3.0.5",
184184
)
185185

186186
erlang_package.hex_package(
@@ -258,13 +258,6 @@ erlang_package.git_package(
258258
tag = "v1.5.1",
259259
)
260260

261-
erlang_package.hex_package(
262-
name = "parallel_stream",
263-
build_file = "@rabbitmq-server//bazel:BUILD.parallel_stream",
264-
sha256 = "639b2e8749e11b87b9eb42f2ad325d161c170b39b288ac8d04c4f31f8f0823eb",
265-
version = "1.0.6",
266-
)
267-
268261
erlang_package.hex_package(
269262
name = "prometheus",
270263
build_file = "@rabbitmq-server//bazel:BUILD.prometheus",
@@ -359,7 +352,6 @@ use_repo(
359352
"json",
360353
"observer_cli",
361354
"osiris",
362-
"parallel_stream",
363355
"prometheus",
364356
"ra",
365357
"ranch",

bazel/BUILD.csv

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
load("@rules_erlang//:erlang_bytecode2.bzl", "erlc_opts")
2+
13
filegroup(
24
name = "sources",
35
srcs = [
@@ -8,3 +10,17 @@ filegroup(
810
]),
911
visibility = ["//visibility:public"],
1012
)
13+
14+
erlc_opts(
15+
name = "erlc_opts",
16+
values = select({
17+
"@rules_erlang//:debug_build": [
18+
"+debug_info",
19+
],
20+
"//conditions:default": [
21+
"+debug_info",
22+
"+deterministic",
23+
],
24+
}),
25+
visibility = [":__subpackages__"],
26+
)

bazel/BUILD.parallel_stream

Lines changed: 0 additions & 10 deletions
This file was deleted.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,17 +1744,17 @@ force_shrink_member_to_current_member(VHost, Name) ->
17441744
force_all_queues_shrink_member_to_current_member() ->
17451745
rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"),
17461746
Node = node(),
1747-
[begin
1748-
QName = amqqueue:get_name(Q),
1749-
{RaName, _} = amqqueue:get_pid(Q),
1750-
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
1751-
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
1752-
Fun = fun (QQ) ->
1753-
TS0 = amqqueue:get_type_state(QQ),
1754-
TS = TS0#{nodes => [Node]},
1755-
amqqueue:set_type_state(QQ, TS)
1756-
end,
1757-
_ = rabbit_amqqueue:update(QName, Fun)
1758-
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
1747+
_ = [begin
1748+
QName = amqqueue:get_name(Q),
1749+
{RaName, _} = amqqueue:get_pid(Q),
1750+
rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]),
1751+
ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
1752+
Fun = fun (QQ) ->
1753+
TS0 = amqqueue:get_type_state(QQ),
1754+
TS = TS0#{nodes => [Node]},
1755+
amqqueue:set_type_state(QQ, TS)
1756+
end,
1757+
_ = rabbit_amqqueue:update(QName, Fun)
1758+
end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE],
17591759
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
17601760
ok.

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,9 @@ update_metadata(Name, Metadata0, ActingUser) ->
252252
Error
253253
end.
254254

255-
-spec update(vhost:name(), binary(), [atom()], rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
256-
update(Name, Description, Tags, ActingUser) ->
257-
Metadata = #{description => Description, tags => Tags},
255+
-spec update(vhost:name(), binary(), [atom()], rabbit_queue_type:queue_type() | 'undefined', rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
256+
update(Name, Description, Tags, DefaultQueueType, ActingUser) ->
257+
Metadata = #{description => Description, tags => Tags, default_queue_type => DefaultQueueType},
258258
update_metadata(Name, Metadata, ActingUser).
259259

260260
-spec delete(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
@@ -325,7 +325,7 @@ put_vhost(Name, Description, Tags0, DefaultQueueType, Trace, Username) ->
325325
rabbit_log:debug("Parsed tags ~tp to ~tp", [Tags, ParsedTags]),
326326
Result = case exists(Name) of
327327
true ->
328-
update(Name, Description, ParsedTags, Username);
328+
update(Name, Description, ParsedTags, DefaultQueueType, Username);
329329
false ->
330330
Metadata0 = #{description => Description,
331331
tags => ParsedTags},

deps/rabbitmq_cli/BUILD.bazel

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,11 @@ load(
1010
"mix_archive_build",
1111
)
1212

13-
mix_archive_build(
14-
name = "parallel_stream_ez",
15-
srcs = ["@parallel_stream//:sources"],
16-
out = "parallel_stream.ez",
17-
archives = ["@hex//:archive"],
18-
)
19-
2013
mix_archive_build(
2114
name = "csv_ez",
2215
srcs = ["@csv//:sources"],
2316
out = "csv.ez",
2417
archives = ["@hex//:archive"],
25-
ez_deps = [
26-
":parallel_stream_ez",
27-
],
2818
)
2919

3020
mix_archive_build(
@@ -49,8 +39,7 @@ rabbitmqctl(
4939
license_files = glob(["LICENSE*"]),
5040
source_deps = {
5141
"@csv//:sources": "csv",
52-
"@json//:sources": "json",
53-
"@parallel_stream//:sources": "parallel_stream", # transitive dep of csv 2.x
42+
"@json//:sources": "json"
5443
},
5544
visibility = ["//visibility:public"],
5645
deps = [
@@ -113,7 +102,6 @@ plt(
113102
"runtime_tools",
114103
],
115104
ez_deps = [
116-
":parallel_stream_ez",
117105
":csv_ez",
118106
":json_ez",
119107
],
@@ -159,7 +147,6 @@ rabbitmqctl_test(
159147
"@csv//:sources": "csv",
160148
"@dialyxir//:sources": "dialyxir",
161149
"@json//:sources": "json",
162-
"@parallel_stream//:sources": "parallel_stream", # transitive dep of csv 2.x
163150
"@temp//:sources": "temp",
164151
"@x509//:sources": "x509",
165152
},

deps/rabbitmq_cli/Makefile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
PROJECT = rabbitmq_cli
22

33
BUILD_DEPS = rabbit_common
4-
DEPS = csv json observer_cli parallel_stream stdout_formatter
4+
DEPS = csv json observer_cli stdout_formatter
55
TEST_DEPS = amqp amqp_client dialyxir temp x509 rabbit
66

77
dep_amqp = hex 2.1.1
88
dep_csv = hex 2.4.1
99
dep_dialyxir = hex 0.5.1
10-
dep_parallel_stream = hex 1.0.6
1110
dep_json = hex 1.4.1
1211
dep_temp = hex 0.4.7
1312
dep_x509 = hex 0.7.0

deps/rabbitmq_cli/mix.exs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,6 @@ defmodule RabbitMQCtl.MixfileBase do
146146
:csv,
147147
path: Path.join(deps_dir, "csv")
148148
},
149-
{
150-
:parallel_stream,
151-
path: Path.join(deps_dir, "parallel_stream"), override: true
152-
},
153149
{
154150
:stdout_formatter,
155151
path: Path.join(deps_dir, "stdout_formatter"),

deps/rabbitmq_federation/src/rabbit_federation_link_util.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,9 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
140140
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
141141
rabbit_federation_status:report(
142142
Upstream, UParams, XorQName, clean_reason(E)),
143-
Reason = case E of
144-
{error, Value} -> Value;
145-
Other -> Other
146-
end,
147143
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
148144
[rabbit_federation_upstream:params_to_string(UParams),
149-
Reason]),
145+
E]),
150146
{stop, {shutdown, restart}, State};
151147

152148
connection_error(remote, E, Upstream, UParams, XorQName, State) ->

deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,7 @@ handle_cast(pause, State = #state{run = false}) ->
104104
handle_cast(pause, State = #not_started{}) ->
105105
{noreply, State#not_started{run = false}};
106106

107-
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{
108-
name = UpName, queue_name = QName
109-
}}) ->
110-
rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause",
111-
[QName, UpName]),
107+
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
112108
cancel(Ch, Upstream),
113109
{noreply, State#state{run = false}};
114110

@@ -309,22 +305,18 @@ visit_match(_ ,_) ->
309305
consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
310306
ConsumerTag.
311307

312-
consume(Ch, Upstream = #upstream{name = UpName}, UQueue) ->
308+
consume(Ch, Upstream, UQueue) ->
313309
ConsumerTag = consumer_tag(Upstream),
314310
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
315-
rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'",
316-
[rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]),
317311
amqp_channel:cast(
318312
Ch, #'basic.consume'{queue = name(UQueue),
319313
no_ack = NoAck,
320314
nowait = true,
321315
consumer_tag = ConsumerTag,
322316
arguments = [{<<"x-priority">>, long, -1}]}).
323317

324-
cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) ->
318+
cancel(Ch, Upstream) ->
325319
ConsumerTag = consumer_tag(Upstream),
326-
rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'",
327-
[QName, ConsumerTag, UpName]),
328320
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
329321
consumer_tag = ConsumerTag}).
330322

0 commit comments

Comments
 (0)