Skip to content

Commit d61e84e

Browse files
authored
Add fields to QueueSpecification (#24)
* Add fields to QueueSpecification * Stream Definitions * add quorum,classic queue definition --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d65966b commit d61e84e

File tree

7 files changed

+461
-15
lines changed

7 files changed

+461
-15
lines changed

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ public interface IStreamOptions
3737
IConsumerBuilder Builder();
3838
}
3939

40-
40+
4141
}

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@ public interface IEntityDeclaration
2121
Task Declare();
2222
}
2323

24+
public enum OverFlowStrategy
25+
{
26+
DropHead,
27+
RejectPublish,
28+
29+
RejectPublishDlx
30+
// DROP_HEAD("drop-head"),
31+
// REJECT_PUBLISH("reject-publish"),
32+
// REJECT_PUBLISH_DLX("reject-publish-dlx");
33+
}
34+
2435
public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
2536
{
2637
IQueueSpecification Name(string name);
@@ -38,13 +49,88 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
3849
IQueueSpecification Type(QueueType type);
3950
public QueueType Type();
4051

41-
// IQuorumQueueSpecification Quorum();
52+
53+
IQueueSpecification DeadLetterExchange(string dlx);
54+
55+
IQueueSpecification DeadLetterRoutingKey(string dlrk);
56+
57+
IQueueSpecification OverflowStrategy(OverFlowStrategy overflow);
58+
59+
IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);
60+
61+
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);
62+
63+
64+
IQueueSpecification Expires(TimeSpan expiration);
65+
66+
IStreamSpecification Stream();
67+
68+
IQuorumQueueSpecification Quorum();
69+
70+
IClassicQueueSpecification Classic();
71+
72+
73+
IQueueSpecification MaxLength(long maxLength);
74+
75+
76+
IQueueSpecification MessageTtl(TimeSpan ttl);
77+
}
78+
79+
public interface IStreamSpecification
80+
{
81+
public IStreamSpecification MaxAge(TimeSpan maxAge);
82+
83+
public IStreamSpecification MaxSegmentSizeBytes(ByteCapacity maxSegmentSize);
84+
85+
public IStreamSpecification InitialClusterSize(int initialClusterSize);
86+
87+
public IQueueSpecification Queue();
88+
}
89+
90+
public enum QuorumQueueDeadLetterStrategy
91+
{
92+
// AT_MOST_ONCE("at-most-once"),
93+
// AT_LEAST_ONCE("at-least-once");
94+
AtMostOnce,
95+
AtLeastOnce
96+
}
97+
98+
public interface IQuorumQueueSpecification
99+
{
100+
IQuorumQueueSpecification DeadLetterStrategy(QuorumQueueDeadLetterStrategy strategy);
101+
102+
IQuorumQueueSpecification DeliveryLimit(int limit);
103+
104+
IQuorumQueueSpecification QuorumInitialGroupSize(int size);
105+
106+
IQueueSpecification Queue();
42107
}
43108

44-
// public interface IQuorumQueueSpecification
45-
// {
46-
// IQueueSpecification Queue();
47-
// }
109+
public enum ClassicQueueMode
110+
{
111+
Default,
112+
Lazy
113+
}
114+
115+
public enum ClassicQueueVersion
116+
{
117+
// V1(1),
118+
// V2(2);
119+
V1,
120+
V2
121+
}
122+
123+
public interface IClassicQueueSpecification
124+
{
125+
// 1 <= maxPriority <= 255
126+
IClassicQueueSpecification MaxPriority(int maxPriority);
127+
128+
IClassicQueueSpecification Mode(ClassicQueueMode mode);
129+
130+
IClassicQueueSpecification Version(ClassicQueueVersion version);
131+
132+
IQueueSpecification Queue();
133+
}
48134

