Skip to content

Commit e93f91e

Browse files
committed
Add connectivity listener interface for websocket
1 parent de7f428 commit e93f91e

File tree

3 files changed

+186
-164
lines changed

3 files changed

+186
-164
lines changed

libsignal-service-dotnet/SignalServiceMessageReceiver.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ public class SignalServiceMessageReceiver
2727
private const int MAC_KEY_SIZE = 32;
2828
private readonly PushServiceSocket Socket;
2929
private readonly SignalServiceConfiguration Urls;
30-
private readonly CredentialsProvider credentialsProvider;
30+
private readonly CredentialsProvider CredentialsProvider;
3131
private readonly string UserAgent;
32+
private readonly ConnectivityListener ConnectivityListener;
3233
private readonly CancellationToken Token;
3334

3435
/// <summary>
@@ -38,13 +39,15 @@ public class SignalServiceMessageReceiver
3839
/// <param name="urls">The URL of the Signal Service.</param>
3940
/// <param name="credentials">The Signal Service user's credentials</param>
4041
/// <param name="userAgent"></param>
41-
public SignalServiceMessageReceiver(CancellationToken token, SignalServiceConfiguration urls, CredentialsProvider credentials, string userAgent)
42+
/// <param name="connectivityListener"></param>
43+
public SignalServiceMessageReceiver(CancellationToken token, SignalServiceConfiguration urls, CredentialsProvider credentials, string userAgent, ConnectivityListener connectivityListener)
4244
{
43-
this.Token = token;
44-
this.Urls = urls;
45-
this.credentialsProvider = credentials;
46-
this.Socket = new PushServiceSocket(urls, credentials, userAgent);
47-
this.UserAgent = userAgent;
45+
Token = token;
46+
Urls = urls;
47+
CredentialsProvider = credentials;
48+
Socket = new PushServiceSocket(urls, credentials, userAgent);
49+
UserAgent = userAgent;
50+
ConnectivityListener = connectivityListener;
4851
}
4952

