Skip to content

Commit c862836

Browse files
Fix dropping messages on reconnect (#50409)
* Fix dropping messages on reconnect * fb * Update src/SignalR/common/Shared/MessageBuffer.cs Co-authored-by: Stephen Halter <[email protected]> --------- Co-authored-by: Stephen Halter <[email protected]>
1 parent bd72227 commit c862836

File tree

5 files changed

+17
-48
lines changed

5 files changed

+17
-48
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,6 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
11371137
break;
11381138
case SequenceMessage sequenceMessage:
11391139
Log.ReceivedSequenceMessage(_logger, sequenceMessage.SequenceId);
1140-
connectionState.ResetSequence(sequenceMessage);
11411140
break;
11421141
default:
11431142
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
@@ -2100,14 +2099,6 @@ public Task AckAsync(AckMessage ackMessage)
21002099
return Task.CompletedTask;
21012100
}
21022101

2103-
public void ResetSequence(SequenceMessage sequenceMessage)
2104-
{
2105-
if (UsingAcks())
2106-
{
2107-
_messageBuffer.ResetSequence(sequenceMessage);
2108-
}
2109-
}
2110-
21112102
[MemberNotNullWhen(true, nameof(_messageBuffer))]
21122103
public bool UsingAcks() => _messageBuffer is not null;
21132104

src/SignalR/common/Shared/MessageBuffer.cs

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ internal sealed class MessageBuffer : IDisposable
4141
private PipeWriter _writer;
4242

4343
private long _totalMessageCount;
44-
private bool _waitForSequenceMessage;
4544

4645
// Message IDs start at 1 and always increment by 1
4746
private long _currentReceivingSequenceId = 1;
@@ -210,19 +209,20 @@ public async Task AckAsync(AckMessage ackMessage)
210209

211210
internal bool ShouldProcessMessage(HubMessage message)
212211
{
213-
// TODO: if we're expecting a sequence message but get here should we error or ignore or maybe even continue to process them?
214-
if (_waitForSequenceMessage)
212+
if (message is SequenceMessage sequenceMessage)
215213
{
216-
if (message is SequenceMessage)
217-
{
218-
_waitForSequenceMessage = false;
219-
return true;
220-
}
221-
else
214+
// TODO: is a sequence message expected right now?
215+
216+
if (sequenceMessage.SequenceId > _currentReceivingSequenceId)
222217
{
223-
// ignore messages received while waiting for sequence message
224-
return false;
218+
throw new InvalidOperationException("Sequence ID greater than amount of messages we've received.");
225219
}
220+
221+
_currentReceivingSequenceId = sequenceMessage.SequenceId;
222+
223+
// Technically handled by the 'is not HubInvocationMessage' check, but this is future proofing in case that check changes
224+
// SequenceMessage should not be counted towards ackable messages
225+
return true;
226226
}
227227

228228
// Only care about messages implementing HubInvocationMessage currently (e.g. ignore ping, close, ack, sequence)
@@ -233,6 +233,7 @@ internal bool ShouldProcessMessage(HubMessage message)
233233
}
234234

235235
var currentId = _currentReceivingSequenceId;
236+
// ShouldProcessMessage is never called in parallel and is the only method referencing _currentReceivingSequenceId
236237
_currentReceivingSequenceId++;
237238
if (currentId <= _latestReceivedSequenceId)
238239
{
@@ -244,21 +245,8 @@ internal bool ShouldProcessMessage(HubMessage message)
244245
return true;
245246
}
246247

247-
internal void ResetSequence(SequenceMessage sequenceMessage)
248-
{
249-
// TODO: is a sequence message expected right now?
250-
251-
if (sequenceMessage.SequenceId > _currentReceivingSequenceId)
252-
{
253-
throw new InvalidOperationException("Sequence ID greater than amount of messages we've received.");
254-
}
255-
_currentReceivingSequenceId = sequenceMessage.SequenceId;
256-
}
257-
258248
internal async Task ResendAsync(PipeWriter writer)
259249
{
260-
_waitForSequenceMessage = true;
261-
262250
var tcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
263251
_resend = tcs;
264252

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -802,12 +802,4 @@ internal bool ShouldProcessMessage(HubMessage message)
802802
}
803803
return true;
804804
}
805-
806-
internal void ResetSequence(SequenceMessage sequenceMessage)
807-
{
808-
if (UsingStatefulReconnect())
809-
{
810-
_messageBuffer.ResetSequence(sequenceMessage);
811-
}
812-
}
813805
}

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe
198198

199199
case SequenceMessage sequenceMessage:
200200
Log.ReceivedSequenceMessage(_logger, sequenceMessage.SequenceId);
201-
connection.ResetSequence(sequenceMessage);
202201
break;
203202

204203
case CloseMessage closeMessage:

src/SignalR/server/SignalR/test/Internal/MessageBufferTests.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,8 @@ public async Task UnAckedMessageResentOnReconnect()
142142
DuplexPipe.UpdateConnectionPair(ref pipes, connection);
143143
await messageBuffer.ResendAsync(pipes.Transport.Output);
144144

145-
// Any message except SequenceMessage will be ignored until a SequenceMessage is received
146-
Assert.False(messageBuffer.ShouldProcessMessage(PingMessage.Instance));
147-
Assert.False(messageBuffer.ShouldProcessMessage(CompletionMessage.WithResult("1", null)));
145+
Assert.True(messageBuffer.ShouldProcessMessage(PingMessage.Instance));
146+
Assert.True(messageBuffer.ShouldProcessMessage(CompletionMessage.WithResult("1", null)));
148147
Assert.True(messageBuffer.ShouldProcessMessage(new SequenceMessage(1)));
149148

150149
res = await pipes.Application.Input.ReadAsync();
@@ -164,10 +163,10 @@ public async Task UnAckedMessageResentOnReconnect()
164163

165164
pipes.Application.Input.AdvanceTo(buffer.Start);
166165

167-
messageBuffer.ResetSequence(new SequenceMessage(1));
166+
messageBuffer.ShouldProcessMessage(new SequenceMessage(1));
168167

169168
Assert.True(messageBuffer.ShouldProcessMessage(PingMessage.Instance));
170-
Assert.True(messageBuffer.ShouldProcessMessage(CompletionMessage.WithResult("1", null)));
169+
Assert.False(messageBuffer.ShouldProcessMessage(CompletionMessage.WithResult("1", null)));
171170
}
172171

173172
[Fact]
@@ -235,7 +234,7 @@ public async Task ReceiveSequenceMessageWithLargerIDThanMessagesReceived()
235234

236235
pipes.Application.Input.AdvanceTo(buffer.Start);
237236

238-
Assert.Throws<InvalidOperationException>(() => messageBuffer.ResetSequence(new SequenceMessage(2)));
237+
Assert.Throws<InvalidOperationException>(() => messageBuffer.ShouldProcessMessage(new SequenceMessage(2)));
239238
}
240239

241240
[Fact]

0 commit comments

Comments
 (0)