Skip to content

Commit f89dfe5

Browse files
michaelklishinmergify[bot]
authored andcommitted
Shovel: use lqueue over queue
the `queue' module is not aware of the queue length, which means its `len/1` function is O(n) while with `lqueue' it is O(1). We rely on `queue:len/1' for a shovel metric, so why not use a more efficient drop-in replacement our team has developed in 2011. (cherry picked from commit 7aa7433)
1 parent 62b1183 commit f89dfe5

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,16 +362,16 @@ status(_) ->
362362
running.
363363

364364
pending_count(#{dest := Dest}) ->
365-
Pending = maps:get(pending, Dest, queue:new()),
366-
queue:len(Pending).
365+
Pending = maps:get(pending, Dest, lqueue:new()),
366+
lqueue:len(Pending).
367367

368368
add_pending(Elem, State = #{dest := Dest}) ->
369-
Pending = maps:get(pending, Dest, queue:new()),
370-
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
369+
Pending = maps:get(pending, Dest, lqueue:new()),
370+
State#{dest => Dest#{pending => lqueue:in(Elem, Pending)}}.
371371

372372
pop_pending(State = #{dest := Dest}) ->
373-
Pending = maps:get(pending, Dest, queue:new()),
374-
case queue:out(Pending) of
373+
Pending = maps:get(pending, Dest, lqueue:new()),
374+
case lqueue:out(Pending) of
375375
{empty, _} ->
376376
empty;
377377
{{value, Elem}, Pending2} ->

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ status(State) ->
476476
end.
477477

478478
pending_count(#{dest := #{pending_delivery := Pending}}) ->
479-
queue:len(Pending);
479+
lqueue:len(Pending);
480480
pending_count(_) ->
481481
0.
482482

@@ -947,12 +947,12 @@ is_blocked(_) ->
947947
false.
948948

949949
add_pending_delivery(Elem, State = #{dest := Dest}) ->
950-
Pending = maps:get(pending_delivery, Dest, queue:new()),
951-
State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}.
950+
Pending = maps:get(pending_delivery, Dest, lqueue:new()),
951+
State#{dest => Dest#{pending_delivery => lqueue:in(Elem, Pending)}}.
952952

953953
pop_pending_delivery(State = #{dest := Dest}) ->
954-
Pending = maps:get(pending_delivery, Dest, queue:new()),
955-
case queue:out(Pending) of
954+
Pending = maps:get(pending_delivery, Dest, lqueue:new()),
955+
case lqueue:out(Pending) of
956956
{empty, _} ->
957957
empty;
958958
{{value, Elem}, Pending2} ->

deps/rabbitmq_shovel/test/pending_count_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ amqp091_pending_count_empty_queue(_Config) ->
6969

7070
amqp091_pending_count_with_messages(_Config) ->
7171
%% Test that pending_count returns correct count when messages are pending
72-
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
72+
PendingQueue = lqueue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
7373
State = #{dest => #{pending => PendingQueue}},
7474
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).
7575

7676
amqp091_pending_count_after_drain(_Config) ->
7777
%% Test that pending_count returns 0 after messages are drained
78-
EmptyQueue = queue:new(),
78+
EmptyQueue = lqueue:new(),
7979
State = #{dest => #{pending => EmptyQueue}},
8080
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).
8181

0 commit comments

Comments
 (0)