Skip to content

Commit 8bb58fc

Browse files
committed
Fix our In/Out tasks not being awaited properly
1 parent 4250ba0 commit 8bb58fc

File tree

3 files changed

+63
-73
lines changed

3 files changed

+63
-73
lines changed

Signal-Windows.Lib/IncomingMessages.cs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,40 +26,36 @@ class IncomingMessages : IMessagePipeCallback
2626
{
2727
private readonly ILogger Logger = LibsignalLogging.CreateLogger<IncomingMessages>();
2828
private readonly CancellationToken Token;
29-
private readonly Task<SignalServiceMessagePipe> PipeTask;
29+
private readonly SignalServiceMessagePipe Pipe;
3030
private readonly SignalServiceMessageReceiver MessageReceiver;
3131

32-
public IncomingMessages(CancellationToken token, Task<SignalServiceMessagePipe> pipe, SignalServiceMessageReceiver messageReceiver)
32+
public IncomingMessages(CancellationToken token, SignalServiceMessagePipe pipe, SignalServiceMessageReceiver messageReceiver)
3333
{
3434
Token = token;
35-
PipeTask = pipe;
35+
Pipe = pipe;
3636
MessageReceiver = messageReceiver;
3737
}
3838

3939
public async Task HandleIncomingMessages()
4040
{
4141
Logger.LogDebug("HandleIncomingMessages()");
42-
await PipeTask.ContinueWith(async _ =>
42+
while (!Token.IsCancellationRequested)
4343
{
44-
var pipe = await PipeTask;
45-
while (!Token.IsCancellationRequested)
44+
try
4645
{
47-
try
48-
{
49-
await pipe.ReadBlocking(this);
50-
}
51-
catch (OperationCanceledException)
52-
{
53-
break;
54-
}
55-
catch (Exception e)
56-
{
57-
var line = new StackTrace(e, true).GetFrames()[0].GetFileLineNumber();
58-
Logger.LogWarning("HandleIncomingMessages() failed: {0} occured ({1}):\n{2}", e.GetType(), e.Message, e.StackTrace);
59-
}
46+
await Pipe.ReadBlocking(this);
6047
}
61-
Logger.LogInformation("HandleIncomingMessages() finished");
62-
}, TaskContinuationOptions.RunContinuationsAsynchronously);
48+
catch (OperationCanceledException)
49+
{
50+
break;
51+
}
52+
catch (Exception e)
53+
{
54+
var line = new StackTrace(e, true).GetFrames()[0].GetFileLineNumber();
55+
Logger.LogWarning("HandleIncomingMessages() failed: {0} occured ({1}):\n{2}", e.GetType(), e.Message, e.StackTrace);
56+
}
57+
}
58+
Logger.LogInformation("HandleIncomingMessages() finished");
6359
}
6460

6561
public async Task OnMessage(SignalServiceMessagePipeMessage message)

Signal-Windows.Lib/OutgoingMessages.cs

Lines changed: 42 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ class OutgoingMessages
146146
private readonly CancellationToken Token;
147147
private readonly SignalLibHandle Handle;
148148
private readonly SignalStore Store;
149-
private readonly Task<SignalServiceMessagePipe> CreatePipeTask;
149+
private readonly SignalServiceMessagePipe Pipe;
150150

