Skip to content

Commit 43323c2

Browse files
committed
add missing queues configurations
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 03bbe15 commit 43323c2

File tree

4 files changed

+38
-14
lines changed

4 files changed

+38
-14
lines changed

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public interface IQueueSpecification : IEntityInfoSpecification<IQueueInfo>
7171
IQueueSpecification OverflowStrategy(OverFlowStrategy overflow);
7272

7373
IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);
74+
75+
IQueueSpecification LeaderLocator(LeaderLocatorStrategy strategy);
7476

7577
// TODO: Add more tests for SingleActiveConsumer
7678
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);
@@ -98,7 +100,6 @@ public interface IStreamSpecification
98100

99101
public IStreamSpecification InitialClusterSize(int initialClusterSize);
100102

101-
public IStreamSpecification LeaderLocator(LeaderLocatorStrategy leaderLocatorStrategy);
102103

103104
public IStreamSpecification FileSizePerChunk(ByteCapacity fileSizePerChunk);
104105

@@ -120,6 +121,8 @@ public interface IQuorumQueueSpecification
120121
IQuorumQueueSpecification DeliveryLimit(int limit);
121122

122123
IQuorumQueueSpecification QuorumInitialGroupSize(int size);
124+
125+
IQuorumQueueSpecification QuorumTargetGroupSize(int size);
123126

124127
IQueueSpecification Queue();
125128
}

RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,19 @@ public IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes)
231231
return this;
232232
}
233233

234+
235+
public IQueueSpecification LeaderLocator(LeaderLocatorStrategy leaderLocatorStrategy)
236+
{
237+
_queueArguments["x-queue-leader-locator"] = leaderLocatorStrategy switch
238+
{
239+
LeaderLocatorStrategy.ClientLocal => "client-local",
240+
LeaderLocatorStrategy.Balanced => "balanced",
241+
_ => throw new ArgumentOutOfRangeException(nameof(leaderLocatorStrategy), leaderLocatorStrategy, null)
242+
};
243+
return this;
244+
}
245+
246+
234247
public IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer)
235248
{
236249
_queueArguments["x-single-active-consumer"] = singleActiveConsumer;
@@ -397,23 +410,12 @@ public IStreamSpecification InitialClusterSize(int initialClusterSize)
397410
return this;
398411
}
399412

400-
public IStreamSpecification LeaderLocator(LeaderLocatorStrategy leaderLocatorStrategy)
401-
{
402-
_parent._queueArguments["x-queue-leader-locator"] = leaderLocatorStrategy switch
403-
{
404-
LeaderLocatorStrategy.ClientLocal => "client-local",
405-
LeaderLocatorStrategy.Balanced => "balanced",
406-
_ => throw new ArgumentOutOfRangeException(nameof(leaderLocatorStrategy), leaderLocatorStrategy, null)
407-
};
408-
return this;
409-
}
410413

411414
public IStreamSpecification FileSizePerChunk(ByteCapacity fileSizePerChunk)
412415
{
413416
Utils.ValidatePositive("x-stream-file-size-per-chunk", fileSizePerChunk);
414417
_parent._queueArguments["x-stream-file-size-per-chunk"] = (long)fileSizePerChunk;
415418
return this;
416-
417419
}
418420

419421
public IQueueSpecification Queue()
@@ -456,6 +458,13 @@ public IQuorumQueueSpecification QuorumInitialGroupSize(int size)
456458
return this;
457459
}
458460

