Skip to content

Commit 0713e84

Browse files
authored
Fix #871 (#872)
* Increase dispatcher queue size * Allow to customize the size of a dispatcher's queue
1 parent 2f2da92 commit 0713e84

File tree

3 files changed

+13
-8
lines changed

3 files changed

+13
-8
lines changed

Source/EasyNetQ/ConnectionConfiguration.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.IO;
45
using System.Linq;
@@ -37,6 +38,7 @@ public class ConnectionConfiguration
3738
public bool UseBackgroundThreads { get; set; }
3839
public IList<AuthMechanismFactory> AuthMechanisms { get; set; }
3940
public TimeSpan ConnectIntervalAttempt { get; set; }
41+
public int DispatcherQueueSize { get; set; }
4042

4143
public ConnectionConfiguration()
4244
{
@@ -51,6 +53,7 @@ public ConnectionConfiguration()
5153
PersistentMessages = true;
5254
UseBackgroundThreads = false;
5355
ConnectIntervalAttempt = TimeSpan.FromSeconds(5);
56+
DispatcherQueueSize = 1024;
5457

5558
// prefetchCount determines how many messages will be allowed in the local in-memory queue
5659
// setting to zero makes this infinite, but risks an out-of-memory exception.

Source/EasyNetQ/ConnectionString/ConnectionStringGrammar.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ namespace EasyNetQ.ConnectionString
1212
public static class ConnectionStringGrammar
1313
{
1414
public static Parser<string> Text = Parse.CharExcept(';').Many().Text();
15-
public static Parser<ushort> Number = Parse.Number.Select(ushort.Parse);
15+
public static Parser<ushort> UShortNumber = Parse.Number.Select(ushort.Parse);
16+
public static Parser<int> IntNumber = Parse.Number.Select(int.Parse);
1617

1718
public static Parser<bool> Bool = (Parse.CaseInsensitiveString("true").Or(Parse.CaseInsensitiveString("false"))).Text().Select(x => x.ToLower() == "true");
1819

1920
public static Parser<HostConfiguration> Host =
2021
from host in Parse.Char(c => c != ':' && c != ';' && c != ',', "host").Many().Text()
21-
from port in Parse.Char(':').Then(_ => Number).Or(Parse.Return((ushort)0))
22+
from port in Parse.Char(':').Then(_ => UShortNumber).Or(Parse.Return((ushort)0))
2223
select new HostConfiguration { Host = host, Port = port };
2324

2425
public static Parser<IEnumerable<HostConfiguration>> Hosts = Host.ListDelimitedBy(',');
@@ -31,18 +32,19 @@ from port in Parse.Char(':').Then(_ => Number).Or(Parse.Return((ushort)0))
3132
// add new connection string parts here
3233
BuildKeyValueParser("amqp", AMQP, c => c.AMQPConnectionString),
3334
BuildKeyValueParser("host", Hosts, c => c.Hosts),
34-
BuildKeyValueParser("port", Number, c => c.Port),
35+
BuildKeyValueParser("port", UShortNumber, c => c.Port),
3536
BuildKeyValueParser("virtualHost", Text, c => c.VirtualHost),
36-
BuildKeyValueParser("requestedHeartbeat", Number, c => c.RequestedHeartbeat),
37+
BuildKeyValueParser("requestedHeartbeat", UShortNumber, c => c.RequestedHeartbeat),
3738
BuildKeyValueParser("username", Text, c => c.UserName),
3839
BuildKeyValueParser("password", Text, c => c.Password),
39-
BuildKeyValueParser("prefetchcount", Number, c => c.PrefetchCount),
40-
BuildKeyValueParser("timeout", Number, c => c.Timeout),
40+
BuildKeyValueParser("prefetchCount", UShortNumber, c => c.PrefetchCount),
41+
BuildKeyValueParser("timeout", UShortNumber, c => c.Timeout),
4142
BuildKeyValueParser("publisherConfirms", Bool, c => c.PublisherConfirms),
4243
BuildKeyValueParser("persistentMessages", Bool, c => c.PersistentMessages),
4344
BuildKeyValueParser("product", Text, c => c.Product),
4445
BuildKeyValueParser("platform", Text, c => c.Platform),
4546
BuildKeyValueParser("useBackgroundThreads", Bool, c => c.UseBackgroundThreads),
47+
BuildKeyValueParser("dispatcherQueueSize", IntNumber, c => c.DispatcherQueueSize),
4648
BuildKeyValueParser("name", Text, c => c.Name)
4749
}.Aggregate((a, b) => a.Or(b));
4850

Source/EasyNetQ/Producer/ClientCommandDispatcherSingleton.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ namespace EasyNetQ.Producer
99
{
1010
public class ClientCommandDispatcherSingleton : IClientCommandDispatcher
1111
{
12-
private const int queueSize = 1;
1312
private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
1413
private readonly IPersistentChannel persistentChannel;
15-
private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>(queueSize);
14+
private readonly BlockingCollection<Action> queue;
1615

1716
public ClientCommandDispatcherSingleton(
1817
ConnectionConfiguration configuration,
@@ -23,6 +22,7 @@ public ClientCommandDispatcherSingleton(
2322
Preconditions.CheckNotNull(connection, "connection");
2423
Preconditions.CheckNotNull(persistentChannelFactory, "persistentChannelFactory");
2524

25+
queue = new BlockingCollection<Action>(configuration.DispatcherQueueSize);
2626
persistentChannel = persistentChannelFactory.CreatePersistentChannel(connection);
2727

2828
StartDispatcherThread(configuration);

0 commit comments

Comments
 (0)