Skip to content

Commit 9bf380d

Browse files
authored
add missing queues configurations (#110)
* add missing queues configurations --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 6764374 commit 9bf380d

File tree

4 files changed

+56
-1
lines changed

4 files changed

+56
-1
lines changed

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@ public enum OverFlowStrategy
3535
{
3636
DropHead,
3737
RejectPublish,
38-
3938
RejectPublishDlx
4039
// DROP_HEAD("drop-head"),
4140
// REJECT_PUBLISH("reject-publish"),
4241
// REJECT_PUBLISH_DLX("reject-publish-dlx");
4342
}
4443

44+
public enum LeaderLocatorStrategy
45+
{
46+
ClientLocal,
47+
Balanced
48+
}
49+
4550
public interface IQueueSpecification : IEntityInfoSpecification<IQueueInfo>
4651
{
4752
public string QueueName { get; }
@@ -67,6 +72,8 @@ public interface IQueueSpecification : IEntityInfoSpecification<IQueueInfo>
6772

6873
IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);
6974

75+
IQueueSpecification LeaderLocator(LeaderLocatorStrategy strategy);
76+
7077
// TODO: Add more tests for SingleActiveConsumer
7178
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);
7279

@@ -93,6 +100,8 @@ public interface IStreamSpecification
93100

94101
public IStreamSpecification InitialClusterSize(int initialClusterSize);
95102

103+
public IStreamSpecification FileSizePerChunk(ByteCapacity fileSizePerChunk);
104+
96105
public IQueueSpecification Queue();
97106
}
98107

@@ -112,6 +121,8 @@ public interface IQuorumQueueSpecification
112121

113122
IQuorumQueueSpecification QuorumInitialGroupSize(int size);
114123

124+
IQuorumQueueSpecification QuorumTargetGroupSize(int size);
125+
115126
IQueueSpecification Queue();
116127
}
117128

RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,17 @@ public IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes)
231231
return this;
232232
}
233233

234+
public IQueueSpecification LeaderLocator(LeaderLocatorStrategy leaderLocatorStrategy)
235+
{
236+
_queueArguments["x-queue-leader-locator"] = leaderLocatorStrategy switch
237+
{
238+
LeaderLocatorStrategy.ClientLocal => "client-local",
239+
LeaderLocatorStrategy.Balanced => "balanced",
240+
_ => throw new ArgumentOutOfRangeException(nameof(leaderLocatorStrategy), leaderLocatorStrategy, null)
241+
};
242+
return this;
243+
}
244+
234245
public IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer)
235246
{
236247
_queueArguments["x-single-active-consumer"] = singleActiveConsumer;
@@ -397,6 +408,13 @@ public IStreamSpecification InitialClusterSize(int initialClusterSize)
397408
return this;
398409
}
399410

411+
public IStreamSpecification FileSizePerChunk(ByteCapacity fileSizePerChunk)
412+
{
413+
Utils.ValidatePositive("x-stream-file-size-per-chunk", fileSizePerChunk);
414+
_parent._queueArguments["x-stream-file-size-per-chunk"] = (long)fileSizePerChunk;
415+
return this;
416+
}
417+
400418
public IQueueSpecification Queue()
401419
{
402420
return _parent;
@@ -437,6 +455,13 @@ public IQuorumQueueSpecification QuorumInitialGroupSize(int size)
437455
return this;
438456
}
439457

