Skip to content

Commit f1b9ac4

Browse files
authored
add more debug for message ordering violation (#300)
## Why still getting `invariant violation: would have sent out of order msg` ## What changed track last 10 sent messages along with their stack trace this unfortunately degrades message throughput from ~46krps to ~16kps but should be more than enough for prod as is, we just need to find the bug and fix it and then remove this again ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent a211674 commit f1b9ac4

File tree

3 files changed

+31
-11
lines changed

3 files changed

+31
-11
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.207.1",
4+
"version": "0.207.2",
55
"type": "module",
66
"exports": {
77
".": {

transport/sessionStateMachine/SessionConnected.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ export interface SessionConnectedProps<ConnType extends Connection>
2828
listeners: SessionConnectedListeners;
2929
}
3030

31+
interface TrackedMsg {
32+
id: string;
33+
seq: number;
34+
streamId: string;
35+
stack?: string;
36+
}
37+
3138
/*
3239
* A session that is connected and can send and receive messages.
3340
* See transitions.ts for valid transitions.
@@ -43,6 +50,22 @@ export class SessionConnected<
4350
private heartbeatMisses = 0;
4451
isActivelyHeartbeating: boolean;
4552

53+
private lastConstructedMsgs: Array<TrackedMsg> = [];
54+
private pushLastConstructedMsgs = (msg: OpaqueTransportMessage) => {
55+
const trackedMsg = {
56+
id: msg.id,
57+
seq: msg.seq,
58+
streamId: msg.streamId,
59+
stack: new Error().stack,
60+
};
61+
62+
this.lastConstructedMsgs.push(trackedMsg);
63+
64+
if (this.lastConstructedMsgs.length > 10) {
65+
this.lastConstructedMsgs.shift();
66+
}
67+
};
68+
4669
updateBookkeeping(ack: number, seq: number) {
4770
this.sendBuffer = this.sendBuffer.filter((unacked) => unacked.seq >= ack);
4871
this.ack = seq + 1;
@@ -56,6 +79,9 @@ export class SessionConnected<
5679
...this.loggingMetadata,
5780
transportMessage: constructedMsg,
5881
tags: ['invariant-violation'],
82+
extras: {
83+
lastConstructedMsgs: this.lastConstructedMsgs,
84+
},
5985
});
6086

6187
throw new Error(msg);
@@ -64,6 +90,7 @@ export class SessionConnected<
6490

6591
send(msg: PartialTransportMessage): string {
6692
const constructedMsg = this.constructMsg(msg);
93+
this.pushLastConstructedMsgs(constructedMsg);
6794
this.assertSendOrdering(constructedMsg);
6895
this.sendBuffer.push(constructedMsg);
6996
this.conn.send(this.options.codec.toBuffer(constructedMsg));
@@ -156,13 +183,6 @@ export class SessionConnected<
156183
});
157184
}
158185

159-
closeConnection() {
160-
this.conn.removeDataListener(this.onMessageData);
161-
this.conn.removeCloseListener(this.listeners.onConnectionClosed);
162-
this.conn.removeErrorListener(this.listeners.onConnectionErrored);
163-
this.conn.close();
164-
}
165-
166186
onMessageData = (msg: Uint8Array) => {
167187
const parsedMsg = this.parseMsg(msg);
168188
if (parsedMsg === null) {
@@ -196,7 +216,7 @@ export class SessionConnected<
196216

197217
// try to recover by closing the connection and re-handshaking
198218
// with the session intact
199-
this.closeConnection();
219+
this.conn.close();
200220
}
201221

202222
return;

0 commit comments

Comments
 (0)