Skip to content

Commit 3dcebd1

Browse files
author
Elad Zelingher
committed
Merge branch 'client-disconnection' into develop
2 parents 63fca99 + bf40e3d commit 3dcebd1

File tree

5 files changed

+122
-34
lines changed

5 files changed

+122
-34
lines changed

src/net45/Default/WampSharp.Fleck/Fleck/FleckWampConnection.cs

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
11
using System;
2+
using System.Reactive;
3+
using System.Reactive.Linq;
4+
using System.Reactive.Threading.Tasks;
5+
using System.Threading.Tasks;
26
using Fleck;
37
using WampSharp.Core.Listener;
8+
using WampSharp.Logging;
49

510
namespace WampSharp.Fleck
611
{
712
internal abstract class FleckWampConnection<TMessage> : AsyncWampConnection<TMessage>
813
{
914
protected IWebSocketConnection mWebSocketConnection;
10-
11-
public FleckWampConnection(IWebSocketConnection webSocketConnection)
15+
private readonly byte[] mPingBuffer = new byte[8];
16+
private readonly TimeSpan mAutoSendPingInterval;
17+
18+
public FleckWampConnection(IWebSocketConnection webSocketConnection,
19+
TimeSpan? autoSendPingInterval = null)
1220
{
1321
mWebSocketConnection = webSocketConnection;
22+
mAutoSendPingInterval = autoSendPingInterval ?? TimeSpan.FromSeconds(45);
1423
mWebSocketConnection.OnOpen = OnConnectionOpen;
1524
mWebSocketConnection.OnError = OnConnectionError;
1625
mWebSocketConnection.OnClose = OnConnectionClose;
@@ -19,6 +28,73 @@ public FleckWampConnection(IWebSocketConnection webSocketConnection)
1928
private void OnConnectionOpen()
2029
{
2130
RaiseConnectionOpen();
31+
32+
StartPing();
33+
}
34+
35+
#if NET40
36+
private void StartPing()
37+
{
38+
Observable.Defer
39+
(() => Observable.FromAsync
40+
(() =>
41+
{
42+
byte[] ticks = GetCurrentTicks();
43+
return mWebSocketConnection.SendPing(ticks);
44+
})
45+
.Concat(Observable.Timer(mAutoSendPingInterval)
46+
.Select(y => Unit.Default))
47+
)
48+
.Repeat()
49+
.ToTask()
50+
.ContinueWith(x =>
51+
{
52+
if (x.Exception != null)
53+
{
54+
mLogger.WarnException("Failed pinging remote peer", x.Exception);
55+
}
56+
});
57+
}
58+
59+
#elif NET45
60+
private void StartPing()
61+
{
62+
Task.Run((Action)Ping);
63+
}
64+
65+
// We currently detect only disconnected clients and not "zombies",
66+
// i.e. clients that respond relatively slow.
67+
private async void Ping()
68+
{
69+
while (IsConnected)
70+
{
71+
try
72+
{
73+
byte[] ticks = GetCurrentTicks();
74+
await mWebSocketConnection.SendPing(ticks);
75+
await Task.Delay(mAutoSendPingInterval);
76+
}
77+
catch (Exception ex)
78+
{
79+
mLogger.WarnException("Failed pinging remote peer", ex);
80+
}
81+
}
82+
}
83+
#endif
84+
85+
private byte[] GetCurrentTicks()
86+
{
87+
DateTime now = DateTime.Now;
88+
long ticks = now.Ticks;
89+
long bytes = ticks;
90+
91+
for (int i = 0; i < 8; i++)
92+
{
93+
mPingBuffer[i] = (byte) bytes;
94+
bytes = bytes >> 8;
95+
}
96+
97+
return mPingBuffer;
2298
}
2399

24100
private void OnConnectionError(Exception exception)

src/net45/WampSharp/Core/Listener/Connections/WampConnectionMonitor.cs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
using System;
2-
using System.Reactive;
3-
using System.Reactive.Linq;
4-
using System.Reactive.Subjects;
52

63
namespace WampSharp.Core.Listener
74
{
@@ -15,16 +12,11 @@ internal class WampConnectionMonitor<TMessage> : IWampConnectionMonitor
1512
private readonly IWampConnection<TMessage> mConnection;
1613
private bool mConnected;
1714

18-
private readonly IEventPatternSource<EventArgs> mDisconnectionEventSource;
19-
private readonly ISubject<EventPattern<EventArgs>> mDisconnectionSubject;
20-
2115
public WampConnectionMonitor(IWampConnection<TMessage> connection)
2216
{
2317
mConnected = true;
2418
mConnection = connection;
25-
mDisconnectionSubject = new ReplaySubject<EventPattern<EventArgs>>(1);
2619

27-
mDisconnectionEventSource = mDisconnectionSubject.ToEventPattern();
2820
mConnection.ConnectionError += OnConnectionError;
2921
mConnection.ConnectionClosed += OnConnectionClosed;
3022
}
@@ -59,19 +51,14 @@ private void OnConnectionClosed()
5951

6052
private void RaiseConnectionClosed(object client, EventArgs empty)
6153
{
62-
mDisconnectionSubject.OnNext(new EventPattern<EventArgs>(client, empty));
63-
}
54+
EventHandler connectionClosed = ConnectionClosed;
6455

65-
public event EventHandler ConnectionClosed
66-
{
67-
add
56+
if (connectionClosed != null)
6857
{
69-
mDisconnectionEventSource.OnNext += new EventHandler<EventArgs>(value);
70-
}
71-
remove
72-
{
73-
mDisconnectionEventSource.OnNext -= new EventHandler<EventArgs>(value);
58+
connectionClosed(client, empty);
7459
}
7560
}
61+
62+
public event EventHandler ConnectionClosed;
7663
}
7764
}

src/net45/WampSharp/Core/Listener/WampListener.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public virtual void Stop()
7676

7777
protected virtual void OnConnectionException(IWampConnection<TMessage> connection, Exception exception)
7878
{
79+
OnCloseConnection(connection);
7980
}
8081

8182
protected virtual void OnCloseConnection(IWampConnection<TMessage> connection)
@@ -139,6 +140,7 @@ protected virtual void OnNewConnection(IWampConnection<TMessage> connection)
139140

140141
connection.MessageArrived += OnNewMessage;
141142
connection.ConnectionOpen += OnConnectionOpen;
143+
connection.ConnectionError += OnConnectionError;
142144
connection.ConnectionClosed += OnConnectionClose;
143145
}
144146

@@ -159,11 +161,22 @@ private void OnConnectionOpen(object sender, EventArgs e)
159161
OnConnectionOpen(connection);
160162
}
161163

