Skip to content

Commit 536e9ff

Browse files
vyourtchenko-connamarahuwmjenkinsgbirchmeier
authored
Allow ThreadedSocketAcceptor to be restarted after calling Stop() - fixes issue #196PR 586 Continued (connamara#959)
History: * Original issue connamara#196 * First PR attempt connamara#351 * that PR was updated to PR connamara#586 * and then *that* PR was updated to PR connamara#959 Description from all the squashed commits: * 196 - modifications to allow the acceptor to be restarted after a call to stop has been made * 196 - changing to using ManulResetEvent rather than thread.sleep and ensuring it's disposed of properly * 196 - need to call session.logon when starting the acceptor as sessions are disabled during the call to logout during acceptor.stop() * 196 - changing ThreadedSocketReactor to use BeginAcceptTcpClient/EndAcceptTcpClient * 196 - fixing issues introduced by dispose changes for 340 * 196 - modifications to allow the acceptor to be restarted after a call to stop has been made * 196 - need to call session.logon when starting the acceptor as sessions are disabled during the call to logout during acceptor.stop() * 196 - changing ThreadedSocketReactor to use BeginAcceptTcpClient/EndAcceptTcpClient * Fixing appveyor issue with default parameters in a test * Issue 196 - fixing test * specify debug file location - Attempt to eliminate flakiness (connamara#586) * Fix rebase of RestartingTheThreadedSocketAcceptorTests * Remove duplicate function that popped up after rebase completed * Updating tests to match latest remote updates * Continued fixing test with latest remote changes * Fixing more tests * fix warnings and other minor changes * fix dupe/unused usings * fix nullable warnings * remove unused vars * refactor out the silly LogEvent function * delete #region around the sole ctor * change some properties to be expression-bodied * use ObjectDisposedException.ThrowIf * remove unneeded SocketOption assignments * hard-wrap some long lines * make AreSocketsRunning internal; only UT uses it * release notes for connamara#195 / PR connamara#596 / PR connamara#959 * delete non-error write to the NonSessionLog --------- Co-authored-by: Huw Jenkins <[email protected]> Co-authored-by: Vlad Yourtchenko <[email protected]> Co-authored-by: Grant Birchmeier <[email protected]>
1 parent d875305 commit 536e9ff

File tree

8 files changed

+729
-137
lines changed

8 files changed

+729
-137
lines changed

QuickFIXn/Exceptions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,17 @@ public ConnectionResetByPeerException(System.Exception innerException)
120120
{ }
121121
}
122122