49135
public interface IQueueDeletion
50136
{

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,14 +345,16 @@ await _semaphoreClose.WaitAsync()
345345

346346
OnNewStatus(State.Closing, null);
347347

348+
await _management.CloseAsync()
349+
.ConfigureAwait(false);
350+
348351
if (_nativeConnection is { IsClosed: false })
349352
{
350353
await _nativeConnection.CloseAsync()
351354
.ConfigureAwait(false);
352355
}
353356

354-
await _management.CloseAsync()
355-
.ConfigureAwait(false);
357+
356358
}
357359
finally
358360
{

RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,13 @@ public uint ConsumerCount()
9595
/// <param name="management"></param>
9696
public class AmqpQueueSpecification(AmqpManagement management) : IQueueSpecification
9797
{
98+
internal readonly TimeSpan _tenYears = TimeSpan.FromDays(365 * 10);
99+
98100
private string? _name;
99101
private bool _exclusive = false;
100102
private bool _autoDelete = false;
101103
private const bool Durable = true;
102-
private readonly Map _arguments = new();
104+
internal readonly Map _arguments = new();
103105

104106
public IQueueSpecification Name(string name)
105107
{
@@ -175,6 +177,82 @@ public QueueType Type()
175177
return (QueueType)Enum.Parse(typeof(QueueType), type.ToUpperInvariant());
176178
}
177179

180+
public IQueueSpecification DeadLetterExchange(string dlx)
181+
{
182+
_arguments["x-dead-letter-exchange"] = dlx;
183+
return this;
184+
}
185+
186+
public IQueueSpecification DeadLetterRoutingKey(string dlrk)
187+
{
188+
_arguments["x-dead-letter-routing-key"] = dlrk;
189+
return this;
190+
}
191+
192+
public IQueueSpecification OverflowStrategy(OverFlowStrategy overflow)
193+
{
194+
_arguments["x-overflow"] = overflow switch
195+
{
196+
OverFlowStrategy.DropHead => "drop-head",
197+
OverFlowStrategy.RejectPublish => "reject-publish",
198+
OverFlowStrategy.RejectPublishDlx => "reject-publish-dlx",
199+
_ => throw new ArgumentOutOfRangeException(nameof(overflow), overflow, null)
200+
};
201+
return this;
202+
}
203+
204+
public IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes)
205+
{
206+
Utils.ValidatePositive("Max length", maxLengthBytes.ToBytes());
207+
_arguments["x-max-length-bytes"] = maxLengthBytes.ToBytes();
208+
return this;
209+
}
210+
211+
public IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer)
212+
{
213+
_arguments["x-single-active-consumer"] = singleActiveConsumer;
214+
return this;
215+
}
216+
217+
public IQueueSpecification Expires(TimeSpan expiration)
218+
{
219+
Utils.ValidatePositive("Expiration", (long)expiration.TotalMilliseconds, (long)_tenYears.TotalMilliseconds);
220+
_arguments["x-expires"] = (long)expiration.TotalMilliseconds;
221+
return this;
222+
}
223+
224+
public IStreamSpecification Stream()
225+
{
226+
Type(QueueType.STREAM);
227+
return new AmqpStreamSpecification(this);
228+
}
229+
230+
public IQuorumQueueSpecification Quorum()
231+
{
232+
Type(QueueType.QUORUM);
233+
return new AmqpQuorumSpecification(this);
234+
}
235+
236+
public IClassicQueueSpecification Classic()
237+
{
238+
Type(QueueType.CLASSIC);
239+
return new AmqpClassicSpecification(this);
240+
}
241+
242+
public IQueueSpecification MaxLength(long maxLength)
243+
{
244+
Utils.ValidatePositive("Max length", maxLength);
245+
_arguments["x-max-length"] = maxLength;
246+
return this;
247+
}
248+
249+
public IQueueSpecification MessageTtl(TimeSpan ttl)
250+
{
251+
Utils.ValidateNonNegative("TTL", (long)ttl.TotalMilliseconds, (long)_tenYears.TotalMilliseconds);
252+
_arguments["x-message-ttl"] = (long)ttl.TotalMilliseconds;
253+
return this;
254+
}
255+
178256
public async Task<IQueueInfo> Declare()
179257
{
180258
if (Type() is QueueType.QUORUM or QueueType.STREAM)
@@ -215,6 +293,106 @@ public async Task<IQueueInfo> Declare()
215293
}
216294
}
217295

