Skip to content

Commit 78d5a9e

Browse files
authored
rename session connect/disconnect -> created/closing/closed (#288)
## Why nomenclature of 'connected'/'disconnected' is an artifact of when sessions were what connections are today, thats no longer accurate this also allows us to disambiguate the 'about to close' state and 'already closed' state these have two different cases: 1. about to close -> do any last minute cleanup and send things to the otherside before we totally yank the connection 2. already closed -> consumers can do things like manually open a new session if an old session is closed ## What changed 1. rename 'connect' -> 'created', 'disconnect' -> 'closing' 2. add a new 'closed' state which is emitted after the cleanup is done 3. fix a case where we could loop infinitely by mutating a map while iterating its values 4. add a test for the create a new session after previous session is gone case ## Versioning - [ ] Breaking protocol change - [x] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 549d27d commit 78d5a9e

File tree

11 files changed

+132
-40
lines changed

11 files changed

+132
-40
lines changed

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,14 @@ If your application is stateful on either the server or the client, the service
189189

190190
```ts
191191
transport.addEventListener('sessionStatus', (evt) => {
192-
if (evt.status === 'connect') {
192+
if (evt.status === 'created') {
193193
// do something
194-
} else if (evt.status === 'disconnect') {
195-
// do something else
194+
} else if (evt.status === 'closing') {
195+
// do other things
196+
} else if (evt.status === 'closed') {
197+
// note that evt.session only has id + to
198+
// this is useful for doing things like creating a new session if
199+
// a session just got yanked
196200
}
197201
});
198202

__tests__/negative.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ describe('should handle incompatabilities', async () => {
205205
expect(errMock).toHaveBeenCalledTimes(0);
206206
expect(spy).toHaveBeenCalledWith(
207207
expect.objectContaining({
208-
status: 'connect',
208+
status: 'created',
209209
}),
210210
);
211211

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.204.0",
4+
"version": "0.205.0",
55
"type": "module",
66
"exports": {
77
".": {

router/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ function handleProc(
456456

457457
function onSessionStatus(evt: EventMap['sessionStatus']) {
458458
if (
459-
evt.status !== 'disconnect' ||
459+
evt.status !== 'closing' ||
460460
evt.session.to !== serverId ||
461461
session.id !== evt.session.id
462462
) {

router/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class RiverServer<Services extends AnyServiceSchemaMap>
216216
};
217217

218218
const handleSessionStatus = (evt: EventMap['sessionStatus']) => {
219-
if (evt.status !== 'disconnect') return;
219+
if (evt.status !== 'closing') return;
220220

221221
const disconnectedClientId = evt.session.to;
222222
this.log?.info(

testUtil/fixtures/matrix.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ interface TestMatrixEntry {
1717
/**
1818
* Defines a selector type that pairs a valid transport with a valid codec.
1919
*/
20-
type Selector = [ValidTransports, ValidCodecs];
20+
type Selector = [ValidTransports | 'all', ValidCodecs | 'all'];
2121

2222
/**
2323
* Generates a matrix of test entries for each combination of transport and codec.
@@ -26,16 +26,23 @@ type Selector = [ValidTransports, ValidCodecs];
2626
* @param selector An optional tuple specifying a transport and codec to filter the matrix.
2727
* @returns An array of TestMatrixEntry objects representing the combinations of transport and codec.
2828
*/
29-
export const testMatrix = (selector?: Selector): Array<TestMatrixEntry> =>
30-
transports
29+
export const testMatrix = (
30+
[transportSelector, codecSelector]: Selector = ['all', 'all'],
31+
): Array<TestMatrixEntry> => {
32+
const filteredTransports = transports.filter(
33+
(t) => transportSelector === 'all' || t.name === transportSelector,
34+
);
35+
36+
const filteredCodecs = codecs.filter(
37+
(c) => codecSelector === 'all' || c.name === codecSelector,
38+
);
39+
40+
return filteredTransports
3141
.map((transport) =>
32-
// If a selector is provided, filter transport + codecs to match the selector; otherwise, use all codecs.
33-
(selector
34-
? codecs.filter((codec) => selector[1] === codec.name)
35-
: codecs
36-
).map((codec) => ({
42+
filteredCodecs.map((codec) => ({
3743
transport,
3844
codec,
3945
})),
4046
)
4147
.flat();
48+
};

transport/client.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,9 @@ export abstract class ClientTransport<
372372
* and don't want to wait for the grace period to elapse.
373373
*/
374374
hardDisconnect() {
375-
for (const session of this.sessions.values()) {
375+
// create a copy of the sessions to avoid modifying the map while iterating
376+
const sessions = Array.from(this.sessions.values());
377+
for (const session of sessions) {
376378
this.deleteSession(session);
377379
}
378380
}

transport/events.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ export type ProtocolErrorType =
1616

1717
export interface EventMap {
1818
message: OpaqueTransportMessage;
19-
sessionStatus: {
20-
status: 'connect' | 'disconnect';
21-
session: Session<Connection>;
22-
};
19+
sessionStatus:
20+
| {
21+
status: 'created' | 'closing';
22+
session: Session<Connection>;
23+
}
24+
| {
25+
status: 'closed';
26+
session: Pick<Session<Connection>, 'id' | 'to'>;
27+
};
2328
sessionTransition:
2429
| { state: SessionState.Connected }
2530
| { state: SessionState.Handshaking }

transport/transport.test.ts

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ describe.each(testMatrix())(
254254
const msg = createDummyTransportMessage();
255255
const msgPromise = waitForMessage(serverTransport);
256256
const sendHandle = (evt: EventMap['sessionStatus']) => {
257-
if (evt.status === 'connect') {
257+
if (evt.status === 'created') {
258258
getClientSendFn(clientTransport, serverTransport)(msg);
259259
}
260260
};
@@ -354,7 +354,7 @@ describe.each(testMatrix())(
354354
});
355355
});
356356

357-
test('both client and server transport get connect/disconnect notifs', async () => {
357+
test('both client and server transport get session created/closing/closed notifs', async () => {
358358
const clientTransport = getClientTransport('client');
359359
const serverTransport = getServerTransport();
360360
const clientConnStart = vi.fn();
@@ -367,13 +367,17 @@ describe.each(testMatrix())(
367367
};
368368

369369
const clientSessStart = vi.fn();
370+
const clientSessStopping = vi.fn();
370371
const clientSessStop = vi.fn();
371372
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
372373
switch (evt.status) {
373-
case 'connect':
374+
case 'created':
374375
clientSessStart();
375376
break;
376-
case 'disconnect':
377+
case 'closing':
378+
clientSessStopping();
379+
break;
380+
case 'closed':
377381
clientSessStop();
378382
break;
379383
}
@@ -389,13 +393,17 @@ describe.each(testMatrix())(
389393
};
390394

391395
const serverSessStart = vi.fn();
396+
const serverSessStopping = vi.fn();
392397
const serverSessStop = vi.fn();
393398
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
394399
switch (evt.status) {
395-
case 'connect':
400+
case 'created':
396401
serverSessStart();
397402
break;
398-
case 'disconnect':
403+
case 'closing':
404+
serverSessStopping();
405+
break;
406+
case 'closed':
399407
serverSessStop();
400408
break;
401409
}
@@ -438,6 +446,8 @@ describe.each(testMatrix())(
438446
expect(serverSessStart).toHaveBeenCalledTimes(0);
439447
expect(clientSessStop).toHaveBeenCalledTimes(0);
440448
expect(serverSessStop).toHaveBeenCalledTimes(0);
449+
expect(clientSessStopping).toHaveBeenCalledTimes(0);
450+
expect(serverSessStopping).toHaveBeenCalledTimes(0);
441451

442452
clientTransport.connect(serverTransport.clientId);
443453
const clientSendFn = getClientSendFn(clientTransport, serverTransport);
@@ -455,6 +465,8 @@ describe.each(testMatrix())(
455465
expect(serverSessStart).toHaveBeenCalledTimes(1);
456466
expect(clientSessStop).toHaveBeenCalledTimes(0);
457467
expect(serverSessStop).toHaveBeenCalledTimes(0);
468+
expect(clientSessStopping).toHaveBeenCalledTimes(0);
469+
expect(serverSessStopping).toHaveBeenCalledTimes(0);
458470

459471
// clean disconnect + reconnect within grace period
460472
closeAllConnections(clientTransport);
@@ -471,6 +483,8 @@ describe.each(testMatrix())(
471483
await waitFor(() => expect(serverSessStart).toHaveBeenCalledTimes(1));
472484
await waitFor(() => expect(clientSessStop).toHaveBeenCalledTimes(0));
473485
await waitFor(() => expect(serverSessStop).toHaveBeenCalledTimes(0));
486+
await waitFor(() => expect(clientSessStopping).toHaveBeenCalledTimes(0));
487+
await waitFor(() => expect(serverSessStopping).toHaveBeenCalledTimes(0));
474488

475489
// by this point the client should have reconnected
476490
// session > c----------| (connected)
@@ -486,6 +500,8 @@ describe.each(testMatrix())(
486500
expect(clientSessStop).toHaveBeenCalledTimes(0);
487501
expect(serverSessStart).toHaveBeenCalledTimes(1);
488502
expect(serverSessStop).toHaveBeenCalledTimes(0);
503+
expect(clientSessStopping).toHaveBeenCalledTimes(0);
504+
expect(serverSessStopping).toHaveBeenCalledTimes(0);
489505

490506
// disconnect session entirely
491507
// session > c------------x | (disconnected)
@@ -500,6 +516,8 @@ describe.each(testMatrix())(
500516
await waitFor(() => expect(serverSessStart).toHaveBeenCalledTimes(1));
501517
await waitFor(() => expect(clientSessStop).toHaveBeenCalledTimes(1));
502518
await waitFor(() => expect(serverSessStop).toHaveBeenCalledTimes(1));
519+
await waitFor(() => expect(clientSessStopping).toHaveBeenCalledTimes(1));
520+
await waitFor(() => expect(serverSessStopping).toHaveBeenCalledTimes(1));
503521

504522
await testFinishesCleanly({
505523
clientTransports: [clientTransport],
@@ -591,6 +609,57 @@ describe.each(testMatrix())(
591609
serverTransport,
592610
});
593611
});
612+
613+
test('listening on session disconnect and manually reconnecting works', async () => {
614+
const clientTransport = getClientTransport('client');
615+
const serverTransport = getServerTransport();
616+
clientTransport.connect(serverTransport.clientId);
617+
618+
addPostTestCleanup(async () => {
619+
await cleanupTransports([clientTransport, serverTransport]);
620+
});
621+
622+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
623+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
624+
625+
const onSessionDisconnect = vi.fn();
626+
const onSessionConnect = vi.fn();
627+
const sessionStatusListener = (evt: EventMap['sessionStatus']) => {
628+
if (evt.status === 'created') {
629+
onSessionConnect();
630+
}
631+
632+
if (evt.status === 'closed') {
633+
onSessionDisconnect();
634+
clientTransport.connect(serverTransport.clientId);
635+
}
636+
};
637+
638+
clientTransport.addEventListener('sessionStatus', sessionStatusListener);
639+
addPostTestCleanup(async () => {
640+
clientTransport.removeEventListener(
641+
'sessionStatus',
642+
sessionStatusListener,
643+
);
644+
});
645+
646+
expect(onSessionDisconnect).toHaveBeenCalledTimes(0);
647+
expect(onSessionDisconnect).toHaveBeenCalledTimes(0);
648+
649+
// cause a session disconnect
650+
clientTransport.hardDisconnect();
651+
652+
await waitFor(() => expect(onSessionDisconnect).toHaveBeenCalledTimes(1));
653+
await waitFor(() => expect(onSessionConnect).toHaveBeenCalledTimes(1));
654+
655+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
656+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
657+
658+
await testFinishesCleanly({
659+
clientTransports: [clientTransport],
660+
serverTransport,
661+
});
662+
});
594663
},
595664
);
596665

@@ -883,7 +952,7 @@ describe.each(testMatrix())(
883952

884953
const onSessionDisconnect = vi.fn();
885954
const sessionStatusListener = (evt: EventMap['sessionStatus']) => {
886-
if (evt.status === 'disconnect') {
955+
if (evt.status === 'closed') {
887956
onSessionDisconnect();
888957
}
889958
};
@@ -953,10 +1022,10 @@ describe.each(testMatrix())(
9531022
const serverSessStop = vi.fn();
9541023
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
9551024
switch (evt.status) {
956-
case 'connect':
1025+
case 'created':
9571026
serverSessStart();
9581027
break;
959-
case 'disconnect':
1028+
case 'closed':
9601029
serverSessStop();
9611030
break;
9621031
}
@@ -1045,10 +1114,10 @@ describe.each(testMatrix())(
10451114
const clientSessStop = vi.fn();
10461115
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
10471116
switch (evt.status) {
1048-
case 'connect':
1117+
case 'created':
10491118
clientSessStart();
10501119
break;
1051-
case 'disconnect':
1120+
case 'closed':
10521121
clientSessStop();
10531122
break;
10541123
}
@@ -1177,10 +1246,10 @@ describe.each(testMatrix())(
11771246
const clientSessStop = vi.fn();
11781247
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
11791248
switch (evt.status) {
1180-
case 'connect':
1249+
case 'created':
11811250
clientSessStart();
11821251
break;
1183-
case 'disconnect':
1252+
case 'closed':
11841253
clientSessStop();
11851254
break;
11861255
}
@@ -1190,10 +1259,10 @@ describe.each(testMatrix())(
11901259
const serverSessStop = vi.fn();
11911260
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
11921261
switch (evt.status) {
1193-
case 'connect':
1262+
case 'created':
11941263
serverSessStart();
11951264
break;
1196-
case 'disconnect':
1265+
case 'closed':
11971266
serverSessStop();
11981267
break;
11991268
}

0 commit comments

Comments
 (0)