123+
public class ConnectionShutdownRequestedException : QuickFIXException
124+
{
125+
public ConnectionShutdownRequestedException()
126+
: base("Connection shutdown requested")
127+
{ }
128+
129+
public ConnectionShutdownRequestedException(System.Exception innerException)
130+
: base("Connection shutdown requested", innerException)
131+
{ }
132+
}
133+
123134
public class MessageParseError : QuickFIXException
124135
{
125136
public MessageParseError()

QuickFIXn/ThreadedSocketAcceptor.cs

Lines changed: 89 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using QuickFix.Logger;
66
using QuickFix.Store;
7+
using System.Threading;
78

89
namespace QuickFix
910
{
@@ -13,17 +14,16 @@ namespace QuickFix
1314
/// </summary>
1415
public class ThreadedSocketAcceptor : IAcceptor
1516
{
17+
private const int TenSecondsInTicks = 10000;
18+
1619
private readonly Dictionary<SessionID, Session> _sessions = new();
1720
private readonly SessionSettings _settings;
1821
private readonly Dictionary<IPEndPoint, AcceptorSocketDescriptor> _socketDescriptorForAddress = new();
1922
private readonly SessionFactory _sessionFactory;
20-
private bool _isStarted = false;
2123
private bool _disposed = false;
2224
private readonly object _sync = new();
2325
private readonly NonSessionLog _nonSessionLog;
2426

25-
#region Constructors
26-
2727
/// <summary>
2828
/// Create a ThreadedSocketAcceptor
2929
/// </summary>
@@ -59,13 +59,11 @@ public ThreadedSocketAcceptor(
5959
}
6060
}
6161

62-
#endregion
63-
6462
#region Private Methods
6563

6664
private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(SettingsDictionary dict)
6765
{
68-
int port = System.Convert.ToInt32(dict.GetLong(SessionSettings.SOCKET_ACCEPT_PORT));
66+
int port = Convert.ToInt32(dict.GetLong(SessionSettings.SOCKET_ACCEPT_PORT));
6967
SocketSettings socketSettings = new SocketSettings();
7068

7169
IPEndPoint socketEndPoint;
@@ -133,7 +131,7 @@ private bool CreateSession(SessionID sessionId, SettingsDictionary dict)
133131

134132
// start SocketReactor if it was created via AddSession call
135133
// and if acceptor is already started
136-
if (_isStarted && !_disposed)
134+
if (IsStarted && !_disposed)
137135
{
138136
descriptor.SocketReactor.Start();
139137
}
@@ -166,6 +164,21 @@ private void StopAcceptingConnections()
166164
}
167165
}
168166

167+
private void LogonAllSessions()
168+
{
169+
foreach (Session session in _sessions.Values)
170+
{
171+
try
172+
{
173+
session.Logon();
174+
}
175+
catch (Exception e)
176+
{
177+
session.Log.OnEvent($"Error during logon of Session {session.SessionID}: {e.Message}");
178+
}
179+
}
180+
}
181+
169182
private void LogoutAllSessions(bool force)
170183
{
171184
foreach (Session session in _sessions.Values)
@@ -182,55 +195,40 @@ private void LogoutAllSessions(bool force)
182195

183196
if (force && IsLoggedOn)
184197
{
185-
foreach (Session session in _sessions.Values)
186-
{
187-
try
188-
{
189-
if (session.IsLoggedOn)
190-
session.Disconnect("Forcibly disconnecting session");
191-
}
192-
catch (Exception e)
193-
{
194-
session.Log.OnEvent($"Error during disconnect of Session {session.SessionID}: {e.Message}");
195-
}
196-
}
198+
DisconnectSessions("Forcibly disconnecting sessions");
197199
}
198200

199201
if (!force)
200202
WaitForLogout();
201203
}
202204

203-
/// <summary>
204-
/// TODO implement WaitForLogout
205-
/// </summary>
206205
private void WaitForLogout()
207206
{
208-
/*
209-
int start = System.Environment.TickCount;
210-
HashSet<Session> sessions = new HashSet<Session>(sessions_.Values);
211-
while(sessions.Count > 0)
207+
int start = Environment.TickCount;
208+
using( var resetEvent = new ManualResetEvent( false ) )
212209
{
213-
Thread.Sleep(100);
214-
215-
int elapsed = System.Environment.TickCount - start;
216-
Iterator<Session> sessionItr = loggedOnSessions.iterator();
217-
while (sessionItr.hasNext())
210+
while (IsLoggedOn && (Environment.TickCount - start) < TenSecondsInTicks)
218211
{
219-
Session session = sessionItr.next();
220-
if (elapsed >= session.getLogoutTimeout() * 1000L)
221-
{
222-
session.disconnect("Logout timeout, force disconnect", false);
223-
sessionItr.remove();
224-
}
212+
resetEvent.WaitOne( 100 );
213+
}
214+
}
215+
DisconnectSessions("Logout timeout, force disconnect");
216+
}
217+
218+
private void DisconnectSessions(string disconnectMessage)
219+
{
220+
foreach (Session session in _sessions.Values)
221+
{
222+
try
223+
{
224+
if (session.IsLoggedOn)
225+
session.Disconnect(disconnectMessage);
225226
}
226-
// Be sure we don't look forever
227-
if (elapsed > 60000)
227+
catch (Exception e)
228228
{
229-
log.warn("Stopping session logout wait after 1 minute");
230-
break;
229+
session.Log.OnEvent($"Error during disconnect of Session {session.SessionID}: {e.Message}");
231230
}
232231
}
233-
*/
234232
}
235233

236234
private void DisposeSessions()
@@ -252,10 +250,11 @@ public void Start()
252250

253251
lock (_sync)
254252
{
255-
if (!_isStarted)
253+
if (!IsStarted)
256254
{
255+
LogonAllSessions();
257256
StartAcceptingConnections();
258-
_isStarted = true;
257+
IsStarted = true;
259258
}
260259
}
261260
}
@@ -267,17 +266,17 @@ public void Stop()
267266

