Skip to content

Commit 66b02e7

Browse files
committed
Use the outgoing queue for everything
1 parent 5523884 commit 66b02e7

File tree

4 files changed

+164
-127
lines changed

4 files changed

+164
-127
lines changed

Signal-Windows.Lib/IncomingMessages.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,25 @@ class IncomingMessages : IMessagePipeCallback
2626
{
2727
private readonly ILogger Logger = LibsignalLogging.CreateLogger<IncomingMessages>();
2828
private readonly CancellationToken Token;
29-
private readonly SignalServiceMessagePipe Pipe;
29+
private readonly Task<SignalServiceMessagePipe> PipeTask;
3030
private readonly SignalServiceMessageReceiver MessageReceiver;
3131

32-
public IncomingMessages(CancellationToken token, SignalServiceMessagePipe pipe, SignalServiceMessageReceiver messageReceiver)
32+
public IncomingMessages(CancellationToken token, Task<SignalServiceMessagePipe> pipe, SignalServiceMessageReceiver messageReceiver)
3333
{
3434
Token = token;
35-
Pipe = pipe;
35+
PipeTask = pipe;
3636
MessageReceiver = messageReceiver;
3737
}
3838

3939
public async Task HandleIncomingMessages()
4040
{
4141
Logger.LogDebug("HandleIncomingMessages()");
42+
var pipe = await PipeTask;
4243
while (!Token.IsCancellationRequested)
4344
{
4445
try
4546
{
46-
await Pipe.ReadBlocking(this);
47+
await pipe.ReadBlocking(this);
4748
}
4849
catch (OperationCanceledException)
4950
{
@@ -578,7 +579,7 @@ private async Task HandleSignalMessage(SignalServiceEnvelope envelope, SignalSer
578579
Group = group,
579580
Timestamp = Util.CurrentTimeMillis()
580581
};
581-
await SignalLibHandle.Instance.OutgoingMessages.SendMessage(envelope.GetSourceAddress(), requestInfoMessage);
582+
SignalLibHandle.Instance.OutgoingQueue.Add(new SignalServiceDataMessageSendable(requestInfoMessage, envelope.GetSourceAddress()));
582583
}
583584
composedTimestamp = envelope.GetTimestamp();
584585
}

Signal-Windows.Lib/OutgoingMessages.cs

Lines changed: 127 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -21,135 +21,170 @@
2121

2222
namespace Signal_Windows.Lib
2323
{
24-
class OutgoingMessages
24+
interface ISendable
2525
{
26-
private readonly ILogger Logger = LibsignalLogging.CreateLogger<OutgoingMessages>();
27-
private readonly CancellationToken Token;
28-
private readonly SignalServiceMessageSender MessageSender;
29-
private readonly SignalLibHandle Handle;
26+
SignalMessageStatus Status { set; }
27+
Task Send(SignalServiceMessageSender messageSender, CancellationToken token);
28+
}
29+
30+
class SignalServiceSyncMessageSendable : ISendable
31+
{
32+
public SignalMessageStatus Status { get; set; }
33+
private readonly SignalServiceSyncMessage SyncMessage;
3034

31-
public OutgoingMessages(CancellationToken token, SignalServiceMessageSender sender, SignalLibHandle handle)
35+
public SignalServiceSyncMessageSendable(SignalServiceSyncMessage message)
3236
{
33-
Token = token;
34-
MessageSender = sender;
35-
Handle = handle;
37+
SyncMessage = message;
3638
}
3739

38-
public async Task SendMessage(List<SignalServiceAddress> recipients, SignalServiceDataMessage message)
40+
public async Task Send(SignalServiceMessageSender messageSender, CancellationToken token)
3941
{
40-
await MessageSender.SendMessage(Token, recipients, message);
42+
await messageSender.SendMessage(token, SyncMessage);
4143
}
44+
}
45+
46+
class SignalServiceDataMessageSendable : ISendable
47+
{
48+
public SignalMessageStatus Status { get; set; }
49+
private readonly SignalServiceDataMessage DataMessage;
50+
private readonly SignalServiceAddress Recipient;
4251

43-
public async Task SendMessage(SignalServiceAddress recipient, SignalServiceDataMessage message)
52+
public SignalServiceDataMessageSendable(SignalServiceDataMessage dataMessage, SignalServiceAddress recipient)
4453
{
45-
await MessageSender.SendMessage(Token, recipient, message);
54+
DataMessage = dataMessage;
55+
Recipient = recipient;
4656
}
4757

48-
public void SendMessage(SignalServiceAddress recipient, SignalServiceSyncMessage message)
58+
public async Task Send(SignalServiceMessageSender messageSender, CancellationToken token)
4959
{
50-
lock (this)
51-
{
52-
MessageSender.SendMessage(Token, message);
53-
}
60+
await messageSender.SendMessage(token, Recipient, DataMessage);
5461
}
62+
}
63+
64+
class SignalMessageSendable : ISendable
65+
{
66+
private readonly ILogger Logger = LibsignalLogging.CreateLogger<SignalMessageSendable>();
67+
public readonly SignalMessage OutgoingSignalMessage;
5568

56-
public void SendMessage(SignalServiceSyncMessage message)
69+
public SignalMessageSendable(SignalMessage message)
5770
{
58-
lock (this)
59-
{
60-
MessageSender.SendMessage(Token, message);
61-
}
71+
OutgoingSignalMessage = message;
6272
}
6373

64-
public async Task HandleOutgoingMessages()
74+
public SignalMessageStatus Status { set => OutgoingSignalMessage.Status = value; }
75+
76+
public async Task Send(SignalServiceMessageSender messageSender, CancellationToken token)
6577
{
66-
Logger.LogDebug("HandleOutgoingMessages()");
67-
while (!Token.IsCancellationRequested)
78+
List<SignalServiceAttachment> outgoingAttachmentsList = null;
79+
if (OutgoingSignalMessage.Attachments != null && OutgoingSignalMessage.Attachments.Count > 0)
6880
{
69-
SignalMessage outgoingSignalMessage = null;
70-
try
81+
outgoingAttachmentsList = new List<SignalServiceAttachment>();
82+
foreach (var attachment in OutgoingSignalMessage.Attachments)
7183
{
72-
outgoingSignalMessage = Handle.OutgoingQueue.Take(Token);
73-
List<SignalServiceAttachment> outgoingAttachmentsList = null;
74-
if (outgoingSignalMessage.Attachments != null && outgoingSignalMessage.Attachments.Count > 0)
84+
try
7585
{
76-
outgoingAttachmentsList = new List<SignalServiceAttachment>();
77-
foreach (var attachment in outgoingSignalMessage.Attachments)
78-
{
79-
try
80-
{
81-
var file = await ApplicationData.Current.LocalCacheFolder.GetFileAsync(@"Attachments\" + attachment.Id + ".plain");
82-
var stream = await file.OpenStreamForReadAsync();
83-
outgoingAttachmentsList.Add(SignalServiceAttachment.NewStreamBuilder()
84-
.WithContentType(attachment.ContentType)
85-
.WithStream(stream)
86-
.WithLength(stream.Length)
87-
.WithFileName(attachment.SentFileName)
88-
.Build());
89-
}
90-
catch (Exception e)
91-
{
92-
Logger.LogError($"HandleOutgoingMessages() failed to add attachment {attachment.Id}: {e.Message}\n{e.StackTrace}");
93-
}
94-
}
86+
var file = await ApplicationData.Current.LocalCacheFolder.GetFileAsync(@"Attachments\" + attachment.Id + ".plain");
87+
var stream = await file.OpenStreamForReadAsync();
88+
outgoingAttachmentsList.Add(SignalServiceAttachment.NewStreamBuilder()
89+
.WithContentType(attachment.ContentType)
90+
.WithStream(stream)
91+
.WithLength(stream.Length)
92+
.WithFileName(attachment.SentFileName)
93+
.Build());
9594
}
96-
97-
SignalServiceDataMessage message = new SignalServiceDataMessage()
98-
{
99-
Body = outgoingSignalMessage.Content.Content,
100-
Timestamp = outgoingSignalMessage.ComposedTimestamp,
101-
ExpiresInSeconds = (int)outgoingSignalMessage.ExpiresAt,
102-
Attachments = outgoingAttachmentsList
103-
};
104-
105-
if (!outgoingSignalMessage.ThreadId.EndsWith("="))
95+
catch (Exception e)
10696
{
107-
if (!Token.IsCancellationRequested)
108-
{
109-
await MessageSender.SendMessage(Token, new SignalServiceAddress(outgoingSignalMessage.ThreadId), message);
110-
outgoingSignalMessage.Status = SignalMessageStatus.Confirmed;
111-
}
97+
Logger.LogError($"HandleOutgoingMessages() failed to add attachment {attachment.Id}: {e.Message}\n{e.StackTrace}");
11298
}
113-
else
99+
}
100+
}
101+
102+
SignalServiceDataMessage message = new SignalServiceDataMessage()
103+
{
104+
Body = OutgoingSignalMessage.Content.Content,
105+
Timestamp = OutgoingSignalMessage.ComposedTimestamp,
106+
ExpiresInSeconds = (int)OutgoingSignalMessage.ExpiresAt,
107+
Attachments = outgoingAttachmentsList
108+
};
109+
110+
if (!OutgoingSignalMessage.ThreadId.EndsWith("="))
111+
{
112+
if (!token.IsCancellationRequested)
113+
{
114+
await messageSender.SendMessage(token, new SignalServiceAddress(OutgoingSignalMessage.ThreadId), message);
115+
OutgoingSignalMessage.Status = SignalMessageStatus.Confirmed;
116+
}
117+
}
118+
else
119+
{
120+
List<SignalServiceAddress> recipients = new List<SignalServiceAddress>();
121+
SignalGroup g = await SignalDBContext.GetOrCreateGroupLocked(OutgoingSignalMessage.ThreadId, 0);
122+
foreach (GroupMembership sc in g.GroupMemberships)
123+
{
124+
if (sc.Contact.ThreadId != SignalLibHandle.Instance.Store.Username)
114125
{
115-
List<SignalServiceAddress> recipients = new List<SignalServiceAddress>();
116-
SignalGroup g = await SignalDBContext.GetOrCreateGroupLocked(outgoingSignalMessage.ThreadId, 0);
117-
foreach (GroupMembership sc in g.GroupMemberships)
118-
{
119-
if (sc.Contact.ThreadId != SignalLibHandle.Instance.Store.Username)
120-
{
121-
recipients.Add(new SignalServiceAddress(sc.Contact.ThreadId));
122-
}
123-
}
124-
message.Group = new SignalServiceGroup()
125-
{
126-
GroupId = Base64.Decode(g.ThreadId),
127-
Type = SignalServiceGroup.GroupType.DELIVER
128-
};
129-
if (!Token.IsCancellationRequested)
130-
{
131-
await SendMessage(recipients, message);
132-
outgoingSignalMessage.Status = SignalMessageStatus.Confirmed;
133-
}
126+
recipients.Add(new SignalServiceAddress(sc.Contact.ThreadId));
134127
}
135128
}
129+
message.Group = new SignalServiceGroup()
130+
{
131+
GroupId = Base64.Decode(g.ThreadId),
132+
Type = SignalServiceGroup.GroupType.DELIVER
133+
};
134+
if (!token.IsCancellationRequested)
135+
{
136+
await messageSender.SendMessage(token, recipients, message);
137+
OutgoingSignalMessage.Status = SignalMessageStatus.Confirmed;
138+
}
139+
}
140+
}
141+
}
142+
143+
class OutgoingMessages
144+
{
145+
private readonly ILogger Logger = LibsignalLogging.CreateLogger<OutgoingMessages>();
146+
private readonly CancellationToken Token;
147+
private readonly SignalLibHandle Handle;
148+
private readonly SignalStore Store;
149+
private readonly Task<SignalServiceMessagePipe> CreatePipeTask;
150+
151+
public OutgoingMessages(CancellationToken token, Task<SignalServiceMessagePipe> createPipeTask, SignalStore store, SignalLibHandle handle)
152+
{
153+
Token = token;
154+
CreatePipeTask = createPipeTask;
155+
Store = store;
156+
Handle = handle;
157+
}
158+
159+
public async Task HandleOutgoingMessages()
160+
{
161+
Logger.LogDebug("HandleOutgoingMessages()");
162+
var messageSender = new SignalServiceMessageSender(Token, LibUtils.ServiceConfiguration, Store.Username, Store.Password, (int)Store.DeviceId, new Store(), await CreatePipeTask, null, LibUtils.USER_AGENT);
163+
while (!Token.IsCancellationRequested)
164+
{
165+
ISendable sendable = null;
166+
try
167+
{
168+
sendable = Handle.OutgoingQueue.Take(Token);
169+
await sendable.Send(messageSender, Token);
170+
}
136171
catch (OperationCanceledException)
137172
{
138173
Logger.LogInformation("HandleOutgoingMessages() finished");
139174
return;
140175
}
141176
catch (EncapsulatedExceptions exceptions)
142177
{
143-
outgoingSignalMessage.Status = SignalMessageStatus.Confirmed;
178+
sendable.Status = SignalMessageStatus.Confirmed;
144179
Logger.LogError("HandleOutgoingMessages() encountered libsignal exceptions");
145180
IList<UntrustedIdentityException> identityExceptions = exceptions.UntrustedIdentityExceptions;
146181
if (exceptions.NetworkExceptions.Count > 0)
147182
{
148-
outgoingSignalMessage.Status = SignalMessageStatus.Failed_Network;
183+
sendable.Status = SignalMessageStatus.Failed_Network;
149184
}
150185
if (identityExceptions.Count > 0)
151186
{
152-
outgoingSignalMessage.Status = SignalMessageStatus.Failed_Identity;
187+
sendable.Status = SignalMessageStatus.Failed_Identity;
153188
}
154189
foreach (UntrustedIdentityException e in identityExceptions)
155190
{
@@ -159,21 +194,21 @@ public async Task HandleOutgoingMessages()
159194
catch (RateLimitException)
160195
{
161196
Logger.LogError("HandleOutgoingMessages() could not send due to rate limits");
162-
outgoingSignalMessage.Status = SignalMessageStatus.Failed_Ratelimit;
197+
sendable.Status = SignalMessageStatus.Failed_Ratelimit;
163198
}
164199
catch (UntrustedIdentityException e)
165200
{
166201
Logger.LogError("HandleOutgoingMessages() could not send due to untrusted identities");
167-
outgoingSignalMessage.Status = SignalMessageStatus.Failed_Identity;
202+
sendable.Status = SignalMessageStatus.Failed_Identity;
168203
await Handle.HandleOutgoingKeyChangeLocked(e.E164number, Base64.EncodeBytes(e.IdentityKey.serialize()));
169204
}
170205
catch (Exception e)
171206
{
172207
var line = new StackTrace(e, true).GetFrames()[0].GetFileLineNumber();
173208
Logger.LogError("HandleOutgoingMessages() failed in line {0}: {1}\n{2}", line, e.Message, e.StackTrace);
174-
outgoingSignalMessage.Status = SignalMessageStatus.Failed_Unknown;
209+
sendable.Status = SignalMessageStatus.Failed_Unknown;
175210
}
176-
await Handle.HandleMessageSentLocked(outgoingSignalMessage);
211+
await Handle.HandleMessageSentLocked(sendable);
177212
}
178213
Logger.LogInformation("HandleOutgoingMessages() finished");
179214
}

0 commit comments

Comments
 (0)