Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,14 @@ If your application is stateful on either the server or the client, the service

```ts
transport.addEventListener('sessionStatus', (evt) => {
if (evt.status === 'connect') {
if (evt.status === 'created') {
// do something
} else if (evt.status === 'disconnect') {
// do something else
} else if (evt.status === 'closing') {
// do other things
} else if (evt.status === 'closed') {
// note that evt.session only has id + to
// this is useful for doing things like creating a new session if
// a session just got yanked
}
});

Expand Down
2 changes: 1 addition & 1 deletion __tests__/negative.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ describe('should handle incompatabilities', async () => {
expect(errMock).toHaveBeenCalledTimes(0);
expect(spy).toHaveBeenCalledWith(
expect.objectContaining({
status: 'connect',
status: 'created',
}),
);

Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@replit/river",
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.204.0",
"version": "0.205.0",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1️⃣ wen

if there's no good answer, let's do it on Apr 1 (pls also blogpost)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i'd like to! faris thinks that we need more docs but tbd on what we still need there

"type": "module",
"exports": {
".": {
Expand Down
2 changes: 1 addition & 1 deletion router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ function handleProc(

function onSessionStatus(evt: EventMap['sessionStatus']) {
if (
evt.status !== 'disconnect' ||
evt.status !== 'closing' ||
evt.session.to !== serverId ||
session.id !== evt.session.id
) {
Expand Down
2 changes: 1 addition & 1 deletion router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class RiverServer<Services extends AnyServiceSchemaMap>
};

const handleSessionStatus = (evt: EventMap['sessionStatus']) => {
if (evt.status !== 'disconnect') return;
if (evt.status !== 'closing') return;

const disconnectedClientId = evt.session.to;
this.log?.info(
Expand Down
23 changes: 15 additions & 8 deletions testUtil/fixtures/matrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ interface TestMatrixEntry {
/**
* Defines a selector type that pairs a valid transport with a valid codec.
*/
type Selector = [ValidTransports, ValidCodecs];
type Selector = [ValidTransports | 'all', ValidCodecs | 'all'];

/**
* Generates a matrix of test entries for each combination of transport and codec.
Expand All @@ -26,16 +26,23 @@ type Selector = [ValidTransports, ValidCodecs];
* @param selector An optional tuple specifying a transport and codec to filter the matrix.
* @returns An array of TestMatrixEntry objects representing the combinations of transport and codec.
*/
export const testMatrix = (selector?: Selector): Array<TestMatrixEntry> =>
transports
export const testMatrix = (
[transportSelector, codecSelector]: Selector = ['all', 'all'],
): Array<TestMatrixEntry> => {
const filteredTransports = transports.filter(
(t) => transportSelector === 'all' || t.name === transportSelector,
);

const filteredCodecs = codecs.filter(
(c) => codecSelector === 'all' || c.name === codecSelector,
);

return filteredTransports
.map((transport) =>
// If a selector is provided, filter transport + codecs to match the selector; otherwise, use all codecs.
(selector
? codecs.filter((codec) => selector[1] === codec.name)
: codecs
).map((codec) => ({
filteredCodecs.map((codec) => ({
transport,
codec,
})),
)
.flat();
};
4 changes: 3 additions & 1 deletion transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ export abstract class ClientTransport<
* and don't want to wait for the grace period to elapse.
*/
hardDisconnect() {
for (const session of this.sessions.values()) {
// create a copy of the sessions to avoid modifying the map while iterating
const sessions = Array.from(this.sessions.values());
for (const session of sessions) {
this.deleteSession(session);
}
}
Expand Down
13 changes: 9 additions & 4 deletions transport/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ export type ProtocolErrorType =

export interface EventMap {
message: OpaqueTransportMessage;
sessionStatus: {
status: 'connect' | 'disconnect';
session: Session<Connection>;
};
sessionStatus:
| {
status: 'created' | 'closing';
session: Session<Connection>;
}
| {
status: 'closed';
session: Pick<Session<Connection>, 'id' | 'to'>;
};
sessionTransition:
| { state: SessionState.Connected }
| { state: SessionState.Handshaking }
Expand Down
99 changes: 84 additions & 15 deletions transport/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ describe.each(testMatrix())(
const msg = createDummyTransportMessage();
const msgPromise = waitForMessage(serverTransport);
const sendHandle = (evt: EventMap['sessionStatus']) => {
if (evt.status === 'connect') {
if (evt.status === 'created') {
getClientSendFn(clientTransport, serverTransport)(msg);
}
};
Expand Down Expand Up @@ -354,7 +354,7 @@ describe.each(testMatrix())(
});
});

test('both client and server transport get connect/disconnect notifs', async () => {
test('both client and server transport get session created/closing/closed notifs', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
const clientConnStart = vi.fn();
Expand All @@ -367,13 +367,17 @@ describe.each(testMatrix())(
};

const clientSessStart = vi.fn();
const clientSessStopping = vi.fn();
const clientSessStop = vi.fn();
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
switch (evt.status) {
case 'connect':
case 'created':
clientSessStart();
break;
case 'disconnect':
case 'closing':
clientSessStopping();
break;
case 'closed':
clientSessStop();
break;
}
Expand All @@ -389,13 +393,17 @@ describe.each(testMatrix())(
};

const serverSessStart = vi.fn();
const serverSessStopping = vi.fn();
const serverSessStop = vi.fn();
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
switch (evt.status) {
case 'connect':
case 'created':
serverSessStart();
break;
case 'disconnect':
case 'closing':
serverSessStopping();
break;
case 'closed':
serverSessStop();
break;
}
Expand Down Expand Up @@ -438,6 +446,8 @@ describe.each(testMatrix())(
expect(serverSessStart).toHaveBeenCalledTimes(0);
expect(clientSessStop).toHaveBeenCalledTimes(0);
expect(serverSessStop).toHaveBeenCalledTimes(0);
expect(clientSessStopping).toHaveBeenCalledTimes(0);
expect(serverSessStopping).toHaveBeenCalledTimes(0);

clientTransport.connect(serverTransport.clientId);
const clientSendFn = getClientSendFn(clientTransport, serverTransport);
Expand All @@ -455,6 +465,8 @@ describe.each(testMatrix())(
expect(serverSessStart).toHaveBeenCalledTimes(1);
expect(clientSessStop).toHaveBeenCalledTimes(0);
expect(serverSessStop).toHaveBeenCalledTimes(0);
expect(clientSessStopping).toHaveBeenCalledTimes(0);
expect(serverSessStopping).toHaveBeenCalledTimes(0);

// clean disconnect + reconnect within grace period
closeAllConnections(clientTransport);
Expand All @@ -471,6 +483,8 @@ describe.each(testMatrix())(
await waitFor(() => expect(serverSessStart).toHaveBeenCalledTimes(1));
await waitFor(() => expect(clientSessStop).toHaveBeenCalledTimes(0));
await waitFor(() => expect(serverSessStop).toHaveBeenCalledTimes(0));
await waitFor(() => expect(clientSessStopping).toHaveBeenCalledTimes(0));
await waitFor(() => expect(serverSessStopping).toHaveBeenCalledTimes(0));

// by this point the client should have reconnected
// session > c----------| (connected)
Expand All @@ -486,6 +500,8 @@ describe.each(testMatrix())(
expect(clientSessStop).toHaveBeenCalledTimes(0);
expect(serverSessStart).toHaveBeenCalledTimes(1);
expect(serverSessStop).toHaveBeenCalledTimes(0);
expect(clientSessStopping).toHaveBeenCalledTimes(0);
expect(serverSessStopping).toHaveBeenCalledTimes(0);

// disconnect session entirely
// session > c------------x | (disconnected)
Expand All @@ -500,6 +516,8 @@ describe.each(testMatrix())(
await waitFor(() => expect(serverSessStart).toHaveBeenCalledTimes(1));
await waitFor(() => expect(clientSessStop).toHaveBeenCalledTimes(1));
await waitFor(() => expect(serverSessStop).toHaveBeenCalledTimes(1));
await waitFor(() => expect(clientSessStopping).toHaveBeenCalledTimes(1));
await waitFor(() => expect(serverSessStopping).toHaveBeenCalledTimes(1));

await testFinishesCleanly({
clientTransports: [clientTransport],
Expand Down Expand Up @@ -591,6 +609,57 @@ describe.each(testMatrix())(
serverTransport,
});
});

test('listening on session disconnect and manually reconnecting works', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
clientTransport.connect(serverTransport.clientId);

addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));

const onSessionDisconnect = vi.fn();
const onSessionConnect = vi.fn();
const sessionStatusListener = (evt: EventMap['sessionStatus']) => {
if (evt.status === 'created') {
onSessionConnect();
}

if (evt.status === 'closed') {
onSessionDisconnect();
clientTransport.connect(serverTransport.clientId);
}
};

clientTransport.addEventListener('sessionStatus', sessionStatusListener);
addPostTestCleanup(async () => {
clientTransport.removeEventListener(
'sessionStatus',
sessionStatusListener,
);
});

expect(onSessionDisconnect).toHaveBeenCalledTimes(0);
expect(onSessionDisconnect).toHaveBeenCalledTimes(0);

// cause a session disconnect
clientTransport.hardDisconnect();

await waitFor(() => expect(onSessionDisconnect).toHaveBeenCalledTimes(1));
await waitFor(() => expect(onSessionConnect).toHaveBeenCalledTimes(1));

await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
});
});
},
);

Expand Down Expand Up @@ -883,7 +952,7 @@ describe.each(testMatrix())(

const onSessionDisconnect = vi.fn();
const sessionStatusListener = (evt: EventMap['sessionStatus']) => {
if (evt.status === 'disconnect') {
if (evt.status === 'closed') {
onSessionDisconnect();
}
};
Expand Down Expand Up @@ -953,10 +1022,10 @@ describe.each(testMatrix())(
const serverSessStop = vi.fn();
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
switch (evt.status) {
case 'connect':
case 'created':
serverSessStart();
break;
case 'disconnect':
case 'closed':
serverSessStop();
break;
}
Expand Down Expand Up @@ -1045,10 +1114,10 @@ describe.each(testMatrix())(
const clientSessStop = vi.fn();
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
switch (evt.status) {
case 'connect':
case 'created':
clientSessStart();
break;
case 'disconnect':
case 'closed':
clientSessStop();
break;
}
Expand Down Expand Up @@ -1177,10 +1246,10 @@ describe.each(testMatrix())(
const clientSessStop = vi.fn();
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
switch (evt.status) {
case 'connect':
case 'created':
clientSessStart();
break;
case 'disconnect':
case 'closed':
clientSessStop();
break;
}
Expand All @@ -1190,10 +1259,10 @@ describe.each(testMatrix())(
const serverSessStop = vi.fn();
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
switch (evt.status) {
case 'connect':
case 'created':
serverSessStart();
break;
case 'disconnect':
case 'closed':
serverSessStop();
break;
}
Expand Down
11 changes: 8 additions & 3 deletions transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ export abstract class Transport<ConnType extends Connection> {
close() {
this.status = 'closed';

for (const session of this.sessions.values()) {
const sessions = Array.from(this.sessions.values());
for (const session of sessions) {
this.deleteSession(session);
}

Expand Down Expand Up @@ -195,7 +196,7 @@ export abstract class Transport<ConnType extends Connection> {

this.sessions.set(session.to, session);
this.eventDispatcher.dispatchEvent('sessionStatus', {
status: 'connect',
status: 'created',
session: session,
});

Expand Down Expand Up @@ -246,13 +247,17 @@ export abstract class Transport<ConnType extends Connection> {

session.log?.info(`closing session ${session.id}`, loggingMetadata);
this.eventDispatcher.dispatchEvent('sessionStatus', {
status: 'disconnect',
status: 'closing',
session: session,
});

const to = session.to;
session.close();
this.sessions.delete(to);
this.eventDispatcher.dispatchEvent('sessionStatus', {
status: 'closed',
session: { id: session.id, to: to },
});
}

// common listeners
Expand Down