diff --git a/RabbitMQ.AMQP.Client/IEntities.cs b/RabbitMQ.AMQP.Client/IEntities.cs index ec2e783..bdc0d2e 100644 --- a/RabbitMQ.AMQP.Client/IEntities.cs +++ b/RabbitMQ.AMQP.Client/IEntities.cs @@ -35,13 +35,18 @@ public enum OverFlowStrategy { DropHead, RejectPublish, - RejectPublishDlx // DROP_HEAD("drop-head"), // REJECT_PUBLISH("reject-publish"), // REJECT_PUBLISH_DLX("reject-publish-dlx"); } + public enum LeaderLocatorStrategy + { + ClientLocal, + Balanced + } + public interface IQueueSpecification : IEntityInfoSpecification { public string QueueName { get; } @@ -67,6 +72,8 @@ public interface IQueueSpecification : IEntityInfoSpecification IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes); + IQueueSpecification LeaderLocator(LeaderLocatorStrategy strategy); + // TODO: Add more tests for SingleActiveConsumer IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer); @@ -93,6 +100,8 @@ public interface IStreamSpecification public IStreamSpecification InitialClusterSize(int initialClusterSize); + public IStreamSpecification FileSizePerChunk(ByteCapacity fileSizePerChunk); + public IQueueSpecification Queue(); } @@ -112,6 +121,8 @@ public interface IQuorumQueueSpecification IQuorumQueueSpecification QuorumInitialGroupSize(int size); + IQuorumQueueSpecification QuorumTargetGroupSize(int size); + IQueueSpecification Queue(); } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs b/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs index 1987416..71cb5f7 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs @@ -231,6 +231,17 @@ public IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes) return this; } + public IQueueSpecification LeaderLocator(LeaderLocatorStrategy leaderLocatorStrategy) + { + _queueArguments["x-queue-leader-locator"] = leaderLocatorStrategy switch + { + LeaderLocatorStrategy.ClientLocal => "client-local", + LeaderLocatorStrategy.Balanced => "balanced", + _ => throw new ArgumentOutOfRangeException(nameof(leaderLocatorStrategy), leaderLocatorStrategy, null) + }; + return this; + } + public IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer) { _queueArguments["x-single-active-consumer"] = singleActiveConsumer; @@ -397,6 +408,13 @@ public IStreamSpecification InitialClusterSize(int initialClusterSize) return this; } + public IStreamSpecification FileSizePerChunk(ByteCapacity fileSizePerChunk) + { + Utils.ValidatePositive("x-stream-file-size-per-chunk", fileSizePerChunk); + _parent._queueArguments["x-stream-file-size-per-chunk"] = (long)fileSizePerChunk; + return this; + } + public IQueueSpecification Queue() { return _parent; @@ -437,6 +455,13 @@ public IQuorumQueueSpecification QuorumInitialGroupSize(int size) return this; } + public IQuorumQueueSpecification QuorumTargetGroupSize(int size) + { + Utils.ValidatePositive("x-quorum-target-group-size", size); + _parent._queueArguments["x-quorum-target-group-size"] = size; + return this; + } + public IQueueSpecification Queue() { return _parent; diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index fa7564a..ba3938c 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -455,6 +455,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.Exclusive(bool isExclusive) -> RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.Expires(System.TimeSpan expiration) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.IsAutoDelete.get -> bool RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.IsExclusive.get -> bool +RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy leaderLocatorStrategy) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MaxLength(long maxLength) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MaxLengthBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxLengthBytes) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification.MessageTtl(System.TimeSpan ttl) -> RabbitMQ.AMQP.Client.IQueueSpecification! @@ -474,6 +475,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! +RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpRpcClient RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.AmqpRpcClient(RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration! configuration) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! @@ -499,6 +501,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void +RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! @@ -631,6 +634,7 @@ RabbitMQ.AMQP.Client.IQueueSpecification.Exclusive(bool isExclusive) -> RabbitMQ RabbitMQ.AMQP.Client.IQueueSpecification.Expires(System.TimeSpan expiration) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.IQueueSpecification.IsAutoDelete.get -> bool RabbitMQ.AMQP.Client.IQueueSpecification.IsExclusive.get -> bool +RabbitMQ.AMQP.Client.IQueueSpecification.LeaderLocator(RabbitMQ.AMQP.Client.LeaderLocatorStrategy strategy) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.IQueueSpecification.MaxLength(long maxLength) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.IQueueSpecification.MaxLengthBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxLengthBytes) -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.IQueueSpecification.MessageTtl(System.TimeSpan ttl) -> RabbitMQ.AMQP.Client.IQueueSpecification! @@ -649,6 +653,7 @@ RabbitMQ.AMQP.Client.IQuorumQueueSpecification.DeadLetterStrategy(RabbitMQ.AMQP. RabbitMQ.AMQP.Client.IQuorumQueueSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.IQuorumQueueSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.IQuorumQueueSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! +RabbitMQ.AMQP.Client.IQuorumQueueSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.IRecoveryConfiguration RabbitMQ.AMQP.Client.IRecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! RabbitMQ.AMQP.Client.IRecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! @@ -680,12 +685,16 @@ RabbitMQ.AMQP.Client.IRpcServerBuilder.ReplyPostProcessor(System.Func RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IStreamSpecification +RabbitMQ.AMQP.Client.IStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.MaxSegmentSizeBytes(RabbitMQ.AMQP.Client.ByteCapacity! maxSegmentSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.IUriSelector RabbitMQ.AMQP.Client.IUriSelector.Select(System.Collections.Generic.ICollection! uris) -> System.Uri! +RabbitMQ.AMQP.Client.LeaderLocatorStrategy +RabbitMQ.AMQP.Client.LeaderLocatorStrategy.Balanced = 1 -> RabbitMQ.AMQP.Client.LeaderLocatorStrategy +RabbitMQ.AMQP.Client.LeaderLocatorStrategy.ClientLocal = 0 -> RabbitMQ.AMQP.Client.LeaderLocatorStrategy RabbitMQ.AMQP.Client.LifeCycleCallBack RabbitMQ.AMQP.Client.MessageHandler RabbitMQ.AMQP.Client.MetricsReporter diff --git a/Tests/Management/ManagementTests.cs b/Tests/Management/ManagementTests.cs index 86d1865..3d0a3a4 100644 --- a/Tests/Management/ManagementTests.cs +++ b/Tests/Management/ManagementTests.cs @@ -151,10 +151,13 @@ public async Task DeclareStreamQueueWithArguments() IQueueInfo queueInfo = await _management.Queue() .Name(_queueName) + .MaxLengthBytes(ByteCapacity.Kb(1024)) + .LeaderLocator(LeaderLocatorStrategy.Balanced) .Stream() .MaxAge(TimeSpan.FromSeconds(10)) .MaxSegmentSizeBytes(ByteCapacity.Kb(1024)) .InitialClusterSize(1) + .FileSizePerChunk(ByteCapacity.Kb(1024)) .Queue() .DeclareAsync(); @@ -162,6 +165,9 @@ public async Task DeclareStreamQueueWithArguments() Assert.Equal("10s", queueInfo.Arguments()["x-max-age"]); Assert.Equal(1024000L, queueInfo.Arguments()["x-stream-max-segment-size-bytes"]); Assert.Equal(1, queueInfo.Arguments()["x-initial-cluster-size"]); + Assert.Equal("balanced", queueInfo.Arguments()["x-queue-leader-locator"]); + Assert.Equal(1024000L, queueInfo.Arguments()["x-stream-file-size-per-chunk"]); + Assert.Equal(1024000L, queueInfo.Arguments()["x-max-length-bytes"]); // NB: DisposeAsync will delete the queue with name _queueName } @@ -173,10 +179,12 @@ public async Task DeclareQuorumQueueWithArguments() IQueueInfo queueInfo = await _management.Queue() .Name(_queueName) + .LeaderLocator(LeaderLocatorStrategy.ClientLocal) .Quorum() .DeliveryLimit(12) .DeadLetterStrategy(QuorumQueueDeadLetterStrategy.AtLeastOnce) .QuorumInitialGroupSize(3) + .QuorumTargetGroupSize(5) .Queue() .DeclareAsync(); @@ -184,6 +192,8 @@ public async Task DeclareQuorumQueueWithArguments() Assert.Equal(12, queueInfo.Arguments()["x-max-delivery-limit"]); Assert.Equal("at-least-once", queueInfo.Arguments()["x-dead-letter-strategy"]); Assert.Equal(3, queueInfo.Arguments()["x-quorum-initial-group-size"]); + Assert.Equal(5, queueInfo.Arguments()["x-quorum-target-group-size"]); + Assert.Equal("client-local", queueInfo.Arguments()["x-queue-leader-locator"]); // NB: DisposeAsync will delete the queue with name _queueName }