461+
public IQuorumQueueSpecification QuorumTargetGroupSize(int size)
462+
{
463+
Utils.ValidatePositive("x-quorum-target-group-size", size);
464+
_parent._queueArguments["x-quorum-target-group-size"] = size;
465+
return this;
466+
}
467+
459468
public IQueueSpecification Queue()
460469
{
461470
return _parent;

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.Exclusive(bool isExclusive) ->
455455
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.Expires(System.TimeSpan expiration) -> RabbitMQ.AMQP.Client.IQueueSpecification!
456456
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.IsAutoDelete.get -> bool
457457
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.IsExclusive.get -> bool
458+
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy leaderLocatorStrategy) -> RabbitMQ.AMQP.Client.IQueueSpecification!
458459
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MaxLength(long maxLength) -> RabbitMQ.AMQP.Client.IQueueSpecification!
459460
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MaxLengthBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxLengthBytes) -> RabbitMQ.AMQP.Client.IQueueSpecification!
460461
RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MessageTtl(System.TimeSpan ttl) -> RabbitMQ.AMQP.Client.IQueueSpecification!
@@ -474,6 +475,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM
474475
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
475476
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
476477
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
478+
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
477479
RabbitMQ.AMQP.Client.Impl.AmqpRpcClient
478480
RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.AmqpRpcClient(RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration! configuration) -> void
479481
RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IMessage!>!
@@ -501,7 +503,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification
501503
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void
502504
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification!
503505
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
504-
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy leaderLocatorStrategy) -> RabbitMQ.AMQP.Client.IStreamSpecification!
505506
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
506507
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
507508
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
@@ -633,6 +634,7 @@ RabbitMQ.AMQP.Client.IQueueSpecification.Exclusive(bool isExclusive) -> RabbitMQ
633634
RabbitMQ.AMQP.Client.IQueueSpecification.Expires(System.TimeSpan expiration) -> RabbitMQ.AMQP.Client.IQueueSpecification!
634635
RabbitMQ.AMQP.Client.IQueueSpecification.IsAutoDelete.get -> bool
635636
RabbitMQ.AMQP.Client.IQueueSpecification.IsExclusive.get -> bool
637+
RabbitMQ.AMQP.Client.IQueueSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy strategy) -> RabbitMQ.AMQP.Client.IQueueSpecification!
636638
RabbitMQ.AMQP.Client.IQueueSpecification.MaxLength(long maxLength) -> RabbitMQ.AMQP.Client.IQueueSpecification!
637639
RabbitMQ.AMQP.Client.IQueueSpecification.MaxLengthBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxLengthBytes) -> RabbitMQ.AMQP.Client.IQueueSpecification!
638640
RabbitMQ.AMQP.Client.IQueueSpecification.MessageTtl(System.TimeSpan ttl) -> RabbitMQ.AMQP.Client.IQueueSpecification!
@@ -651,6 +653,7 @@ RabbitMQ.AMQP.Client.IQuorumQueueSpecification.DeadLetterStrategy(RabbitMQ.AMQP.
651653
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
652654
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
653655
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
656+
RabbitMQ.AMQP.Client.IQuorumQueueSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
654657
RabbitMQ.AMQP.Client.IRecoveryConfiguration
655658
RabbitMQ.AMQP.Client.IRecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
656659
RabbitMQ.AMQP.Client.IRecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
@@ -684,7 +687,6 @@ RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> Rab
684687
RabbitMQ.AMQP.Client.IStreamSpecification
685688
RabbitMQ.AMQP.Client.IStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification!
686689
RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
687-
RabbitMQ.AMQP.Client.IStreamSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy leaderLocatorStrategy) -> RabbitMQ.AMQP.Client.IStreamSpecification!
688690
RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification!
689691
RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
690692
RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!

Tests/Management/ManagementTests.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,23 @@ public async Task DeclareStreamQueueWithArguments()
151151

152152
IQueueInfo queueInfo = await _management.Queue()
153153
.Name(_queueName)
154+
.MaxLengthBytes(ByteCapacity.Kb(1024))
155+
.LeaderLocator(LeaderLocatorStrategy.Balanced)
154156
.Stream()
155157
.MaxAge(TimeSpan.FromSeconds(10))
156158
.MaxSegmentSizeBytes(ByteCapacity.Kb(1024))
157159
.InitialClusterSize(1)
160+
.FileSizePerChunk(ByteCapacity.Kb(1024))
158161
.Queue()
159162
.DeclareAsync();
160163

161164
Assert.Equal(_queueName, queueInfo.Name());
162165
Assert.Equal("10s", queueInfo.Arguments()["x-max-age"]);
163166
Assert.Equal(1024000L, queueInfo.Arguments()["x-stream-max-segment-size-bytes"]);
164167
Assert.Equal(1, queueInfo.Arguments()["x-initial-cluster-size"]);
168+
Assert.Equal("balanced", queueInfo.Arguments()["x-queue-leader-locator"]);
169+
Assert.Equal(1024000L, queueInfo.Arguments()["x-stream-file-size-per-chunk"]);
170+
Assert.Equal(1024000L, queueInfo.Arguments()["x-max-length-bytes"]);
165171
// NB: DisposeAsync will delete the queue with name _queueName
166172
}
167173

@@ -173,17 +179,21 @@ public async Task DeclareQuorumQueueWithArguments()
173179

174180
IQueueInfo queueInfo = await _management.Queue()
175181
.Name(_queueName)
182+
.LeaderLocator(LeaderLocatorStrategy.ClientLocal)
176183
.Quorum()
177184
.DeliveryLimit(12)
178185
.DeadLetterStrategy(QuorumQueueDeadLetterStrategy.AtLeastOnce)
179186
.QuorumInitialGroupSize(3)
187+
.QuorumTargetGroupSize(5)
180188
.Queue()
181189
.DeclareAsync();
182190

183191
Assert.Equal(_queueName, queueInfo.Name());
184192
Assert.Equal(12, queueInfo.Arguments()["x-max-delivery-limit"]);
185193
Assert.Equal("at-least-once", queueInfo.Arguments()["x-dead-letter-strategy"]);
186194
Assert.Equal(3, queueInfo.Arguments()["x-quorum-initial-group-size"]);
195+
Assert.Equal(5, queueInfo.Arguments()["x-quorum-target-group-size"]);
196+
Assert.Equal("client-local", queueInfo.Arguments()["x-queue-leader-locator"]);
187197
// NB: DisposeAsync will delete the queue with name _queueName
188198
}
189199

0 commit comments

Comments
 (0)