From 3627c1c59958685f38cffd085ce055e41d9fb9ce Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 28 Mar 2025 10:49:54 -0700 Subject: [PATCH 1/6] moar checks --- .../sessionStateMachine/SessionConnected.ts | 16 ++++++++++++++++ transport/sessionStateMachine/common.ts | 4 ++++ 2 files changed, 20 insertions(+) diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index f224585f..13f764a6 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -43,6 +43,14 @@ export class SessionConnected< private heartbeatMisses = 0; isActivelyHeartbeating: boolean; + private lastConstructedMsgs: Array = []; + private pushLastConstructedMsgs = (msg: OpaqueTransportMessage) => { + this.lastConstructedMsgs.push(msg); + if (this.lastConstructedMsgs.length > 10) { + this.lastConstructedMsgs.shift(); + } + }; + updateBookkeeping(ack: number, seq: number) { this.sendBuffer = this.sendBuffer.filter((unacked) => unacked.seq >= ack); this.ack = seq + 1; @@ -56,6 +64,13 @@ export class SessionConnected< ...this.loggingMetadata, transportMessage: constructedMsg, tags: ['invariant-violation'], + extras: { + lastConstructedMsgs: this.lastConstructedMsgs.map((msg) => ({ + id: msg.id, + seq: msg.seq, + streamId: msg.streamId, + })), + }, }); throw new Error(msg); @@ -64,6 +79,7 @@ export class SessionConnected< send(msg: PartialTransportMessage): string { const constructedMsg = this.constructMsg(msg); + this.pushLastConstructedMsgs(constructedMsg); this.assertSendOrdering(constructedMsg); this.sendBuffer.push(constructedMsg); this.conn.send(this.options.codec.toBuffer(constructedMsg)); diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index 8a53d981..cfbbebc8 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -281,6 +281,10 @@ export abstract class IdentifiedSession extends CommonSession { constructMsg( partialMsg: PartialTransportMessage, ): TransportMessage { + if (this._isConsumed) { + throw new Error(ERR_CONSUMED); + } + const msg = { ...partialMsg, id: generateId(), From d201d46275e04f241b1af3c48179dad4bf8c0552 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 28 Mar 2025 11:12:18 -0700 Subject: [PATCH 2/6] 0.207.1 --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 50f492b4..4f8d9d58 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.207.0", + "version": "0.207.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.207.0", + "version": "0.207.1", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", diff --git a/package.json b/package.json index 1f47db2c..17dba475 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.207.0", + "version": "0.207.1", "type": "module", "exports": { ".": { From 90defd0846afa7ae4106ac5eee37e93b2f0c8d7b Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 28 Mar 2025 14:16:10 -0700 Subject: [PATCH 3/6] try immediate --- .../sessionStateMachine/SessionConnected.ts | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 13f764a6..09b53423 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -43,14 +43,6 @@ export class SessionConnected< private heartbeatMisses = 0; isActivelyHeartbeating: boolean; - private lastConstructedMsgs: Array = []; - private pushLastConstructedMsgs = (msg: OpaqueTransportMessage) => { - this.lastConstructedMsgs.push(msg); - if (this.lastConstructedMsgs.length > 10) { - this.lastConstructedMsgs.shift(); - } - }; - updateBookkeeping(ack: number, seq: number) { this.sendBuffer = this.sendBuffer.filter((unacked) => unacked.seq >= ack); this.ack = seq + 1; @@ -64,13 +56,6 @@ export class SessionConnected< ...this.loggingMetadata, transportMessage: constructedMsg, tags: ['invariant-violation'], - extras: { - lastConstructedMsgs: this.lastConstructedMsgs.map((msg) => ({ - id: msg.id, - seq: msg.seq, - streamId: msg.streamId, - })), - }, }); throw new Error(msg); @@ -79,7 +64,6 @@ export class SessionConnected< send(msg: PartialTransportMessage): string { const constructedMsg = this.constructMsg(msg); - this.pushLastConstructedMsgs(constructedMsg); this.assertSendOrdering(constructedMsg); this.sendBuffer.push(constructedMsg); this.conn.send(this.options.codec.toBuffer(constructedMsg)); @@ -242,7 +226,10 @@ export class SessionConnected< // if we are not actively heartbeating, we are in passive // heartbeat mode and should send a response to the ack if (!this.isActivelyHeartbeating) { - this.sendHeartbeat(); + // purposefully make this async to avoid weird browser behavior + void Promise.resolve().then(() => { + this.sendHeartbeat(); + }); } }; From 965a7e10e410724c93407c9cb68656edf815b645 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 28 Mar 2025 14:20:38 -0700 Subject: [PATCH 4/6] update test --- transport/sessionStateMachine/stateMachine.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index 1792d000..894f41ab 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -1958,7 +1958,9 @@ describe('session state machine', () => { ); // make sure the session acks the heartbeat - expect(conn.send).toHaveBeenCalledTimes(1); + await waitFor(() => { + expect(conn.send).toHaveBeenCalledTimes(1); + }); }); test('does not dispatch acks', async () => { From 565aeb93e8520a84f1405763e780bd98230bd04f Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 28 Mar 2025 14:23:20 -0700 Subject: [PATCH 5/6] fix doc comment --- transport/sessionStateMachine/SessionConnected.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 09b53423..dcbf2602 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -84,8 +84,7 @@ export class SessionConnected< // send any buffered messages if (this.sendBuffer.length > 0) { this.log?.info( - `sending ${ - this.sendBuffer.length + `sending ${this.sendBuffer.length } buffered messages, starting at seq ${this.nextSeq()}`, this.loggingMetadata, ); @@ -227,6 +226,9 @@ export class SessionConnected< // heartbeat mode and should send a response to the ack if (!this.isActivelyHeartbeating) { // purposefully make this async to avoid weird browser behavior + // where _some_ browsers will decide that it is ok to interrupt fully + // synchronous code execution (e.g. an existing .send) to receive a + // websocket message and hit this codepath void Promise.resolve().then(() => { this.sendHeartbeat(); }); From 0af9535b204c4c2f7168747f6965b372538a9566 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 28 Mar 2025 14:24:46 -0700 Subject: [PATCH 6/6] fmt --- transport/sessionStateMachine/SessionConnected.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index dcbf2602..60fc90e8 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -84,7 +84,8 @@ export class SessionConnected< // send any buffered messages if (this.sendBuffer.length > 0) { this.log?.info( - `sending ${this.sendBuffer.length + `sending ${ + this.sendBuffer.length } buffered messages, starting at seq ${this.nextSeq()}`, this.loggingMetadata, );