Skip to content

Commit dc51977

Browse files
author
Elad Zelingher
committed
Disconnection detect issues
1 parent e252966 commit dc51977

File tree

5 files changed

+111
-34
lines changed

5 files changed

+111
-34
lines changed

src/net45/Default/WampSharp.Fleck/Fleck/FleckWampConnection.cs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
using System;
2+
using System.Reactive;
3+
using System.Reactive.Linq;
4+
using System.Threading.Tasks;
25
using Fleck;
36
using WampSharp.Core.Listener;
47

@@ -7,10 +10,14 @@ namespace WampSharp.Fleck
710
internal abstract class FleckWampConnection<TMessage> : AsyncWampConnection<TMessage>
811
{
912
protected IWebSocketConnection mWebSocketConnection;
10-
11-
public FleckWampConnection(IWebSocketConnection webSocketConnection)
13+
private readonly byte[] mPingBuffer = new byte[8];
14+
private readonly TimeSpan mAutoSendPingInterval;
15+
16+
public FleckWampConnection(IWebSocketConnection webSocketConnection,
17+
TimeSpan? autoSendPingInterval = null)
1218
{
1319
mWebSocketConnection = webSocketConnection;
20+
mAutoSendPingInterval = autoSendPingInterval ?? TimeSpan.FromSeconds(45);
1421
mWebSocketConnection.OnOpen = OnConnectionOpen;
1522
mWebSocketConnection.OnError = OnConnectionError;
1623
mWebSocketConnection.OnClose = OnConnectionClose;
@@ -19,6 +26,63 @@ public FleckWampConnection(IWebSocketConnection webSocketConnection)
1926
private void OnConnectionOpen()
2027
{
2128
RaiseConnectionOpen();
29+
30+
StartPing();
31+
}
32+
33+
#if NET40
34+
private void StartPing()
35+
{
36+
Observable.Generate(0, x => true, x => x, x => x)
37+
.Select(x =>
38+
Observable.FromAsync(() =>
39+
{
40+
byte[] ticks = GetCurrentTicks();
41+
return mWebSocketConnection.SendPing(ticks);
42+
}).Concat(Observable.Timer(mAutoSendPingInterval)
43+
.Select(y => Unit.Default))
44+
)
45+
.Merge(1);
46+
}
47+
48+
#elif NET45
49+
private void StartPing()
50+
{
51+
Task.Run((Action)Ping);
52+
}
53+
54+
// We currently detect only disconnected clients and not "zombies",
55+
// i.e. clients that respond relatively slow.
56+
private async void Ping()
57+
{
58+
while (IsConnected)
59+
{
60+
try
61+
{
62+
byte[] ticks = GetCurrentTicks();
63+
await mWebSocketConnection.SendPing(ticks);
64+
await Task.Delay(mAutoSendPingInterval);
65+
}
66+
catch (Exception)
67+
{
68+
}
69+
}
70+
}
71+
#endif
72+
73+
private byte[] GetCurrentTicks()
74+
{
75+
DateTime now = DateTime.Now;
76+
long ticks = now.Ticks;
77+
long bytes = ticks;
78+
79+
for (int i = 0; i < 8; i++)
80+
{
81+
mPingBuffer[i] = (byte) bytes;
82+
bytes = bytes >> 8;
83+
}
84+
85+
return mPingBuffer;
2286
}
2387

2488
private void OnConnectionError(Exception exception)

src/net45/WampSharp/Core/Listener/Connections/WampConnectionMonitor.cs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
using System;
2-
using System.Reactive;
3-
using System.Reactive.Linq;
4-
using System.Reactive.Subjects;
52

63
namespace WampSharp.Core.Listener
74
{
@@ -15,16 +12,11 @@ internal class WampConnectionMonitor<TMessage> : IWampConnectionMonitor
1512
private readonly IWampConnection<TMessage> mConnection;
1613
private bool mConnected;
1714

18-
private readonly IEventPatternSource<EventArgs> mDisconnectionEventSource;
19-
private readonly ISubject<EventPattern<EventArgs>> mDisconnectionSubject;
20-
2115
public WampConnectionMonitor(IWampConnection<TMessage> connection)
2216
{
2317
mConnected = true;
2418
mConnection = connection;
25-
mDisconnectionSubject = new ReplaySubject<EventPattern<EventArgs>>(1);
2619

27-
mDisconnectionEventSource = mDisconnectionSubject.ToEventPattern();
2820
mConnection.ConnectionError += OnConnectionError;
2921
mConnection.ConnectionClosed += OnConnectionClosed;
3022
}
@@ -59,19 +51,14 @@ private void OnConnectionClosed()
5951

6052
private void RaiseConnectionClosed(object client, EventArgs empty)
6153
{
62-
mDisconnectionSubject.OnNext(new EventPattern<EventArgs>(client, empty));
63-
}
54+
EventHandler connectionClosed = ConnectionClosed;
6455

65-
public event EventHandler ConnectionClosed
66-
{
67-
add
56+
if (connectionClosed != null)
6857
{
69-
mDisconnectionEventSource.OnNext += new EventHandler<EventArgs>(value);
70-
}
71-
remove
72-
{
73-
mDisconnectionEventSource.OnNext -= new EventHandler<EventArgs>(value);
58+
connectionClosed(client, empty);
7459
}
7560
}
61+
62+
public event EventHandler ConnectionClosed;
7663
}
7764
}

src/net45/WampSharp/Core/Listener/WampListener.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Reactive.Disposables;
3+
using SystemEx;
34
using WampSharp.Core.Dispatch;
45
using WampSharp.Core.Message;
56
using WampSharp.Logging;
@@ -75,6 +76,7 @@ public virtual void Stop()
7576

7677
protected virtual void OnConnectionException(IWampConnection<TMessage> connection, Exception exception)
7778
{
79+
OnCloseConnection(connection);
7880
}
7981

8082
protected virtual void OnCloseConnection(IWampConnection<TMessage> connection)
@@ -136,6 +138,7 @@ protected virtual void OnNewConnection(IWampConnection<TMessage> connection)
136138

137139
connection.MessageArrived += OnNewMessage;
138140
connection.ConnectionOpen += OnConnectionOpen;
141+
connection.ConnectionError += OnConnectionError;
139142
connection.ConnectionClosed += OnConnectionClose;
140143
}
141144

