|
8 | 8 | -module(rabbit_quorum_queue). |
9 | 9 |
|
10 | 10 | -behaviour(rabbit_queue_type). |
| 11 | +-behaviour(rabbit_policy_validator). |
| 12 | +-behaviour(rabbit_policy_merge_strategy). |
11 | 13 |
|
12 | 14 | -export([init/1, |
13 | 15 | close/1, |
|
35 | 37 | -export([format/1]). |
36 | 38 | -export([open_files/1]). |
37 | 39 | -export([peek/2, peek/3]). |
38 | | --export([add_member/4]). |
39 | | --export([delete_member/3]). |
| 40 | +-export([add_member/4, add_member/2]). |
| 41 | +-export([delete_member/3, delete_member/2]). |
40 | 42 | -export([requeue/3]). |
41 | 43 | -export([policy_changed/1]). |
42 | 44 | -export([format_ra_event/3]). |
|
65 | 67 | is_compatible/3, |
66 | 68 | declare/2, |
67 | 69 | is_stateful/0]). |
| 70 | +-export([validate_policy/1, merge_policy_value/3]). |
68 | 71 |
|
69 | 72 | -export([force_shrink_member_to_current_member/2, |
70 | 73 | force_all_queues_shrink_member_to_current_member/0]). |
|
114 | 117 | -define(ADD_MEMBER_TIMEOUT, 5000). |
115 | 118 | -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096 |
116 | 119 |
|
| 120 | +%%----------- QQ policies --------------------------------------------------- |
| 121 | + |
| 122 | +-rabbit_boot_step( |
| 123 | + {?MODULE, |
| 124 | + [{description, "QQ target group size policies. " |
| 125 | + "target-group-size controls the targeted number of " |
| 126 | + "member nodes for the queue. If set, RabbitMQ will try to " |
| 127 | + "grow the queue members to the target size. " |
| 128 | + "See module rabbit_queue_member_eval."}, |
| 129 | + {mfa, {rabbit_registry, register, |
| 130 | + [policy_validator, <<"target-group-size">>, ?MODULE]}}, |
| 131 | + {mfa, {rabbit_registry, register, |
| 132 | + [operator_policy_validator, <<"target-group-size">>, ?MODULE]}}, |
| 133 | + {mfa, {rabbit_registry, register, |
| 134 | + [policy_merge_strategy, <<"target-group-size">>, ?MODULE]}}, |
| 135 | + {requires, rabbit_registry}, |
| 136 | + {enables, recovery}]}). |
| 137 | + |
| 138 | +validate_policy(Args) -> |
| 139 | + Count = proplists:get_value(<<"target-group-size">>, Args, none), |
| 140 | + case is_integer(Count) andalso Count > 0 of |
| 141 | + true -> ok; |
| 142 | + false -> {error, "~tp is not a valid qq target count value", [Count]} |
| 143 | + end. |
| 144 | + |
| 145 | +merge_policy_value(<<"target-group-size">>, Val, OpVal) -> |
| 146 | + max(Val, OpVal). |
| 147 | + |
117 | 148 | %%----------- rabbit_queue_type --------------------------------------------- |
118 | 149 |
|
119 | 150 | -spec is_enabled() -> boolean(). |
@@ -215,6 +246,7 @@ start_cluster(Q) -> |
215 | 246 | ok = rabbit_fifo_client:update_machine_state(LeaderId, |
216 | 247 | ra_machine_config(NewQ)), |
217 | 248 | notify_decorators(QName, startup), |
| 249 | + rabbit_quorum_queue_periodic_membership_reconciliation:queue_created(NewQ), |
218 | 250 | rabbit_event:notify(queue_created, |
219 | 251 | [{name, QName}, |
220 | 252 | {durable, Durable}, |
@@ -1093,6 +1125,8 @@ add_member(VHost, Name, Node, Timeout) -> |
1093 | 1125 | E |
1094 | 1126 | end. |
1095 | 1127 |
|
| 1128 | +add_member(Q, Node) -> |
| 1129 | + add_member(Q, Node, ?ADD_MEMBER_TIMEOUT). |
1096 | 1130 | add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> |
1097 | 1131 | {RaName, _} = amqqueue:get_pid(Q), |
1098 | 1132 | QName = amqqueue:get_name(Q), |
|
0 commit comments