diff --git a/package-lock.json b/package-lock.json index 4f8d9d58..37d5f767 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.207.1", + "version": "0.207.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.207.1", + "version": "0.207.2", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", diff --git a/package.json b/package.json index 17dba475..6866a298 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.207.1", + "version": "0.207.2", "type": "module", "exports": { ".": { diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 60fc90e8..5a9253fc 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -28,6 +28,13 @@ export interface SessionConnectedProps listeners: SessionConnectedListeners; } +interface TrackedMsg { + id: string; + seq: number; + streamId: string; + stack?: string; +} + /* * A session that is connected and can send and receive messages. * See transitions.ts for valid transitions. @@ -43,6 +50,22 @@ export class SessionConnected< private heartbeatMisses = 0; isActivelyHeartbeating: boolean; + private lastConstructedMsgs: Array = []; + private pushLastConstructedMsgs = (msg: OpaqueTransportMessage) => { + const trackedMsg = { + id: msg.id, + seq: msg.seq, + streamId: msg.streamId, + stack: new Error().stack, + }; + + this.lastConstructedMsgs.push(trackedMsg); + + if (this.lastConstructedMsgs.length > 10) { + this.lastConstructedMsgs.shift(); + } + }; + updateBookkeeping(ack: number, seq: number) { this.sendBuffer = this.sendBuffer.filter((unacked) => unacked.seq >= ack); this.ack = seq + 1; @@ -56,6 +79,9 @@ export class SessionConnected< ...this.loggingMetadata, transportMessage: constructedMsg, tags: ['invariant-violation'], + extras: { + lastConstructedMsgs: this.lastConstructedMsgs, + }, }); throw new Error(msg); @@ -64,6 +90,7 @@ export class SessionConnected< send(msg: PartialTransportMessage): string { const constructedMsg = this.constructMsg(msg); + this.pushLastConstructedMsgs(constructedMsg); this.assertSendOrdering(constructedMsg); this.sendBuffer.push(constructedMsg); this.conn.send(this.options.codec.toBuffer(constructedMsg)); @@ -156,13 +183,6 @@ export class SessionConnected< }); } - closeConnection() { - this.conn.removeDataListener(this.onMessageData); - this.conn.removeCloseListener(this.listeners.onConnectionClosed); - this.conn.removeErrorListener(this.listeners.onConnectionErrored); - this.conn.close(); - } - onMessageData = (msg: Uint8Array) => { const parsedMsg = this.parseMsg(msg); if (parsedMsg === null) { @@ -196,7 +216,7 @@ export class SessionConnected< // try to recover by closing the connection and re-handshaking // with the session intact - this.closeConnection(); + this.conn.close(); } return;