296+
public class AmqpStreamSpecification(AmqpQueueSpecification parent) : IStreamSpecification
297+
{
298+
public IStreamSpecification MaxAge(TimeSpan maxAge)
299+
{
300+
Utils.ValidatePositive("x-max-age", (long)maxAge.TotalMilliseconds,
301+
(long)parent._tenYears.TotalMilliseconds);
302+
parent._arguments["x-max-age"] = $"{maxAge.Seconds}s";
303+
return this;
304+
}
305+
306+
public IStreamSpecification MaxSegmentSizeBytes(ByteCapacity maxSegmentSize)
307+
{
308+
Utils.ValidatePositive("x-stream-max-segment-size-bytes", maxSegmentSize.ToBytes());
309+
parent._arguments["x-stream-max-segment-size-bytes"] = maxSegmentSize.ToBytes();
310+
return this;
311+
}
312+
313+
public IStreamSpecification InitialClusterSize(int initialClusterSize)
314+
{
315+
Utils.ValidatePositive("x-initial-cluster-size", initialClusterSize);
316+
parent._arguments["x-initial-cluster-size"] = initialClusterSize;
317+
return this;
318+
}
319+
320+
public IQueueSpecification Queue()
321+
{
322+
return parent;
323+
}
324+
}
325+
326+
public class AmqpQuorumSpecification(AmqpQueueSpecification parent) : IQuorumQueueSpecification
327+
{
328+
public IQuorumQueueSpecification DeadLetterStrategy(QuorumQueueDeadLetterStrategy strategy)
329+
{
330+
parent._arguments["x-dead-letter-strategy"] = strategy switch
331+
{
332+
QuorumQueueDeadLetterStrategy.AtMostOnce => "at-most-once",
333+
QuorumQueueDeadLetterStrategy.AtLeastOnce => "at-least-once",
334+
_ => throw new ArgumentOutOfRangeException(nameof(strategy), strategy, null)
335+
};
336+
return this;
337+
}
338+
339+
public IQuorumQueueSpecification DeliveryLimit(int limit)
340+
{
341+
Utils.ValidatePositive("x-max-delivery-limit", limit);
342+
parent._arguments["x-max-delivery-limit"] = limit;
343+
return this;
344+
}
345+
346+
public IQuorumQueueSpecification QuorumInitialGroupSize(int size)
347+
{
348+
Utils.ValidatePositive("x-quorum-initial-group-size", size);
349+
parent._arguments["x-quorum-initial-group-size"] = size;
350+
return this;
351+
}
352+
353+
public IQueueSpecification Queue()
354+
{
355+
return parent;
356+
}
357+
}
358+
359+
public class AmqpClassicSpecification(AmqpQueueSpecification parent) : IClassicQueueSpecification
360+
{
361+
public IClassicQueueSpecification MaxPriority(int maxPriority)
362+
{
363+
Utils.ValidatePositive("x-max-priority", maxPriority, 255);
364+
parent._arguments["x-max-priority"] = maxPriority;
365+
return this;
366+
}
367+
368+
public IClassicQueueSpecification Mode(ClassicQueueMode mode)
369+
{
370+
parent._arguments["x-queue-mode"] = mode switch
371+
{
372+
ClassicQueueMode.Default => "default",
373+
ClassicQueueMode.Lazy => "lazy",
374+
_ => throw new ArgumentOutOfRangeException(nameof(mode), mode, null)
375+
};
376+
return this;
377+
}
378+
379+
public IClassicQueueSpecification Version(ClassicQueueVersion version)
380+
{
381+
parent._arguments["x-queue-version"] = version switch
382+
{
383+
ClassicQueueVersion.V1 => 1,
384+
ClassicQueueVersion.V2 => 2,
385+
_ => throw new ArgumentOutOfRangeException(nameof(version), version, null)
386+
};
387+
return this;
388+
}
389+
390+
public IQueueSpecification Queue()
391+
{
392+
return parent;
393+
}
394+
}
395+
218396
public class DefaultQueueDeletionInfo : IEntityInfo
219397
{
220398
}

0 commit comments

Comments
 (0)