|
6 | 6 | %% |
7 | 7 |
|
8 | 8 | -module(rabbit_queue_type). |
| 9 | +-feature(maybe_expr, enable). |
9 | 10 |
|
10 | 11 | -behaviour(rabbit_registry_class). |
11 | 12 |
|
@@ -282,7 +283,12 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) -> |
282 | 283 | declare(Q0, Node) -> |
283 | 284 | Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)), |
284 | 285 | Mod = amqqueue:get_type(Q), |
285 | | - Mod:declare(Q, Node). |
| 286 | + case check_queue_limits(Q) of |
| 287 | + ok -> |
| 288 | + Mod:declare(Q, Node); |
| 289 | + Error -> |
| 290 | + Error |
| 291 | + end. |
286 | 292 |
|
287 | 293 | -spec delete(amqqueue:amqqueue(), boolean(), |
288 | 294 | boolean(), rabbit_types:username()) -> |
@@ -730,3 +736,25 @@ known_queue_type_names() -> |
730 | 736 | {QueueTypes, _} = lists:unzip(Registered), |
731 | 737 | QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes), |
732 | 738 | ?KNOWN_QUEUE_TYPES ++ QTypeBins. |
| 739 | + |
| 740 | +-spec check_queue_limits(amqqueue:amqqueue()) -> |
| 741 | + ok | |
| 742 | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. |
| 743 | +check_queue_limits(Q) -> |
| 744 | + maybe |
| 745 | + %% Prepare for more checks |
| 746 | + ok ?= check_vhost_queue_limit(Q) |
| 747 | + end. |
| 748 | + |
| 749 | +check_vhost_queue_limit(Q) -> |
| 750 | + #resource{name = QueueName} = amqqueue:get_name(Q), |
| 751 | + VHost = amqqueue:get_vhost(Q), |
| 752 | + case rabbit_vhost_limit:is_over_queue_limit(VHost) of |
| 753 | + false -> |
| 754 | + ok; |
| 755 | + {true, Limit} -> |
| 756 | + {protocol_error, precondition_failed, |
| 757 | + "cannot declare queue '~ts': " |
| 758 | + "queue limit in vhost '~ts' (~tp) is reached", |
| 759 | + [QueueName, VHost, Limit]} |
| 760 | + end. |
0 commit comments