Skip to content

Commit fdc0737

Browse files
authored
Add durable and expires properties to responder configuration (#1037)
* add durable and expires properties to responder configuration * int? -> int * add xml docs * fix typos to trigger rebuild
1 parent 0592084 commit fdc0737

File tree

6 files changed

+77
-19
lines changed

6 files changed

+77
-19
lines changed

Source/EasyNetQ.Tests.Tasks/Tasks/SimpleRequester/LatencyRecorder.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ public void Report(object status)
5151
var ticksTenSecondsAgo = DateTime.Now.AddSeconds(-10).Ticks;
5252
var lateResponses = requests.Where(x => (!x.Value.HasResponded) && (x.Value.Ticks < ticksTenSecondsAgo));
5353

54-
var reponded = requests.Count(x => x.Value.HasResponded);
54+
var responded = requests.Count(x => x.Value.HasResponded);
5555

56-
Console.WriteLine("Total: {0}, reponded: {1} over 10 seconds late: [{2}]",
56+
Console.WriteLine("Total: {0}, responded: {1} over 10 seconds late: [{2}]",
5757
requests.Count,
58-
reponded,
58+
responded,
5959
string.Join(",", lateResponses.Select(x => x.Value.Id.ToString())));
6060
}
6161
}
@@ -79,4 +79,4 @@ public void Respond()
7979
public bool HasResponded { get; private set; }
8080
public long ResponseTimeTicks { get; private set; }
8181
}
82-
}
82+
}

Source/EasyNetQ.Tests.Tasks/Tasks/TestSimpleService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public Task Run(CancellationToken cancellationToken)
4646

4747
private static Task<TestAsyncResponseMessage> HandleAsyncRequest(TestAsyncRequestMessage request)
4848
{
49-
Console.Out.WriteLine("Got aysnc request '{0}'", request.Text);
49+
Console.Out.WriteLine("Got async request '{0}'", request.Text);
5050

5151
return RunDelayed(1000, () =>
5252
{

Source/EasyNetQ.Tests/MessageVersioningTests/VersionedExchangeDeclareStrategyTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void Should_declare_exchange_again_if_first_attempt_failed()
5050

5151

5252
// Unversioned message - exchange declared
53-
// Versioned message - superceded exchange declared, then superceding, then bind
53+
// Versioned message - superseded exchange declared, then superseding, then bind
5454
[Fact]
5555
public void When_declaring_exchanges_for_unversioned_message_one_exchange_created()
5656
{

Source/EasyNetQ/FluentConfiguration/ISubscriptionConfiguration.cs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,37 @@ namespace EasyNetQ.FluentConfiguration
99
/// e.g.
1010
/// x => x.WithTopic("*.brighton")
1111
/// </summary>
12-
/// <typeparam name="T">The message type to be published</typeparam>
1312
public interface ISubscriptionConfiguration
1413
{
1514
/// <summary>
1615
/// Add a topic for the queue binding
1716
/// </summary>
1817
/// <param name="topic">The topic to add</param>
19-
/// <returns></returns>
18+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
2019
ISubscriptionConfiguration WithTopic(string topic);
2120

2221
/// <summary>
2322
/// Configures the queue's durability
2423
/// </summary>
25-
/// <returns></returns>
24+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
2625
ISubscriptionConfiguration WithAutoDelete(bool autoDelete = true);
2726

2827
/// <summary>
2928
/// Configures the queue's durability
3029
/// </summary>
31-
/// <returns></returns>
30+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
3231
ISubscriptionConfiguration WithDurable(bool durable = true);
3332

3433
/// <summary>
3534
/// Configures the consumer's priority
3635
/// </summary>
37-
/// <returns></returns>
36+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
3837
ISubscriptionConfiguration WithPriority(int priority);
3938

4039
/// <summary>
4140
/// Configures the consumer's prefetch count
4241
/// </summary>
43-
/// <returns></returns>
42+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
4443
ISubscriptionConfiguration WithPrefetchCount(ushort prefetchCount);
4544

4645
/// <summary>
@@ -53,38 +52,40 @@ public interface ISubscriptionConfiguration
5352
/// Leases of durable queues restart when the server restarts.
5453
/// </summary>
5554
/// <param name="expires">The value of the x-expires argument or expires policy describes the expiration period in milliseconds and is subject to the same constraints as x-message-ttl and cannot be zero. Thus a value of 1000 means a queue which is unused for 1 second will be deleted.</param>
56-
/// <returns></returns>
55+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
5756
ISubscriptionConfiguration WithExpires(int expires);
5857

5958
/// <summary>
6059
/// Configures the consumer's to be exclusive
6160
/// </summary>
62-
/// <returns></returns>
61+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
6362
ISubscriptionConfiguration AsExclusive(bool isExclusive = true);
6463

6564
/// <summary>
6665
/// Configures the queue's maxPriority
6766
/// </summary>
68-
/// <returns></returns>
67+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
6968
ISubscriptionConfiguration WithMaxPriority(byte priority);
7069

7170
/// <summary>
7271
/// Sets the queue name
7372
/// </summary>
7473
/// <param name="queueName"></param>
75-
/// <returns></returns>
74+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
7675
ISubscriptionConfiguration WithQueueName(string queueName);
7776

7877
/// <summary>
7978
/// The maximum number of ready messages that may exist on the queue.
8079
/// Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached.
8180
/// </summary>
81+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
8282
ISubscriptionConfiguration WithMaxLength(int maxLength);
8383

8484
/// <summary>
8585
/// The maximum size of the queue in bytes.
8686
/// Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached
8787
/// </summary>
88+
/// <returns>Reference to the same <see cref="ISubscriptionConfiguration"/> to allow methods chaining.</returns>
8889
ISubscriptionConfiguration WithMaxLengthBytes(int maxLengthBytes);
8990
}
9091

@@ -179,4 +180,4 @@ public ISubscriptionConfiguration WithMaxLengthBytes(int maxLengthBytes)
179180
return this;
180181
}
181182
}
182-
}
183+
}