268267
public void Stop(bool force)
269268
{
270-
if (_disposed)
271-
throw new ObjectDisposedException(GetType().Name);
272-
273-
StopAcceptingConnections();
274-
LogoutAllSessions(force);
275-
DisposeSessions();
276-
_sessions.Clear();
277-
_isStarted = false;
269+
ObjectDisposedException.ThrowIf(_disposed, this);
278270

279-
// FIXME StopSessionTimer();
280-
// FIXME Session.UnregisterSessions(GetSessions());
271+
lock( _sync )
272+
{
273+
if( IsStarted )
274+
{
275+
IsStarted = false;
276+
LogoutAllSessions(force);
277+
StopAcceptingConnections();
278+
}
279+
}
281280
}
282281

283282
/// <summary>
@@ -292,6 +291,29 @@ public bool IsLoggedOn
292291
}
293292
}
294293

294+
/// <summary>
295+
/// Gets a value indicating whether this instance is started.
296+
/// </summary>
297+
/// <value>
298+
/// <c>true</c> if this instance is started; otherwise, <c>false</c>.
299+
/// </value>
300+
public bool IsStarted { get; private set; } = false;
301+
302+
/// <summary>
303+
/// (For use by Unit Tests)
304+
/// Gets a value indicating whether this instance is started.
305+
/// </summary>
306+
/// <value>
307+
/// <c>true</c> if this instance is started; otherwise, <c>false</c>.
308+
/// </value>
309+
internal bool AreSocketsRunning
310+
{
311+
get
312+
{
313+
return _socketDescriptorForAddress.All( s => s.Value.SocketReactor.IsRunning );
314+
}
315+
}
316+
295317
/// <summary>
296318
/// Get the SessionIDs for the sessions managed by this acceptor.
297319
/// </summary>
@@ -363,22 +385,21 @@ public bool RemoveSession(SessionID sessionId, bool terminateActiveSession)
363385
/// Any subclasses of ThreadedSocketAcceptor should override this if they have resources to dispose
364386
/// Any override should call base.Dispose(disposing).
365387
/// </summary>
366-
/// <param name="disposing"></param>
388+
/// <param name="disposing">true if called from Dispose()</param>
367389
protected void Dispose(bool disposing)
368390
{
369-
if (_disposed) return;
370-
if (disposing)
391+
if(_disposed) { return; }
392+
try
371393
{
372-
try
373-
{
374-
Stop();
375-
}
376-
catch (ObjectDisposedException)
377-
{
378-
// ignore
379-
}
394+
Stop();
395+
DisposeSessions();
396+
_sessions.Clear();
397+
_disposed = true;
398+
}
399+
catch (ObjectDisposedException)
400+
{
401+
// ignore
380402
}
381-
_disposed = true;
382403
}
383404
/// <summary>
384405
/// Disposes created sessions
@@ -388,8 +409,8 @@ protected void Dispose(bool disposing)
388409
/// </remarks>
389410
public void Dispose()
390411
{
391-
Dispose(true);
392-
GC.SuppressFinalize(this);
412+
Dispose( true );
413+
GC.SuppressFinalize( this );
393414
}
394415

395416
~ThreadedSocketAcceptor() => Dispose(false);

0 commit comments

Comments
 (0)