Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@replit/river",
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.205.1",
"version": "0.205.2",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels a bit minory: some args of callbacks change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the interface change is backwards compatible (we passed the session field when the type didnt say it had a session field, now we declare a new id field which we do pass properly)

"type": "module",
"exports": {
".": {
Expand Down
5 changes: 4 additions & 1 deletion testUtil/fixtures/mockTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ export class InMemoryConnection extends Connection {
constructor(pipe: Duplex) {
super();
this.conn = pipe;
this.conn.allowHalfOpen = false;

this.conn.on('data', (data: Uint8Array) => {
for (const cb of this.dataListeners) {
cb(data);
}
});

this.conn.on('end', () => {
this.conn.on('close', () => {
for (const cb of this.closeListeners) {
cb();
}
Expand All @@ -46,6 +47,7 @@ export class InMemoryConnection extends Connection {
close(): void {
setImmediate(() => {
this.conn.end();
this.conn.emit('close');
});
}
}
Expand Down Expand Up @@ -153,6 +155,7 @@ export function createMockTransportNetwork(
simulatePhantomDisconnect() {
for (const conn of Object.values(connections.get())) {
conn.serverToClient.pause();
conn.clientToServer.pause();
}
},
async restartServer() {
Expand Down
11 changes: 6 additions & 5 deletions transport/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type Static } from '@sinclair/typebox';
import { Connection } from './connection';
import { OpaqueTransportMessage, HandshakeErrorResponseCodes } from './message';
import { Session, SessionState } from './sessionStateMachine';
import { SessionId } from './sessionStateMachine/common';
import { TransportStatus } from './transport';

export const ProtocolError = {
Expand All @@ -26,11 +27,11 @@ export interface EventMap {
session: Pick<Session<Connection>, 'id' | 'to'>;
};
sessionTransition:
| { state: SessionState.Connected }
| { state: SessionState.Handshaking }
| { state: SessionState.Connecting }
| { state: SessionState.BackingOff }
| { state: SessionState.NoConnection };
| { state: SessionState.Connected; id: SessionId }
| { state: SessionState.Handshaking; id: SessionId }
| { state: SessionState.Connecting; id: SessionId }
| { state: SessionState.BackingOff; id: SessionId }
| { state: SessionState.NoConnection; id: SessionId };
protocolError:
| {
type: (typeof ProtocolError)['HandshakeFailed'];
Expand Down
3 changes: 1 addition & 2 deletions transport/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,8 @@ export abstract class ServerTransport<
);

oldSession = noConnectionSession;
this.updateSession(oldSession);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there any implications for moving this update up into the if block?

}

this.updateSession(oldSession);
} else if (oldSession) {
connectCase = 'hard reconnection';

Expand Down
20 changes: 19 additions & 1 deletion transport/sessionStateMachine/SessionConnected.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ControlMessageAckSchema,
OpaqueTransportMessage,
PartialTransportMessage,
TransportMessage,
isAck,
} from '../message';
import {
Expand Down Expand Up @@ -48,10 +49,25 @@ export class SessionConnected<
this.heartbeatMisses = 0;
}

private assertSendOrdering(constructedMsg: TransportMessage) {
if (constructedMsg.seq > this.seqSent + 1) {
const msg = `invariant violation: would have sent out of order msg (seq: ${constructedMsg.seq}, expected: ${this.seqSent} + 1)`;
this.log?.error(msg, {
...this.loggingMetadata,
transportMessage: constructedMsg,
tags: ['invariant-violation'],
});

throw new Error(msg);
}
}

send(msg: PartialTransportMessage): string {
const constructedMsg = this.constructMsg(msg);
this.assertSendOrdering(constructedMsg);
this.sendBuffer.push(constructedMsg);
this.conn.send(this.options.codec.toBuffer(constructedMsg));
this.seqSent = constructedMsg.seq;

return constructedMsg.id;
}
Expand All @@ -75,7 +91,9 @@ export class SessionConnected<
);

for (const msg of this.sendBuffer) {
this.assertSendOrdering(msg);
this.conn.send(this.options.codec.toBuffer(msg));
this.seqSent = msg.seq;
}
}

Expand Down Expand Up @@ -165,7 +183,7 @@ export class SessionConnected<
);
} else {
const reason = `received out-of-order msg, closing connection (got seq: ${parsedMsg.seq}, wanted seq: ${this.ack})`;
this.log?.warn(reason, {
this.log?.error(reason, {
...this.loggingMetadata,
transportMessage: parsedMsg,
tags: ['invariant-violation'],
Expand Down
20 changes: 18 additions & 2 deletions transport/sessionStateMachine/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps {
to: TransportClientId;
seq: number;
ack: number;
seqSent: number;
sendBuffer: Array<OpaqueTransportMessage>;
telemetry: TelemetryInfo;
protocolVersion: ProtocolVersion;
Expand All @@ -224,15 +225,29 @@ export abstract class IdentifiedSession extends CommonSession {
*/
seq: number;

/**
* Last seq we sent over the wire this session (excluding handshake) and retransmissions
*/
seqSent: number;

/**
* Number of unique messages we've received this session (excluding handshake)
*/
ack: number;
sendBuffer: Array<OpaqueTransportMessage>;

constructor(props: IdentifiedSessionProps) {
const { id, to, seq, ack, sendBuffer, telemetry, log, protocolVersion } =
props;
const {
id,
to,
seq,
ack,
sendBuffer,
telemetry,
log,
protocolVersion,
seqSent: messagesSent,
} = props;
super(props);
this.id = id;
this.to = to;
Expand All @@ -242,6 +257,7 @@ export abstract class IdentifiedSession extends CommonSession {
this.telemetry = telemetry;
this.log = log;
this.protocolVersion = protocolVersion;
this.seqSent = messagesSent;
}

get loggingMetadata(): MessageMetadata {
Expand Down
46 changes: 26 additions & 20 deletions transport/sessionStateMachine/stateMachine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1942,16 +1942,19 @@ describe('session state machine', () => {
expect(conn.send).toHaveBeenCalledTimes(0);

// send a heartbeat
conn.emitData(
session.options.codec.toBuffer(
session.constructMsg({
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}),
),
conn.onData(
session.options.codec.toBuffer({
id: 'msgid',
to: 'SERVER',
from: 'client',
seq: 0,
ack: 0,
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}),
);

// make sure the session acks the heartbeat
Expand All @@ -1964,16 +1967,19 @@ describe('session state machine', () => {
const conn = session.conn;

// send a heartbeat
conn.emitData(
session.options.codec.toBuffer(
session.constructMsg({
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}),
),
conn.onData(
session.options.codec.toBuffer({
id: 'msgid',
to: 'SERVER',
from: 'client',
seq: 0,
ack: 0,
streamId: 'heartbeat',
controlFlags: ControlFlags.AckBit,
payload: {
type: 'ACK',
} satisfies Static<typeof ControlMessageAckSchema>,
}),
);

expect(sessionHandle.onMessage).not.toHaveBeenCalled();
Expand Down
7 changes: 5 additions & 2 deletions transport/sessionStateMachine/transitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ function inheritSharedSession(
to: session.to,
seq: session.seq,
ack: session.ack,
seqSent: session.seqSent,
sendBuffer: session.sendBuffer,
telemetry: session.telemetry,
options: session.options,
Expand Down Expand Up @@ -90,6 +91,7 @@ export const SessionStateGraph = {
to,
seq: 0,
ack: 0,
seqSent: 0,
graceExpiryTime: Date.now() + options.sessionDisconnectGraceMs,
sendBuffer,
telemetry,
Expand Down Expand Up @@ -251,12 +253,13 @@ export const SessionStateGraph = {
? // old session exists, inherit state
inheritSharedSession(oldSession)
: // old session does not exist, create new state
{
({
id: sessionId,
from,
to,
seq: 0,
ack: 0,
seqSent: 0,
sendBuffer: [],
telemetry: createSessionTelemetryInfo(
pendingSession.tracer,
Expand All @@ -269,7 +272,7 @@ export const SessionStateGraph = {
tracer: pendingSession.tracer,
log: pendingSession.log,
protocolVersion,
};
} satisfies IdentifiedSessionProps);

pendingSession._handleStateExit();
oldSession?._handleStateExit();
Expand Down
Loading