Skip to content

Commit 3fd9a59

Browse files
authored
fix sessionStatus type, remove no connection -> no connection transition on server, extra invariant checks (#296)
## Why we get a very very low volume of message out of order (~40 messages across 15m connections) but these still suck lets see if the message ordering problem is present before we send too ## What changed add some extra invariant violation checks before message send fix some small session status type alignment and removed an extra state where we would emit an extra session transition for the transparent reconnect case ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 59c4145 commit 3fd9a59

File tree

11 files changed

+226
-47
lines changed

11 files changed

+226
-47
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.205.1",
4+
"version": "0.205.2",
55
"type": "module",
66
"exports": {
77
".": {

testUtil/fixtures/mockTransport.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ export class InMemoryConnection extends Connection {
1515
constructor(pipe: Duplex) {
1616
super();
1717
this.conn = pipe;
18+
this.conn.allowHalfOpen = false;
1819

1920
this.conn.on('data', (data: Uint8Array) => {
2021
for (const cb of this.dataListeners) {
2122
cb(data);
2223
}
2324
});
2425

25-
this.conn.on('end', () => {
26+
this.conn.on('close', () => {
2627
for (const cb of this.closeListeners) {
2728
cb();
2829
}
@@ -46,6 +47,7 @@ export class InMemoryConnection extends Connection {
4647
close(): void {
4748
setImmediate(() => {
4849
this.conn.end();
50+
this.conn.emit('close');
4951
});
5052
}
5153
}
@@ -153,6 +155,7 @@ export function createMockTransportNetwork(
153155
simulatePhantomDisconnect() {
154156
for (const conn of Object.values(connections.get())) {
155157
conn.serverToClient.pause();
158+
conn.clientToServer.pause();
156159
}
157160
},
158161
async restartServer() {

transport/events.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { type Static } from '@sinclair/typebox';
22
import { Connection } from './connection';
33
import { OpaqueTransportMessage, HandshakeErrorResponseCodes } from './message';
44
import { Session, SessionState } from './sessionStateMachine';
5+
import { SessionId } from './sessionStateMachine/common';
56
import { TransportStatus } from './transport';
67

78
export const ProtocolError = {
@@ -26,11 +27,11 @@ export interface EventMap {
2627
session: Pick<Session<Connection>, 'id' | 'to'>;
2728
};
2829
sessionTransition:
29-
| { state: SessionState.Connected }
30-
| { state: SessionState.Handshaking }
31-
| { state: SessionState.Connecting }
32-
| { state: SessionState.BackingOff }
33-
| { state: SessionState.NoConnection };
30+
| { state: SessionState.Connected; id: SessionId }
31+
| { state: SessionState.Handshaking; id: SessionId }
32+
| { state: SessionState.Connecting; id: SessionId }
33+
| { state: SessionState.BackingOff; id: SessionId }
34+
| { state: SessionState.NoConnection; id: SessionId };
3435
protocolError:
3536
| {
3637
type: (typeof ProtocolError)['HandshakeFailed'];

transport/server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,8 @@ export abstract class ServerTransport<
395395
);
396396

397397
oldSession = noConnectionSession;
398+
this.updateSession(oldSession);
398399
}
399-
400-
this.updateSession(oldSession);
401400
} else if (oldSession) {
402401
connectCase = 'hard reconnection';
403402

transport/sessionStateMachine/SessionConnected.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
ControlMessageAckSchema,
55
OpaqueTransportMessage,
66
PartialTransportMessage,
7+
TransportMessage,
78
isAck,
89
} from '../message';
910
import {
@@ -48,10 +49,25 @@ export class SessionConnected<
4849
this.heartbeatMisses = 0;
4950
}
5051

52+
private assertSendOrdering(constructedMsg: TransportMessage) {
53+
if (constructedMsg.seq > this.seqSent + 1) {
54+
const msg = `invariant violation: would have sent out of order msg (seq: ${constructedMsg.seq}, expected: ${this.seqSent} + 1)`;
55+
this.log?.error(msg, {
56+
...this.loggingMetadata,
57+
transportMessage: constructedMsg,
58+
tags: ['invariant-violation'],
59+
});
60+
61+
throw new Error(msg);
62+
}
63+
}
64+
5165
send(msg: PartialTransportMessage): string {
5266
const constructedMsg = this.constructMsg(msg);
67+
this.assertSendOrdering(constructedMsg);
5368
this.sendBuffer.push(constructedMsg);
5469
this.conn.send(this.options.codec.toBuffer(constructedMsg));
70+
this.seqSent = constructedMsg.seq;
5571

5672
return constructedMsg.id;
5773
}
@@ -75,7 +91,9 @@ export class SessionConnected<
7591
);
7692

7793
for (const msg of this.sendBuffer) {
94+
this.assertSendOrdering(msg);
7895
this.conn.send(this.options.codec.toBuffer(msg));
96+
this.seqSent = msg.seq;
7997
}
8098
}
8199

@@ -165,7 +183,7 @@ export class SessionConnected<
165183
);
166184
} else {
167185
const reason = `received out-of-order msg, closing connection (got seq: ${parsedMsg.seq}, wanted seq: ${this.ack})`;
168-
this.log?.warn(reason, {
186+
this.log?.error(reason, {
169187
...this.loggingMetadata,
170188
transportMessage: parsedMsg,
171189
tags: ['invariant-violation'],

transport/sessionStateMachine/common.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps {
208208
to: TransportClientId;
209209
seq: number;
210210
ack: number;
211+
seqSent: number;
211212
sendBuffer: Array<OpaqueTransportMessage>;
212213
telemetry: TelemetryInfo;
213214
protocolVersion: ProtocolVersion;
@@ -224,15 +225,29 @@ export abstract class IdentifiedSession extends CommonSession {
224225
*/
225226
seq: number;
226227

228+
/**
229+
* Last seq we sent over the wire this session (excluding handshake) and retransmissions
230+
*/
231+
seqSent: number;
232+
227233
/**
228234
* Number of unique messages we've received this session (excluding handshake)
229235
*/
230236
ack: number;
231237
sendBuffer: Array<OpaqueTransportMessage>;
232238

233239
constructor(props: IdentifiedSessionProps) {
234-
const { id, to, seq, ack, sendBuffer, telemetry, log, protocolVersion } =
235-
props;
240+
const {
241+
id,
242+
to,
243+
seq,
244+
ack,
245+
sendBuffer,
246+
telemetry,
247+
log,
248+
protocolVersion,
249+
seqSent: messagesSent,
250+
} = props;
236251
super(props);
237252
this.id = id;
238253
this.to = to;
@@ -242,6 +257,7 @@ export abstract class IdentifiedSession extends CommonSession {
242257
this.telemetry = telemetry;
243258
this.log = log;
244259
this.protocolVersion = protocolVersion;
260+
this.seqSent = messagesSent;
245261
}
246262

247263
get loggingMetadata(): MessageMetadata {

transport/sessionStateMachine/stateMachine.test.ts

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1942,16 +1942,19 @@ describe('session state machine', () => {
19421942
expect(conn.send).toHaveBeenCalledTimes(0);
19431943

19441944
// send a heartbeat
1945-
conn.emitData(
1946-
session.options.codec.toBuffer(
1947-
session.constructMsg({
1948-
streamId: 'heartbeat',
1949-
controlFlags: ControlFlags.AckBit,
1950-
payload: {
1951-
type: 'ACK',
1952-
} satisfies Static<typeof ControlMessageAckSchema>,
1953-
}),
1954-
),
1945+
conn.onData(
1946+
session.options.codec.toBuffer({
1947+
id: 'msgid',
1948+
to: 'SERVER',
1949+
from: 'client',
1950+
seq: 0,
1951+
ack: 0,
1952+
streamId: 'heartbeat',
1953+
controlFlags: ControlFlags.AckBit,
1954+
payload: {
1955+
type: 'ACK',
1956+
} satisfies Static<typeof ControlMessageAckSchema>,
1957+
}),
19551958
);
19561959

19571960
// make sure the session acks the heartbeat
@@ -1964,16 +1967,19 @@ describe('session state machine', () => {
19641967
const conn = session.conn;
19651968

19661969
// send a heartbeat
1967-
conn.emitData(
1968-
session.options.codec.toBuffer(
1969-
session.constructMsg({
1970-
streamId: 'heartbeat',
1971-
controlFlags: ControlFlags.AckBit,
1972-
payload: {
1973-
type: 'ACK',
1974-
} satisfies Static<typeof ControlMessageAckSchema>,
1975-
}),
1976-
),
1970+
conn.onData(
1971+
session.options.codec.toBuffer({
1972+
id: 'msgid',
1973+
to: 'SERVER',
1974+
from: 'client',
1975+
seq: 0,
1976+
ack: 0,
1977+
streamId: 'heartbeat',
1978+
controlFlags: ControlFlags.AckBit,
1979+
payload: {
1980+
type: 'ACK',
1981+
} satisfies Static<typeof ControlMessageAckSchema>,
1982+
}),
19771983
);
19781984

19791985
expect(sessionHandle.onMessage).not.toHaveBeenCalled();

transport/sessionStateMachine/transitions.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ function inheritSharedSession(
5050
to: session.to,
5151
seq: session.seq,
5252
ack: session.ack,
53+
seqSent: session.seqSent,
5354
sendBuffer: session.sendBuffer,
5455
telemetry: session.telemetry,
5556
options: session.options,
@@ -90,6 +91,7 @@ export const SessionStateGraph = {
9091
to,
9192
seq: 0,
9293
ack: 0,
94+
seqSent: 0,
9395
graceExpiryTime: Date.now() + options.sessionDisconnectGraceMs,
9496
sendBuffer,
9597
telemetry,
@@ -251,12 +253,13 @@ export const SessionStateGraph = {
251253
? // old session exists, inherit state
252254
inheritSharedSession(oldSession)
253255
: // old session does not exist, create new state
254-
{
256+
({
255257
id: sessionId,
256258
from,
257259
to,
258260
seq: 0,
259261
ack: 0,
262+
seqSent: 0,
260263
sendBuffer: [],
261264
telemetry: createSessionTelemetryInfo(
262265
pendingSession.tracer,
@@ -269,7 +272,7 @@ export const SessionStateGraph = {
269272
tracer: pendingSession.tracer,
270273
log: pendingSession.log,
271274
protocolVersion,
272-
};
275+
} satisfies IdentifiedSessionProps);
273276

274277
pendingSession._handleStateExit();
275278
oldSession?._handleStateExit();

0 commit comments

Comments
 (0)