|
11 | 11 |
|
12 | 12 | -behaviour(gen_server2). |
13 | 13 |
|
14 | | --define(SYNC_INTERVAL, 200). %% milliseconds |
15 | | --define(RAM_DURATION_UPDATE_INTERVAL, 5000). |
16 | | --define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster |
| 14 | +-define(SYNC_INTERVAL, 200). %% milliseconds |
| 15 | +-define(UPDATE_RATES_INTERVAL, 5000). |
| 16 | +-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster |
17 | 17 |
|
18 | 18 | -export([info_keys/0]). |
19 | 19 |
|
|
48 | 48 | expires, |
49 | 49 | %% timer used to periodically sync (flush) queue index |
50 | 50 | sync_timer_ref, |
51 | | - %% timer used to update ingress/egress rates and queue RAM duration target |
| 51 | + %% timer used to update ingress/egress rates |
52 | 52 | rate_timer_ref, |
53 | 53 | %% timer used to clean up this queue due to TTL (on when unused) |
54 | 54 | expiry_timer_ref, |
@@ -212,9 +212,6 @@ init_it2(Recover, From, State = #q{q = Q, |
212 | 212 | true -> |
213 | 213 | ok = file_handle_cache:register_callback( |
214 | 214 | rabbit_amqqueue, set_maximum_since_use, [self()]), |
215 | | - ok = rabbit_memory_monitor:register( |
216 | | - self(), {rabbit_amqqueue, |
217 | | - set_ram_duration_target, [self()]}), |
218 | 215 | BQ = backing_queue_module(), |
219 | 216 | BQS = bq_init(BQ, Q, TermsOrNew), |
220 | 217 | send_reply(From, {new, Q}), |
@@ -364,8 +361,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) -> |
364 | 361 | fun stop_ttl_timer/1]), |
365 | 362 | case BQS of |
366 | 363 | undefined -> State1; |
367 | | - _ -> ok = rabbit_memory_monitor:deregister(self()), |
368 | | - QName = qname(State), |
| 364 | + _ -> QName = qname(State), |
369 | 365 | notify_decorators(shutdown, State), |
370 | 366 | [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || |
371 | 367 | {Ch, CTag, _, _, _, _, _, _} <- |
@@ -529,8 +525,8 @@ stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #q.sync_timer_ref). |
529 | 525 |
|
530 | 526 | ensure_rate_timer(State) -> |
531 | 527 | rabbit_misc:ensure_timer(State, #q.rate_timer_ref, |
532 | | - ?RAM_DURATION_UPDATE_INTERVAL, |
533 | | - update_ram_duration). |
| 528 | + ?UPDATE_RATES_INTERVAL, |
| 529 | + update_rates). |
534 | 530 |
|
535 | 531 | stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref). |
536 | 532 |
|
@@ -1193,7 +1189,6 @@ prioritise_cast(Msg, _Len, State) -> |
1193 | 1189 | case Msg of |
1194 | 1190 | delete_immediately -> 8; |
1195 | 1191 | {delete_exclusive, _Pid} -> 8; |
1196 | | - {set_ram_duration_target, _Duration} -> 8; |
1197 | 1192 | {set_maximum_since_use, _Age} -> 8; |
1198 | 1193 | {run_backing_queue, _Mod, _Fun} -> 6; |
1199 | 1194 | {ack, _AckTags, _ChPid} -> 4; %% [1] |
@@ -1221,13 +1216,12 @@ consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) -> |
1221 | 1216 | prioritise_info(Msg, _Len, #q{q = Q}) -> |
1222 | 1217 | DownPid = amqqueue:get_exclusive_owner(Q), |
1223 | 1218 | case Msg of |
1224 | | - {'DOWN', _, process, DownPid, _} -> 8; |
1225 | | - update_ram_duration -> 8; |
1226 | | - {maybe_expire, _Version} -> 8; |
1227 | | - {drop_expired, _Version} -> 8; |
1228 | | - emit_stats -> 7; |
1229 | | - sync_timeout -> 6; |
1230 | | - _ -> 0 |
| 1219 | + {'DOWN', _, process, DownPid, _} -> 8; |
| 1220 | + {maybe_expire, _Version} -> 8; |
| 1221 | + {drop_expired, _Version} -> 8; |
| 1222 | + emit_stats -> 7; |
| 1223 | + sync_timeout -> 6; |
| 1224 | + _ -> 0 |
1231 | 1225 | end. |
1232 | 1226 |
|
1233 | 1227 | handle_call({init, Recover}, From, State) -> |
@@ -1505,11 +1499,6 @@ handle_cast({deactivate_limit, ChPid}, State) -> |
1505 | 1499 | noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(), |
1506 | 1500 | ChPid, State)); |
1507 | 1501 |
|
1508 | | -handle_cast({set_ram_duration_target, Duration}, |
1509 | | - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> |
1510 | | - BQS1 = BQ:set_ram_duration_target(Duration, BQS), |
1511 | | - noreply(State#q{backing_queue_state = BQS1}); |
1512 | | - |
1513 | 1502 | handle_cast({set_maximum_since_use, Age}, State) -> |
1514 | 1503 | ok = file_handle_cache:set_maximum_since_use(Age), |
1515 | 1504 | noreply(State); |
@@ -1659,15 +1648,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> |
1659 | 1648 | {stop, State1} -> stop(State1) |
1660 | 1649 | end; |
1661 | 1650 |
|
1662 | | -handle_info(update_ram_duration, State = #q{backing_queue = BQ, |
1663 | | - backing_queue_state = BQS}) -> |
1664 | | - {RamDuration, BQS1} = BQ:ram_duration(BQS), |
1665 | | - DesiredDuration = |
1666 | | - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), |
1667 | | - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), |
| 1651 | +handle_info(update_rates, State = #q{backing_queue = BQ, |
| 1652 | + backing_queue_state = BQS}) -> |
| 1653 | + BQS1 = BQ:update_rates(BQS), |
1668 | 1654 | %% Don't call noreply/1, we don't want to set timers |
1669 | 1655 | {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined, |
1670 | | - backing_queue_state = BQS2}), |
| 1656 | + backing_queue_state = BQS1}), |
1671 | 1657 | {noreply, State1, Timeout}; |
1672 | 1658 |
|
1673 | 1659 | handle_info(sync_timeout, State) -> |
@@ -1697,11 +1683,8 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> |
1697 | 1683 | {hibernate, State}; |
1698 | 1684 | handle_pre_hibernate(State = #q{backing_queue = BQ, |
1699 | 1685 | backing_queue_state = BQS}) -> |
1700 | | - {RamDuration, BQS1} = BQ:ram_duration(BQS), |
1701 | | - DesiredDuration = |
1702 | | - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), |
1703 | | - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), |
1704 | | - BQS3 = BQ:handle_pre_hibernate(BQS2), |
| 1686 | + BQS1 = BQ:update_rates(BQS), |
| 1687 | + BQS3 = BQ:handle_pre_hibernate(BQS1), |
1705 | 1688 | rabbit_event:if_enabled( |
1706 | 1689 | State, #q.stats_timer, |
1707 | 1690 | fun () -> emit_stats(State, |
|
0 commit comments