164+
private void OnConnectionError(object sender, WampConnectionErrorEventArgs e)
165+
{
166+
OnConnectionClosed(sender);
167+
}
168+
162169
private void OnConnectionClose(object sender, EventArgs e)
170+
{
171+
OnConnectionClosed(sender);
172+
}
173+
174+
private void OnConnectionClosed(object sender)
163175
{
164176
IWampConnection<TMessage> connection = sender as IWampConnection<TMessage>;
165177
connection.ConnectionClosed -= OnConnectionClose;
166178
connection.MessageArrived -= OnNewMessage;
179+
connection.ConnectionError -= OnConnectionError;
167180
OnCloseConnection(connection);
168181
}
169182
}

src/net45/WampSharp/WAMP2/V2/Rpc/Dealer/WampClientRouterCallbackAdapter.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@ public WampClientRouterCallbackAdapter(IWampRawRpcOperationClientCallback caller
1616
{
1717
mCaller = caller;
1818
mNotifier = mCaller as ICallbackDisconnectionNotifier;
19+
mNotifier.Disconnected += OnDisconnected;
1920
mOptions = options;
2021
}
2122

23+
private void OnDisconnected(object sender, EventArgs e)
24+
{
25+
RaiseDisconnected();
26+
}
27+
2228
public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions details)
2329
{
2430
ResultDetails resultDetails = GetResultDetails(details);
@@ -59,15 +65,15 @@ private ResultDetails GetResultDetails(YieldOptions details)
5965
return new ResultDetails {Progress = details.Progress};
6066
}
6167

62-
public event EventHandler Disconnected
68+
public event EventHandler Disconnected;
69+
70+
private void RaiseDisconnected()
6371
{
64-
add
65-
{
66-
mNotifier.Disconnected += value;
67-
}
68-
remove
72+
EventHandler handler = Disconnected;
73+
74+
if (handler != null)
6975
{
70-
mNotifier.Disconnected -= value;
76+
handler(this, EventArgs.Empty);
7177
}
7278
}
7379
}

src/net45/WampSharp/WAMP2/V2/Rpc/Dealer/WampRpcOperationCallback.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public WampRpcOperationCallback(IWampCaller caller, long requestId)
2424
mRequestId = requestId;
2525

2626
mMonitor = caller as IWampConnectionMonitor;
27+
mMonitor.ConnectionClosed += OnConnectionClosed;
2728
}
2829

2930
public void Result<TResult>(IWampFormatter<TResult> formatter, ResultDetails details)
@@ -56,15 +57,20 @@ public void Error<TResult>(IWampFormatter<TResult> formatter, TResult details, s
5657
mCaller.CallError(mRequestId, details, error, arguments.Cast<object>().ToArray(), argumentsKeywords);
5758
}
5859

59-
public event EventHandler Disconnected
60+
public event EventHandler Disconnected;
61+
62+
private void OnConnectionClosed(object sender, EventArgs e)
6063
{
61-
add
62-
{
63-
mMonitor.ConnectionClosed += value;
64-
}
65-
remove
64+
RaiseDisconnected();
65+
}
66+
67+
private void RaiseDisconnected()
68+
{
69+
EventHandler handler = Disconnected;
70+
71+
if (handler != null)
6672
{
67-
mMonitor.ConnectionClosed -= value;
73+
handler(this, EventArgs.Empty);
6874
}
6975
}
7076

0 commit comments

Comments
 (0)