@@ -156,11 +159,22 @@ private void OnConnectionOpen(object sender, EventArgs e)
156159
OnConnectionOpen(connection);
157160
}
158161

162+
private void OnConnectionError(object sender, WampConnectionErrorEventArgs e)
163+
{
164+
OnConnectionClosed(sender);
165+
}
166+
159167
private void OnConnectionClose(object sender, EventArgs e)
168+
{
169+
OnConnectionClosed(sender);
170+
}
171+
172+
private void OnConnectionClosed(object sender)
160173
{
161174
IWampConnection<TMessage> connection = sender as IWampConnection<TMessage>;
162175
connection.ConnectionClosed -= OnConnectionClose;
163176
connection.MessageArrived -= OnNewMessage;
177+
connection.ConnectionError -= OnConnectionError;
164178
OnCloseConnection(connection);
165179
}
166180
}

src/net45/WampSharp/WAMP2/V2/Rpc/Dealer/WampClientRouterCallbackAdapter.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@ public WampClientRouterCallbackAdapter(IWampRawRpcOperationClientCallback caller
1616
{
1717
mCaller = caller;
1818
mNotifier = mCaller as ICallbackDisconnectionNotifier;
19+
mNotifier.Disconnected += OnDisconnected;
1920
mOptions = options;
2021
}
2122

23+
private void OnDisconnected(object sender, EventArgs e)
24+
{
25+
RaiseDisconnected();
26+
}
27+
2228
public void Result<TMessage>(IWampFormatter<TMessage> formatter, YieldOptions details)
2329
{
2430
ResultDetails resultDetails = GetResultDetails(details);
@@ -59,15 +65,15 @@ private ResultDetails GetResultDetails(YieldOptions details)
5965
return new ResultDetails {Progress = details.Progress};
6066
}
6167

62-
public event EventHandler Disconnected
68+
public event EventHandler Disconnected;
69+
70+
private void RaiseDisconnected()
6371
{
64-
add
65-
{
66-
mNotifier.Disconnected += value;
67-
}
68-
remove
72+
EventHandler handler = Disconnected;
73+
74+
if (handler != null)
6975
{
70-
mNotifier.Disconnected -= value;
76+
handler(this, EventArgs.Empty);
7177
}
7278
}
7379
}

src/net45/WampSharp/WAMP2/V2/Rpc/Dealer/WampRpcOperationCallback.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public WampRpcOperationCallback(IWampCaller caller, long requestId)
2424
mRequestId = requestId;
2525

2626
mMonitor = caller as IWampConnectionMonitor;
27+
mMonitor.ConnectionClosed += OnConnectionClosed;
2728
}
2829

2930
public void Result<TResult>(IWampFormatter<TResult> formatter, ResultDetails details)
@@ -56,15 +57,20 @@ public void Error<TResult>(IWampFormatter<TResult> formatter, TResult details, s
5657
mCaller.CallError(mRequestId, details, error, arguments.Cast<object>().ToArray(), argumentsKeywords);
5758
}
5859

59-
public event EventHandler Disconnected
60+
public event EventHandler Disconnected;
61+
62+
private void OnConnectionClosed(object sender, EventArgs e)
6063
{
61-
add
62-
{
63-
mMonitor.ConnectionClosed += value;
64-
}
65-
remove
64+
RaiseDisconnected();
65+
}
66+
67+
private void RaiseDisconnected()
68+
{
69+
EventHandler handler = Disconnected;
70+
71+
if (handler != null)
6672
{
67-
mMonitor.ConnectionClosed -= value;
73+
handler(this, EventArgs.Empty);
6874
}
6975
}
7076

0 commit comments

Comments
 (0)