Source/EasyNetQ/Producer/IResponderConfiguration.cs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,40 @@
11
namespace EasyNetQ.Producer
22
{
3+
/// <summary>
4+
/// Allows configuration to be fluently extended without adding overloads to IBus
5+
///
6+
/// e.g.
7+
/// x => x.WithPrefetchCount(50)
8+
/// </summary>
39
public interface IResponderConfiguration
410
{
511
IResponderConfiguration WithPrefetchCount(ushort prefetchCount);
12+
13+
/// <summary>
14+
/// Sets the queue name
15+
/// </summary>
16+
/// <param name="queueName"></param>
17+
/// <returns>Reference to the same <see cref="IResponderConfiguration"/> to allow methods chaining.</returns>
618
IResponderConfiguration WithQueueName(string queueName);
19+
20+
/// <summary>
21+
/// Configures the queue's durability
22+
/// </summary>
23+
/// <returns>Reference to the same <see cref="IResponderConfiguration"/> to allow methods chaining.</returns>
24+
IResponderConfiguration WithDurable(bool durable = true);
25+
26+
/// <summary>
27+
/// Expiry time can be set for a given queue by setting the x-expires argument to queue.declare, or by setting the expires policy.
28+
/// This controls for how long a queue can be unused before it is automatically deleted.
29+
/// Unused means the queue has no consumers, the queue has not been redeclared, and basic.get has not been invoked for a duration of at least the expiration period.
30+
/// This can be used, for example, for RPC-style reply queues, where many queues can be created which may never be drained.
31+
/// The server guarantees that the queue will be deleted, if unused for at least the expiration period.
32+
/// No guarantee is given as to how promptly the queue will be removed after the expiration period has elapsed.
33+
/// Leases of durable queues restart when the server restarts.
34+
/// </summary>
35+
/// <param name="expires">The value of the x-expires argument or expires policy describes the expiration period in milliseconds and is subject to the same constraints as x-message-ttl and cannot be zero. Thus a value of 1000 means a queue which is unused for 1 second will be deleted.</param>
36+
/// <returns>Reference to the same <see cref="IResponderConfiguration"/> to allow methods chaining.</returns>
37+
IResponderConfiguration WithExpires(int expires);
738
}
839

940
public class ResponderConfiguration : IResponderConfiguration
@@ -14,8 +45,19 @@ public ResponderConfiguration(ushort defaultPrefetchCount)
1445
}
1546

1647
public ushort PrefetchCount { get; private set; }
48+
1749
public string QueueName { get; private set; }
1850

51+
/// <summary>
52+
/// Durable queues remain active when a server restarts.
53+
/// </summary>
54+
public bool Durable { get; private set; } = true;
55+
56+
/// <summary>
57+
/// Determines how long (in milliseconds) a queue can remain unused before it is automatically deleted by the server.
58+
/// </summary>
59+
public int? Expires { get; private set; }
60+
1961
public IResponderConfiguration WithPrefetchCount(ushort prefetchCount)
2062
{
2163
PrefetchCount = prefetchCount;
@@ -27,5 +69,17 @@ public IResponderConfiguration WithQueueName(string queueName)
2769
QueueName = queueName;
2870
return this;
2971
}
72+
73+
public IResponderConfiguration WithDurable(bool durable = true)
74+
{
75+
Durable = durable;
76+
return this;
77+
}
78+
79+
public IResponderConfiguration WithExpires(int expires)
80+
{
81+
Expires = expires;
82+
return this;
83+
}
3084
}
31-
}
85+
}

Source/EasyNetQ/Producer/Rpc.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,10 @@ public virtual IDisposable Respond<TRequest, TResponse>(Func<TRequest, Task<TRes
255255

256256
var routingKey = configuration.QueueName ?? conventions.RpcRoutingKeyNamingConvention(requestType);
257257

258-
var queue = advancedBus.QueueDeclare(routingKey);
258+
var queue = advancedBus.QueueDeclare(
259+
routingKey,
260+
durable: configuration.Durable,
261+
expires: configuration.Expires);
259262

260263
var exchange = DeclareAndBindRpcExchange(
261264
conventions.RpcRequestExchangeNamingConvention(requestType),

0 commit comments

Comments
 (0)