Skip to content

Commit db52ebd

Browse files
Introduce ConnectionFactory#UseBackgroundThreadsForIO
Make it possible to configure the client to use background threads for I/O and heartbeat senders.
1 parent 5d25ea9 commit db52ebd

File tree

4 files changed

+94
-13
lines changed

4 files changed

+94
-13
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ public class ConnectionFactory
146146
/// <summary>Timeout setting for connection attempts (in milliseconds)</summary>
147147
public int RequestedConnectionTimeout = DefaultConnectionTimeout;
148148

149+
public bool UseBackgroundThreadsForIO = false;
150+
149151
/// <summary>Dictionary of client properties to be sent to the
150152
/// server</summary>
151153
public IDictionary<string, object> ClientProperties = ConnectionBase.DefaultClientProperties();

projects/client/RabbitMQ.Client/src/client/impl/ConnectionBase.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ public ConnectionBase(ConnectionFactory factory,
117117
m_session0.Handler = new MainSession.SessionCloseDelegate(NotifyReceivedCloseOk);
118118
m_model0 = (ModelBase)Protocol.CreateModel(m_session0);
119119

120-
StartMainLoop();
120+
StartMainLoop(factory.UseBackgroundThreadsForIO);
121121
Open(insist);
122-
StartHeartbeatLoops();
122+
StartHeartbeatLoops(factory.UseBackgroundThreadsForIO);
123123
AppDomain.CurrentDomain.DomainUnload += HandleDomainUnload;
124124
}
125125

@@ -540,25 +540,27 @@ public void TerminateMainloop()
540540
m_running = false;
541541
}
542542

543-
public void StartMainLoop()
543+
public void StartMainLoop(bool useBackgroundThread)
544544
{
545545
Thread mainLoopThread = new Thread(new ThreadStart(MainLoop));
546546
mainLoopThread.Name = "AMQP Connection " + Endpoint.ToString();
547+
mainLoopThread.IsBackground = useBackgroundThread;
547548
mainLoopThread.Start();
548549
}
549550

550-
public void StartHeartbeatLoops()
551+
public void StartHeartbeatLoops(bool useBackgroundThread)
551552
{
552553
if (Heartbeat != 0) {
553-
StartHeartbeatLoop(new ThreadStart(HeartbeatReadLoop), "Inbound");
554-
StartHeartbeatLoop(new ThreadStart(HeartbeatWriteLoop), "Outbound");
554+
StartHeartbeatLoop(new ThreadStart(HeartbeatReadLoop), "Inbound", useBackgroundThread);
555+
StartHeartbeatLoop(new ThreadStart(HeartbeatWriteLoop), "Outbound", useBackgroundThread);
555556
}
556557
}
557558

558-
public void StartHeartbeatLoop(ThreadStart loop, string name)
559+
public void StartHeartbeatLoop(ThreadStart loop, string name, bool useBackgroundThread)
559560
{
560561
Thread heartbeatLoop = new Thread(loop);
561562
heartbeatLoop.Name = "AMQP Heartbeat " + name + " for Connection " + Endpoint.ToString();
563+
heartbeatLoop.IsBackground = useBackgroundThread;
562564
heartbeatLoop.Start();
563565
}
564566

projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,17 @@ public IModel CreateModel(ISession session) {
7878
}
7979

8080
public IConnection CreateConnection(ConnectionFactory factory,
81-
bool insist,
82-
IFrameHandler frameHandler)
81+
bool insist,
82+
IFrameHandler frameHandler)
8383
{
8484
return new Connection(factory, insist, frameHandler);
8585
}
8686

8787
public void CreateConnectionClose(ushort reasonCode,
88-
string reasonText,
89-
out Command request,
90-
out int replyClassId,
91-
out int replyMethodId)
88+
string reasonText,
89+
out Command request,
90+
out int replyClassId,
91+
out int replyMethodId)
9292
{
9393
request = new Command(new RabbitMQ.Client.Framing.Impl.v0_9_1.ConnectionClose(reasonCode,
9494
reasonText,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2014 GoPivotal, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// http://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at http://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is GoPivotal, Inc.
38+
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using NUnit.Framework;
42+
43+
using System;
44+
using System.IO;
45+
using System.Text;
46+
using System.Collections;
47+
48+
using RabbitMQ.Client;
49+
using RabbitMQ.Client.Impl;
50+
using RabbitMQ.Client.Exceptions;
51+
using RabbitMQ.Util;
52+
53+
namespace RabbitMQ.Client.Unit
54+
{
55+
[TestFixture]
56+
public class TestConnectionWithBackgroundThreads
57+
{
58+
59+
[Test]
60+
public void TestWithBackgroundThreadsEnabled()
61+
{
62+
ConnectionFactory connFactory = new ConnectionFactory();
63+
connFactory.UseBackgroundThreadsForIO = true;
64+
65+
IConnection conn = connFactory.CreateConnection();
66+
IModel ch = conn.CreateModel();
67+
68+
// sanity check
69+
string q = ch.QueueDeclare();
70+
ch.QueueDelete(q);
71+
72+
ch.Close();
73+
conn.Close();
74+
}
75+
}
76+
}
77+

0 commit comments

Comments
 (0)