458+
public IQuorumQueueSpecification QuorumTargetGroupSize(int size)
459+
{
460+
Utils.ValidatePositive("x-quorum-target-group-size", size);
461+
_parent._queueArguments["x-quorum-target-group-size"] = size;
462+
return this;
463+
}
464+
440465
public IQueueSpecification Queue()
441466
{
442467
return _parent;

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.Exclusive(bool isExclusive) ->
455455
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.Expires(System.TimeSpan expiration) -> RabbitMQ.AMQP.Client.IQueueSpecification!
456456
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.IsAutoDelete.get -> bool
457457
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.IsExclusive.get -> bool
458+
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy leaderLocatorStrategy) -> RabbitMQ.AMQP.Client.IQueueSpecification!
458459
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MaxLength(long maxLength) -> RabbitMQ.AMQP.Client.IQueueSpecification!
459460
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MaxLengthBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxLengthBytes) -> RabbitMQ.AMQP.Client.IQueueSpecification!
460461
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MessageTtl(System.TimeSpan ttl) -> RabbitMQ.AMQP.Client.IQueueSpecification!
@@ -474,6 +475,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM
474475
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
475476
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
476477
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
478+
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
477479
RabbitMQ.AMQP.Client.Impl.AmqpRpcClient
478480
RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.AmqpRpcClient(RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration! configuration) -> void
479481
RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IMessage!>!
@@ -499,6 +501,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client
499501
RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
500502
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification
501503
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void
504+
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification!
502505
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
503506
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
504507
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
@@ -631,6 +634,7 @@ RabbitMQ.AMQP.Client.IQueueSpecification.Exclusive(bool isExclusive) -> RabbitMQ
631634
RabbitMQ.AMQP.Client.IQueueSpecification.Expires(System.TimeSpan expiration) -> RabbitMQ.AMQP.Client.IQueueSpecification!
632635
RabbitMQ.AMQP.Client.IQueueSpecification.IsAutoDelete.get -> bool
633636
RabbitMQ.AMQP.Client.IQueueSpecification.IsExclusive.get -> bool
637+
RabbitMQ.AMQP.Client.IQueueSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy strategy) -> RabbitMQ.AMQP.Client.IQueueSpecification!
634638
RabbitMQ.AMQP.Client.IQueueSpecification.MaxLength(long maxLength) -> RabbitMQ.AMQP.Client.IQueueSpecification!
635639
RabbitMQ.AMQP.Client.IQueueSpecification.MaxLengthBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxLengthBytes) -> RabbitMQ.AMQP.Client.IQueueSpecification!
636640
RabbitMQ.AMQP.Client.IQueueSpecification.MessageTtl(System.TimeSpan ttl) -> RabbitMQ.AMQP.Client.IQueueSpecification!
@@ -649,6 +653,7 @@ RabbitMQ.AMQP.Client.IQuorumQueueSpecification.DeadLetterStrategy(RabbitMQ.AMQP.
649653
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
650654
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
651655
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
656+
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
652657
RabbitMQ.AMQP.Client.IRecoveryConfiguration
653658
RabbitMQ.AMQP.Client.IRecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
654659
RabbitMQ.AMQP.Client.IRecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
@@ -680,12 +685,16 @@ RabbitMQ.AMQP.Client.IRpcServerBuilder.ReplyPostProcessor(System.Func<RabbitMQ.A
680685
RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
681686
RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
682687
RabbitMQ.AMQP.Client.IStreamSpecification
688+
RabbitMQ.AMQP.Client.IStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification!
683689
RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
684690
RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
685691
RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
686692
RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
687693
RabbitMQ.AMQP.Client.IUriSelector
688694
RabbitMQ.AMQP.Client.IUriSelector.Select(System.Collections.Generic.ICollection<System.Uri!>! uris) -> System.Uri!
695+
RabbitMQ.AMQP.Client.LeaderLocatorStrategy
696+
RabbitMQ.AMQP.Client.LeaderLocatorStrategy.Balanced = 1 -> RabbitMQ.AMQP.Client.LeaderLocatorStrategy
697+
RabbitMQ.AMQP.Client.LeaderLocatorStrategy.ClientLocal = 0 -> RabbitMQ.AMQP.Client.LeaderLocatorStrategy
689698
RabbitMQ.AMQP.Client.LifeCycleCallBack
690699
RabbitMQ.AMQP.Client.MessageHandler
691700
RabbitMQ.AMQP.Client.MetricsReporter

Tests/Management/ManagementTests.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,23 @@ public async Task DeclareStreamQueueWithArguments()
151151

152152
IQueueInfo queueInfo = await _management.Queue()
153153
.Name(_queueName)
154+
.MaxLengthBytes(ByteCapacity.Kb(1024))
155+
.LeaderLocator(LeaderLocatorStrategy.Balanced)
154156
.Stream()
155157
.MaxAge(TimeSpan.FromSeconds(10))
156158
.MaxSegmentSizeBytes(ByteCapacity.Kb(1024))
157159
.InitialClusterSize(1)
160+
.FileSizePerChunk(ByteCapacity.Kb(1024))
158161
.Queue()
159162
.DeclareAsync();
160163

161164
Assert.Equal(_queueName, queueInfo.Name());
162165
Assert.Equal("10s", queueInfo.Arguments()["x-max-age"]);
163166
Assert.Equal(1024000L, queueInfo.Arguments()["x-stream-max-segment-size-bytes"]);
164167
Assert.Equal(1, queueInfo.Arguments()["x-initial-cluster-size"]);
168+
Assert.Equal("balanced", queueInfo.Arguments()["x-queue-leader-locator"]);
169+
Assert.Equal(1024000L, queueInfo.Arguments()["x-stream-file-size-per-chunk"]);
170+
Assert.Equal(1024000L, queueInfo.Arguments()["x-max-length-bytes"]);
165171
// NB: DisposeAsync will delete the queue with name _queueName
166172
}
167173

@@ -173,17 +179,21 @@ public async Task DeclareQuorumQueueWithArguments()
173179

174180
IQueueInfo queueInfo = await _management.Queue()
175181
.Name(_queueName)
182+
.LeaderLocator(LeaderLocatorStrategy.ClientLocal)
176183
.Quorum()
177184
.DeliveryLimit(12)
178185
.DeadLetterStrategy(QuorumQueueDeadLetterStrategy.AtLeastOnce)
179186
.QuorumInitialGroupSize(3)
187+
.QuorumTargetGroupSize(5)
180188
.Queue()
181189
.DeclareAsync();
182190

183191
Assert.Equal(_queueName, queueInfo.Name());
184192
Assert.Equal(12, queueInfo.Arguments()["x-max-delivery-limit"]);
185193
Assert.Equal("at-least-once", queueInfo.Arguments()["x-dead-letter-strategy"]);
186194
Assert.Equal(3, queueInfo.Arguments()["x-quorum-initial-group-size"]);
195+
Assert.Equal(5, queueInfo.Arguments()["x-quorum-target-group-size"]);
196+
Assert.Equal("client-local", queueInfo.Arguments()["x-queue-leader-locator"]);
187197
// NB: DisposeAsync will delete the queue with name _queueName
188198
}
189199

0 commit comments

Comments
 (0)