Skip to content

Commit 5c21de9

Browse files
author
Matthias Radestock
committed
refactoring: rename "round robin" to "active consumers"
1 parent 096e6d4 commit 5c21de9

File tree

1 file changed

+31
-27
lines changed

1 file changed

+31
-27
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
has_had_consumers,
5454
next_msg_id,
5555
message_buffer,
56-
round_robin,
56+
active_consumers,
5757
blocked_consumers}).
5858

5959
-record(consumer, {tag, ack_required}).
@@ -100,7 +100,7 @@ init(Q) ->
100100
has_had_consumers = false,
101101
next_msg_id = 1,
102102
message_buffer = queue:new(),
103-
round_robin = queue:new(),
103+
active_consumers = queue:new(),
104104
blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
105105

106106
terminate(_Reason, State) ->
@@ -167,14 +167,14 @@ record_current_channel_tx(ChPid, Txn) ->
167167

168168
deliver_immediately(Message, Delivered,
169169
State = #q{q = #amqqueue{name = QName},
170-
round_robin = RoundRobin,
170+
active_consumers = ActiveConsumers,
171171
blocked_consumers = BlockedConsumers,
172172
next_msg_id = NextId}) ->
173173
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
174-
case queue:out(RoundRobin) of
174+
case queue:out(ActiveConsumers) of
175175
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
176176
ack_required = AckRequired}}},
177-
RoundRobinTail} ->
177+
ActiveConsumersTail} ->
178178
C = #cr{limiter_pid = LimiterPid,
179179
unsent_message_count = Count,
180180
unacked_messages = UAM} = ch_record(ChPid),
@@ -190,29 +190,31 @@ deliver_immediately(Message, Delivered,
190190
NewC = C#cr{unsent_message_count = Count + 1,
191191
unacked_messages = NewUAM},
192192
store_ch_record(NewC),
193-
{NewRoundRobin, NewBlockedConsumers} =
193+
{NewActiveConsumers, NewBlockedConsumers} =
194194
case ch_record_state_transition(C, NewC) of
195-
ok -> {queue:in(QEntry, RoundRobinTail),
195+
ok -> {queue:in(QEntry, ActiveConsumersTail),
196196
BlockedConsumers};
197197
block ->
198-
{RoundRobin1, BlockedConsumers1} =
198+
{ActiveConsumers1, BlockedConsumers1} =
199199
move_consumers(ChPid,
200-
RoundRobinTail,
200+
ActiveConsumersTail,
201201
BlockedConsumers),
202-
{RoundRobin1,
202+
{ActiveConsumers1,
203203
queue:in(QEntry, BlockedConsumers1)}
204204
end,
205205
{offered, AckRequired,
206-
State#q{round_robin = NewRoundRobin,
206+
State#q{active_consumers = NewActiveConsumers,
207207
blocked_consumers = NewBlockedConsumers,
208208
next_msg_id = NextId + 1}};
209209
false ->
210210
store_ch_record(C#cr{is_limit_active = true}),
211-
{NewRoundRobin, NewBlockedConsumers} =
212-
move_consumers(ChPid, RoundRobin, BlockedConsumers),
211+
{NewActiveConsumers, NewBlockedConsumers} =
212+
move_consumers(ChPid,
213+
ActiveConsumers,
214+
BlockedConsumers),
213215
deliver_immediately(
214216
Message, Delivered,
215-
State#q{round_robin = NewRoundRobin,
217+
State#q{active_consumers = NewActiveConsumers,
216218
blocked_consumers = NewBlockedConsumers})
217219
end;
218220
{empty, _} ->
@@ -277,12 +279,12 @@ possibly_unblock(State, ChPid, Update) ->
277279
store_ch_record(NewC),
278280
case ch_record_state_transition(C, NewC) of
279281
ok -> State;
280-
unblock -> {NewBlockedeConsumers, NewRoundRobin} =
282+
unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
281283
move_consumers(ChPid,
282284
State#q.blocked_consumers,
283-
State#q.round_robin),
285+
State#q.active_consumers),
284286
run_poke_burst(
285-
State#q{round_robin = NewRoundRobin,
287+
State#q{active_consumers = NewActiveConsumers,
286288
blocked_consumers = NewBlockedeConsumers})
287289
end
288290
end.
@@ -312,7 +314,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
312314
{ChPid, _} -> none;
313315
Other -> Other
314316
end,
315-
round_robin = remove_consumers(ChPid, State#q.round_robin),
317+
active_consumers = remove_consumers(
318+
ChPid, State#q.active_consumers),
316319
blocked_consumers = remove_consumers(
317320
ChPid, State#q.blocked_consumers)}),
318321
case should_auto_delete(NewState) of
@@ -360,7 +363,7 @@ run_poke_burst(MessageBuffer, State) ->
360363
State#q{message_buffer = MessageBuffer}
361364
end.
362365

363-
is_unused(State) -> queue:is_empty(State#q.round_robin) andalso
366+
is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
364367
queue:is_empty(State#q.blocked_consumers).
365368

366369
maybe_send_reply(_ChPid, undefined) -> ok;
@@ -521,7 +524,7 @@ i(acks_uncommitted, _) ->
521524
lists:sum([length(Pending) ||
522525
#tx{pending_acks = Pending} <- all_tx_record()]);
523526
i(consumers, State) ->
524-
queue:len(State#q.round_robin) + queue:len(State#q.blocked_consumers);
527+
queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
525528
i(transactions, _) ->
526529
length(all_tx_record());
527530
i(memory, _) ->
@@ -639,10 +642,10 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
639642
State1#q.blocked_consumers)};
640643
false -> run_poke_burst(
641644
State1#q{
642-
round_robin =
645+
active_consumers =
643646
add_consumer(
644647
ChPid, Consumer,
645-
State1#q.round_robin)})
648+
State1#q.active_consumers)})
646649
end,
647650
reply(ok, State2)
648651
end
@@ -666,9 +669,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
666669
State#q{exclusive_consumer = cancel_holder(ChPid,
667670
ConsumerTag,
668671
Holder),
669-
round_robin = remove_consumer(
670-
ChPid, ConsumerTag,
671-
State#q.round_robin),
672+
active_consumers = remove_consumer(
673+
ChPid, ConsumerTag,
674+
State#q.active_consumers),
672675
blocked_consumers = remove_consumer(
673676
ChPid, ConsumerTag,
674677
State#q.blocked_consumers)},
@@ -680,8 +683,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
680683

681684
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
682685
message_buffer = MessageBuffer,
683-
round_robin = RoundRobin}) ->
684-
reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
686+
active_consumers = ActiveConsumers}) ->
687+
reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
688+
State);
685689

686690
handle_call({delete, IfUnused, IfEmpty}, _From,
687691
State = #q{message_buffer = MessageBuffer}) ->

0 commit comments

Comments
 (0)