151-
public OutgoingMessages(CancellationToken token, Task<SignalServiceMessagePipe> createPipeTask, SignalStore store, SignalLibHandle handle)
151+
public OutgoingMessages(CancellationToken token, SignalServiceMessagePipe pipe, SignalStore store, SignalLibHandle handle)
152152
{
153153
Token = token;
154-
CreatePipeTask = createPipeTask;
154+
Pipe = pipe;
155155
Store = store;
156156
Handle = handle;
157157
}
@@ -161,60 +161,55 @@ public async Task HandleOutgoingMessages()
161161
Logger.LogDebug("HandleOutgoingMessages()");
162162
try
163163
{
164-
await CreatePipeTask;
165-
await Task.Run(async () =>
164+
var messageSender = new SignalServiceMessageSender(Token, LibUtils.ServiceConfiguration, Store.Username, Store.Password, (int)Store.DeviceId, new Store(), Pipe, null, LibUtils.USER_AGENT);
165+
while (!Token.IsCancellationRequested)
166166
{
167-
var pipe = await CreatePipeTask;
168-
var messageSender = new SignalServiceMessageSender(Token, LibUtils.ServiceConfiguration, Store.Username, Store.Password, (int)Store.DeviceId, new Store(), pipe, null, LibUtils.USER_AGENT);
169-
while (!Token.IsCancellationRequested)
167+
ISendable sendable = null;
168+
try
170169
{
171-
ISendable sendable = null;
172-
try
173-
{
174-
sendable = Handle.OutgoingQueue.Take(Token);
175-
Logger.LogTrace($"Sending {sendable.GetType().Name}");
176-
await sendable.Send(messageSender, Token);
177-
}
178-
catch (OperationCanceledException) { return; }
179-
catch (EncapsulatedExceptions exceptions)
180-
{
181-
sendable.Status = SignalMessageStatus.Confirmed;
182-
Logger.LogError("HandleOutgoingMessages() encountered libsignal exceptions");
183-
IList<UntrustedIdentityException> identityExceptions = exceptions.UntrustedIdentityExceptions;
184-
if (exceptions.NetworkExceptions.Count > 0)
185-
{
186-
sendable.Status = SignalMessageStatus.Failed_Network;
187-
}
188-
if (identityExceptions.Count > 0)
189-
{
190-
sendable.Status = SignalMessageStatus.Failed_Identity;
191-
}
192-
foreach (UntrustedIdentityException e in identityExceptions)
193-
{
194-
await Handle.HandleOutgoingKeyChangeLocked(e.E164number, Base64.EncodeBytes(e.IdentityKey.serialize()));
195-
}
196-
}
197-
catch (RateLimitException)
170+
sendable = Handle.OutgoingQueue.Take(Token);
171+
Logger.LogTrace($"Sending {sendable.GetType().Name}");
172+
await sendable.Send(messageSender, Token);
173+
}
174+
catch (OperationCanceledException) { return; }
175+
catch (EncapsulatedExceptions exceptions)
176+
{
177+
sendable.Status = SignalMessageStatus.Confirmed;
178+
Logger.LogError("HandleOutgoingMessages() encountered libsignal exceptions");
179+
IList<UntrustedIdentityException> identityExceptions = exceptions.UntrustedIdentityExceptions;
180+
if (exceptions.NetworkExceptions.Count > 0)
198181
{
199-
Logger.LogError("HandleOutgoingMessages() could not send due to rate limits");
200-
sendable.Status = SignalMessageStatus.Failed_Ratelimit;
182+
sendable.Status = SignalMessageStatus.Failed_Network;
201183
}
202-
catch (UntrustedIdentityException e)
184+
if (identityExceptions.Count > 0)
203185
{
204-
Logger.LogError("HandleOutgoingMessages() could not send due to untrusted identities");
205186
sendable.Status = SignalMessageStatus.Failed_Identity;
206-
await Handle.HandleOutgoingKeyChangeLocked(e.E164number, Base64.EncodeBytes(e.IdentityKey.serialize()));
207187
}
208-
catch (Exception e)
188+
foreach (UntrustedIdentityException e in identityExceptions)
209189
{
210-
var line = new StackTrace(e, true).GetFrames()[0].GetFileLineNumber();
211-
Logger.LogError("HandleOutgoingMessages() failed in line {0}: {1}\n{2}", line, e.Message, e.StackTrace);
212-
sendable.Status = SignalMessageStatus.Failed_Unknown;
190+
await Handle.HandleOutgoingKeyChangeLocked(e.E164number, Base64.EncodeBytes(e.IdentityKey.serialize()));
213191
}
214-
await Handle.HandleMessageSentLocked(sendable);
215192
}
216-
Logger.LogInformation("HandleOutgoingMessages() stopping: cancellation was requested");
217-
});
193+
catch (RateLimitException)
194+
{
195+
Logger.LogError("HandleOutgoingMessages() could not send due to rate limits");
196+
sendable.Status = SignalMessageStatus.Failed_Ratelimit;
197+
}
198+
catch (UntrustedIdentityException e)
199+
{
200+
Logger.LogError("HandleOutgoingMessages() could not send due to untrusted identities");
201+
sendable.Status = SignalMessageStatus.Failed_Identity;
202+
await Handle.HandleOutgoingKeyChangeLocked(e.E164number, Base64.EncodeBytes(e.IdentityKey.serialize()));
203+
}
204+
catch (Exception e)
205+
{
206+
var line = new StackTrace(e, true).GetFrames()[0].GetFileLineNumber();
207+
Logger.LogError("HandleOutgoingMessages() failed in line {0}: {1}\n{2}", line, e.Message, e.StackTrace);
208+
sendable.Status = SignalMessageStatus.Failed_Unknown;
209+
}
210+
await Handle.HandleMessageSentLocked(sendable);
211+
}
212+
Logger.LogInformation("HandleOutgoingMessages() stopping: cancellation was requested");
218213
}
219214
catch (OperationCanceledException)
220215
{

Signal-Windows.Lib/SignalLibHandle.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,9 @@ public void Release()
298298
IncomingMessagesTask?.Wait();
299299
OutgoingMessagesTask?.Wait();
300300
Instance = null;
301-
Logger.LogTrace("Release() releasing global)");
301+
Logger.LogTrace("Release() releasing global");
302302
LibUtils.Unlock();
303-
Logger.LogTrace("Release() releasing local)");
303+
Logger.LogTrace("Release() releasing local");
304304
SemaphoreSlim.Release();
305305
Logger.LogTrace("Release() released");
306306
}
@@ -897,6 +897,8 @@ private void InitNetwork()
897897
{
898898
var pipe = await MessageReceiver.CreateMessagePipe(CancelSource.Token, new SignalWebSocketFactory());
899899
Logger.LogTrace("Messagepipe created");
900+
IncomingMessagesTask = await Task.Factory.StartNew(async () => await new IncomingMessages(CancelSource.Token, pipe, MessageReceiver).HandleIncomingMessages(), TaskCreationOptions.LongRunning);
901+
OutgoingMessagesTask = await Task.Factory.StartNew(async () => await new OutgoingMessages(CancelSource.Token, pipe, Store, this).HandleOutgoingMessages(), TaskCreationOptions.LongRunning);
900902
return pipe;
901903
}
902904
catch(Exception e)
@@ -906,9 +908,6 @@ private void InitNetwork()
906908
throw e;
907909
}
908910
});
909-
OutgoingMessages = new OutgoingMessages(CancelSource.Token, pipeTask, Store, this);
910-
IncomingMessagesTask = Task.Factory.StartNew(async () => await new IncomingMessages(CancelSource.Token, pipeTask, MessageReceiver).HandleIncomingMessages(), TaskCreationOptions.LongRunning);
911-
OutgoingMessagesTask = Task.Factory.StartNew(async () => await OutgoingMessages.HandleOutgoingMessages(), TaskCreationOptions.LongRunning);
912911
}
913912
catch(Exception e)
914913
{

0 commit comments

Comments
 (0)