5053
/// <summary>
@@ -104,8 +107,8 @@ public string RetrieveAttachmentDownloadUrl(SignalServiceAttachmentPointer point
104107
/// <returns>A SignalServiceMessagePipe for receiving Signal Service messages.</returns>
105108
public SignalServiceMessagePipe CreateMessagePipe()
106109
{
107-
SignalWebSocketConnection webSocket = new SignalWebSocketConnection(Token, Urls.SignalServiceUrls[0].Url, credentialsProvider, UserAgent);
108-
return new SignalServiceMessagePipe(Token, webSocket, credentialsProvider);
110+
SignalWebSocketConnection webSocket = new SignalWebSocketConnection(Token, Urls.SignalServiceUrls[0].Url, CredentialsProvider, UserAgent, ConnectivityListener);
111+
return new SignalServiceMessagePipe(Token, webSocket, CredentialsProvider);
109112
}
110113

111114
public List<SignalServiceEnvelope> RetrieveMessages(MessageReceivedCallback callback)
Lines changed: 159 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -1,155 +1,159 @@
1-
using Coe.WebSocketWrapper;
2-
using Google.Protobuf;
3-
using libsignal.util;
4-
using libsignalservice.push;
5-
using libsignalservice.util;
6-
using Microsoft.Extensions.Logging;
7-
using System;
8-
using System.Collections.Concurrent;
9-
using System.IO;
10-
using System.Text;
11-
using System.Threading;
12-
using System.Threading.Tasks;
13-
14-
namespace libsignalservice.websocket
15-
{
16-
internal class SignalWebSocketConnection
17-
{
18-
private readonly ILogger Logger = LibsignalLogging.CreateLogger<SignalWebSocketConnection>();
19-
private static readonly Object obj = new Object();
20-
21-
private readonly BlockingCollection<WebSocketRequestMessage> IncomingRequests = new BlockingCollection<WebSocketRequestMessage>(new ConcurrentQueue<WebSocketRequestMessage>());
22-
private readonly ConcurrentDictionary<ulong, Tuple<CountdownEvent, uint, string>> OutgoingRequests = new ConcurrentDictionary<ulong, Tuple<CountdownEvent, uint, string>>();
23-
24-
private readonly string WsUri;
25-
private readonly CredentialsProvider CredentialsProvider;
26-
private readonly string UserAgent;
27-
private WebSocketWrapper WebSocket;
28-
private CancellationToken Token;
29-
30-
internal SignalWebSocketConnection(CancellationToken token, string httpUri, CredentialsProvider credentialsProvider, string userAgent)
31-
{
32-
Token = token;
33-
CredentialsProvider = credentialsProvider;
34-
UserAgent = userAgent;
35-
if (credentialsProvider.GetDeviceId() == SignalServiceAddress.DEFAULT_DEVICE_ID)
36-
{
37-
WsUri = httpUri.Replace("https://", "wss://")
38-
.Replace("http://", "ws://") + $"/v1/websocket/?login={credentialsProvider.GetUser()}&password={credentialsProvider.GetPassword()}";
39-
}
40-
else
41-
{
42-
WsUri = httpUri.Replace("https://", "wss://")
43-
.Replace("http://", "ws://") + $"/v1/websocket/?login={credentialsProvider.GetUser()}.{credentialsProvider.GetDeviceId()}&password={credentialsProvider.GetPassword()}";
44-
}
45-
UserAgent = userAgent;
46-
WebSocket = new WebSocketWrapper(WsUri, token);
47-
WebSocket.OnConnect(Connection_OnOpened);
48-
WebSocket.OnMessage(Connection_OnMessage);
49-
}
50-
51-
public void Connect()
52-
{
53-
WebSocket.Connect();
54-
}
55-
56-
private void Connection_OnOpened()
57-
{
58-
}
59-
60-
private void Connection_OnMessage(byte[] obj)
61-
{
62-
var msg = WebSocketMessage.Parser.ParseFrom(obj);
63-
if (msg.Type == WebSocketMessage.Types.Type.Request)
64-
{
65-
Logger.LogTrace("Adding request to IncomingRequests");
66-
IncomingRequests.Add(msg.Request);
67-
}
68-
else if (msg.Type == WebSocketMessage.Types.Type.Response)
69-
{
70-
Logger.LogTrace("Adding response {0} ({1} {2})", msg.Response.Id, msg.Response.Status, Encoding.UTF8.GetString(msg.Response.Body.ToByteArray()));
71-
var t = new Tuple<CountdownEvent, uint, string>(null, msg.Response.Status, Encoding.UTF8.GetString(msg.Response.Body.ToByteArray()));
72-
Tuple<CountdownEvent, uint, string> savedRequest;
73-
OutgoingRequests.TryGetValue(msg.Response.Id, out savedRequest);
74-
OutgoingRequests.AddOrUpdate(msg.Response.Id, t, (k, v) => t);
75-
savedRequest.Item1.Signal();
76-
}
77-
}
78-
79-
public void Disconnect()
80-
{
81-
Logger.LogWarning("Disconnect is not supported yet");
82-
throw new NotImplementedException();
83-
}
84-
85-
86-
87-
/// <summary>
88-
/// Gets the next WebSocketRequestMessage from the websocket.
89-
/// If there are no received messages in the buffer, this method will block until there are, or this connection's token is cancelled.
90-
/// </summary>
91-
/// <remarks>
92-
/// keks
93-
/// </remarks>
94-
/// <returns>A WebSocketRequestMessage read from the websocket's pipe</returns>
95-
public WebSocketRequestMessage ReadRequestBlocking()
96-
{
97-
return IncomingRequests.Take(Token);
98-
}
99-
100-
/// <summary>
101-
/// Sends a WebSocketRequestMessage to the Signal server. The returned task will block for a maximum of 10 seconds.
102-
/// </summary>
103-
/// <param name="request"></param>
104-
/// <returns>Returns a task that returns a server response or throws an exception.</returns>
105-
internal async Task<Tuple<uint, string>> SendRequest(WebSocketRequestMessage request)
106-
{
107-
Tuple<CountdownEvent, uint, string> t = new Tuple<CountdownEvent, uint, string>(new CountdownEvent(1), 0, null);
108-
WebSocketMessage message = new WebSocketMessage
109-
{
110-
Type = WebSocketMessage.Types.Type.Request,
111-
Request = request
112-
};
113-
OutgoingRequests.AddOrUpdate(request.Id, t, (k, v) => t);
114-
WebSocket.OutgoingQueue.Add(message.ToByteArray());
115-
return await Task.Run(() =>
116-
{
117-
if (t.Item1.Wait(10 * 1000, Token))
118-
{
119-
var handledTuple = OutgoingRequests[request.Id];
120-
return new Tuple<uint, string>(handledTuple.Item2, handledTuple.Item3);
121-
}
122-
throw new IOException("wait for confirmation timeout");
123-
});
124-
}
125-
126-
/// <summary>
127-
/// Sends a WebSocketResponseMessage to the Signal server. This method does not block until the message is actually sent.
128-
/// </summary>
129-
/// <param name="response"></param>
130-
public void SendResponse(WebSocketResponseMessage response)
131-
{
132-
WebSocketMessage message = new WebSocketMessage
133-
{
134-
Type = WebSocketMessage.Types.Type.Response,
135-
Response = response
136-
};
137-
WebSocket.OutgoingQueue.Add(message.ToByteArray());
138-
}
139-
140-
private void SendKeepAlive(CancellationToken token, object state)
141-
{
142-
WebSocketMessage message = new WebSocketMessage
143-
{
144-
Type = WebSocketMessage.Types.Type.Request,
145-
Request = new WebSocketRequestMessage
146-
{
147-
Id = KeyHelper.getTime(),
148-
Path = "/v1/keepalive",
149-
Verb = "GET"
150-
},
151-
};
152-
WebSocket.OutgoingQueue.Add(message.ToByteArray());
153-
}
154-
}
155-
}
1+
using Coe.WebSocketWrapper;
2+
using Google.Protobuf;
3+
using libsignal.util;
4+
using libsignalservice.push;
5+
using libsignalservice.util;
6+
using Microsoft.Extensions.Logging;
7+
using System;
8+
using System.Collections.Concurrent;
9+
using System.IO;
10+
using System.Text;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
14+
namespace libsignalservice.websocket
15+
{
16+
internal class SignalWebSocketConnection
17+
{
18+
private readonly ILogger Logger = LibsignalLogging.CreateLogger<SignalWebSocketConnection>();
19+
private static readonly Object obj = new Object();
20+
21+
private readonly BlockingCollection<WebSocketRequestMessage> IncomingRequests = new BlockingCollection<WebSocketRequestMessage>(new ConcurrentQueue<WebSocketRequestMessage>());
22+
private readonly ConcurrentDictionary<ulong, Tuple<CountdownEvent, uint, string>> OutgoingRequests = new ConcurrentDictionary<ulong, Tuple<CountdownEvent, uint, string>>();
23+
24+
private readonly string WsUri;
25+
private readonly CredentialsProvider CredentialsProvider;
26+
private readonly string UserAgent;
27+
private WebSocketWrapper WebSocket;
28+
private readonly CancellationToken Token;
29+
private readonly ConnectivityListener Listener;
30+
31+
internal SignalWebSocketConnection(CancellationToken token, string httpUri, CredentialsProvider credentialsProvider, string userAgent, ConnectivityListener listener)
32+
{
33+
Token = token;
34+
CredentialsProvider = credentialsProvider;
35+
UserAgent = userAgent;
36+
Listener = listener;
37+
if (credentialsProvider.GetDeviceId() == SignalServiceAddress.DEFAULT_DEVICE_ID)
38+
{
39+
WsUri = httpUri.Replace("https://", "wss://")
40+
.Replace("http://", "ws://") + $"/v1/websocket/?login={credentialsProvider.GetUser()}&password={credentialsProvider.GetPassword()}";
41+
}
42+
else
43+
{
44+
WsUri = httpUri.Replace("https://", "wss://")
45+
.Replace("http://", "ws://") + $"/v1/websocket/?login={credentialsProvider.GetUser()}.{credentialsProvider.GetDeviceId()}&password={credentialsProvider.GetPassword()}";
46+
}
47+
UserAgent = userAgent;
48+
WebSocket = new WebSocketWrapper(WsUri, token);
49+
WebSocket.OnConnect(Connection_OnOpened);
50+
WebSocket.OnMessage(Connection_OnMessage);
51+
}
52+
53+
public void Connect()
54+
{
55+
Listener?.OnConnecting();
56+
WebSocket.Connect();
57+
}
58+
59+
private void Connection_OnOpened()
60+
{
61+
Listener?.OnConnecting();
62+
}
63+
64+
private void Connection_OnMessage(byte[] obj)
65+
{
66+
var msg = WebSocketMessage.Parser.ParseFrom(obj);
67+
if (msg.Type == WebSocketMessage.Types.Type.Request)
68+
{
69+
Logger.LogTrace("Adding request to IncomingRequests");
70+
IncomingRequests.Add(msg.Request);
71+
}
72+
else if (msg.Type == WebSocketMessage.Types.Type.Response)
73+
{
74+
Logger.LogTrace("Adding response {0} ({1} {2})", msg.Response.Id, msg.Response.Status, Encoding.UTF8.GetString(msg.Response.Body.ToByteArray()));
75+
var t = new Tuple<CountdownEvent, uint, string>(null, msg.Response.Status, Encoding.UTF8.GetString(msg.Response.Body.ToByteArray()));
76+
Tuple<CountdownEvent, uint, string> savedRequest;
77+
OutgoingRequests.TryGetValue(msg.Response.Id, out savedRequest);
78+
OutgoingRequests.AddOrUpdate(msg.Response.Id, t, (k, v) => t);
79+
savedRequest.Item1.Signal();
80+
}
81+
}
82+
83+
public void Disconnect()
84+
{
85+
Logger.LogWarning("Disconnect is not supported yet");
86+
throw new NotImplementedException();
87+
}
88+
89+
90+
91+
/// <summary>
92+
/// Gets the next WebSocketRequestMessage from the websocket.
93+
/// If there are no received messages in the buffer, this method will block until there are, or this connection's token is cancelled.
94+
/// </summary>
95+
/// <remarks>
96+
/// keks
97+
/// </remarks>
98+
/// <returns>A WebSocketRequestMessage read from the websocket's pipe</returns>
99+
public WebSocketRequestMessage ReadRequestBlocking()
100+
{
101+
return IncomingRequests.Take(Token);
102+
}
103+
104+
/// <summary>
105+
/// Sends a WebSocketRequestMessage to the Signal server. The returned task will block for a maximum of 10 seconds.
106+
/// </summary>
107+
/// <param name="request"></param>
108+
/// <returns>Returns a task that returns a server response or throws an exception.</returns>
109+
internal async Task<Tuple<uint, string>> SendRequest(WebSocketRequestMessage request)
110+
{
111+
Tuple<CountdownEvent, uint, string> t = new Tuple<CountdownEvent, uint, string>(new CountdownEvent(1), 0, null);
112+
WebSocketMessage message = new WebSocketMessage
113+
{
114+
Type = WebSocketMessage.Types.Type.Request,
115+
Request = request
116+
};
117+
OutgoingRequests.AddOrUpdate(request.Id, t, (k, v) => t);
118+
WebSocket.OutgoingQueue.Add(message.ToByteArray());
119+
return await Task.Run(() =>
120+
{
121+
if (t.Item1.Wait(10 * 1000, Token))
122+
{
123+
var handledTuple = OutgoingRequests[request.Id];
124+
return new Tuple<uint, string>(handledTuple.Item2, handledTuple.Item3);
125+
}
126+
throw new IOException("wait for confirmation timeout");
127+
});
128+
}
129+
130+
/// <summary>
131+
/// Sends a WebSocketResponseMessage to the Signal server. This method does not block until the message is actually sent.
132+
/// </summary>
133+
/// <param name="response"></param>
134+
public void SendResponse(WebSocketResponseMessage response)
135+
{
136+
WebSocketMessage message = new WebSocketMessage
137+
{
138+
Type = WebSocketMessage.Types.Type.Response,
139+
Response = response
140+
};
141+
WebSocket.OutgoingQueue.Add(message.ToByteArray());
142+
}
143+
144+
private void SendKeepAlive(CancellationToken token, object state)
145+
{
146+
WebSocketMessage message = new WebSocketMessage
147+
{
148+
Type = WebSocketMessage.Types.Type.Request,
149+
Request = new WebSocketRequestMessage
150+
{
151+
Id = KeyHelper.getTime(),
152+
Path = "/v1/keepalive",
153+
Verb = "GET"
154+
},
155+
};
156+
WebSocket.OutgoingQueue.Add(message.ToByteArray());
157+
}
158+
}
159+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace libsignalservice.websocket
6+
{
7+
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
8+
public interface ConnectivityListener
9+
{
10+
void OnConnected();
11+
void OnConnecting();
12+
void OnDisconnected();
13+
}
14+
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
15+
}

0 commit comments

Comments
 (0)