Skip to content

Commit d95e0d5

Browse files
committed
rename session connect/disconnect -> created/closing/closed
1 parent 549d27d commit d95e0d5

File tree

11 files changed

+113
-32
lines changed

11 files changed

+113
-32
lines changed

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# River
22

3-
⚠️ Not production ready, while Replit is using parts of River in production, we are still going through rapid breaking changes. First production ready version will be `1.x.x` ⚠️
4-
53
River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures.
64

75
## Long-lived streaming remote procedure calls

__tests__/negative.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ describe('should handle incompatabilities', async () => {
205205
expect(errMock).toHaveBeenCalledTimes(0);
206206
expect(spy).toHaveBeenCalledWith(
207207
expect.objectContaining({
208-
status: 'connect',
208+
status: 'created',
209209
}),
210210
);
211211

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.204.0",
4+
"version": "1.0.0",
55
"type": "module",
66
"exports": {
77
".": {

router/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ function handleProc(
456456

457457
function onSessionStatus(evt: EventMap['sessionStatus']) {
458458
if (
459-
evt.status !== 'disconnect' ||
459+
evt.status !== 'closing' ||
460460
evt.session.to !== serverId ||
461461
session.id !== evt.session.id
462462
) {

router/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class RiverServer<Services extends AnyServiceSchemaMap>
216216
};
217217

218218
const handleSessionStatus = (evt: EventMap['sessionStatus']) => {
219-
if (evt.status !== 'disconnect') return;
219+
if (evt.status !== 'closing') return;
220220

221221
const disconnectedClientId = evt.session.to;
222222
this.log?.info(

testUtil/fixtures/matrix.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ export const testMatrix = (selector?: Selector): Array<TestMatrixEntry> =>
3131
.map((transport) =>
3232
// If a selector is provided, filter transport + codecs to match the selector; otherwise, use all codecs.
3333
(selector
34-
? codecs.filter((codec) => selector[1] === codec.name)
34+
? codecs.filter((codec) => {
35+
return selector[0] === transport.name && selector[1] === codec.name;
36+
})
3537
: codecs
3638
).map((codec) => ({
3739
transport,

transport/client.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,9 @@ export abstract class ClientTransport<
372372
* and don't want to wait for the grace period to elapse.
373373
*/
374374
hardDisconnect() {
375-
for (const session of this.sessions.values()) {
375+
// create a copy of the sessions to avoid modifying the map while iterating
376+
const sessions = Array.from(this.sessions.values());
377+
for (const session of sessions) {
376378
this.deleteSession(session);
377379
}
378380
}

transport/events.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ export type ProtocolErrorType =
1616

1717
export interface EventMap {
1818
message: OpaqueTransportMessage;
19-
sessionStatus: {
20-
status: 'connect' | 'disconnect';
21-
session: Session<Connection>;
22-
};
19+
sessionStatus:
20+
| {
21+
status: 'created' | 'closing';
22+
session: Session<Connection>;
23+
}
24+
| {
25+
status: 'closed';
26+
session: Pick<Session<Connection>, 'id' | 'to'>;
27+
};
2328
sessionTransition:
2429
| { state: SessionState.Connected }
2530
| { state: SessionState.Handshaking }

transport/transport.test.ts

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ describe.each(testMatrix())(
254254
const msg = createDummyTransportMessage();
255255
const msgPromise = waitForMessage(serverTransport);
256256
const sendHandle = (evt: EventMap['sessionStatus']) => {
257-
if (evt.status === 'connect') {
257+
if (evt.status === 'created') {
258258
getClientSendFn(clientTransport, serverTransport)(msg);
259259
}
260260
};
@@ -354,7 +354,7 @@ describe.each(testMatrix())(
354354
});
355355
});
356356

357-
test('both client and server transport get connect/disconnect notifs', async () => {
357+
test('both client and server transport get session created/closing/closed notifs', async () => {
358358
const clientTransport = getClientTransport('client');
359359
const serverTransport = getServerTransport();
360360
const clientConnStart = vi.fn();
@@ -367,13 +367,17 @@ describe.each(testMatrix())(
367367
};
368368

369369
const clientSessStart = vi.fn();
370+
const clientSessStopping = vi.fn();
370371
const clientSessStop = vi.fn();
371372
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
372373
switch (evt.status) {
373-
case 'connect':
374+
case 'created':
374375
clientSessStart();
375376
break;
376-
case 'disconnect':
377+
case 'closing':
378+
clientSessStopping();
379+
break;
380+
case 'closed':
377381
clientSessStop();
378382
break;
379383
}
@@ -389,13 +393,17 @@ describe.each(testMatrix())(
389393
};
390394

391395
const serverSessStart = vi.fn();
396+
const serverSessStopping = vi.fn();
392397
const serverSessStop = vi.fn();
393398
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
394399
switch (evt.status) {
395-
case 'connect':
400+
case 'created':
396401
serverSessStart();
397402
break;
398-
case 'disconnect':
403+
case 'closing':
404+
serverSessStopping();
405+
break;
406+
case 'closed':
399407
serverSessStop();
400408
break;
401409
}
@@ -438,6 +446,8 @@ describe.each(testMatrix())(
438446
expect(serverSessStart).toHaveBeenCalledTimes(0);
439447
expect(clientSessStop).toHaveBeenCalledTimes(0);
440448
expect(serverSessStop).toHaveBeenCalledTimes(0);
449+
expect(clientSessStopping).toHaveBeenCalledTimes(0);
450+
expect(serverSessStopping).toHaveBeenCalledTimes(0);
441451

442452
clientTransport.connect(serverTransport.clientId);
443453
const clientSendFn = getClientSendFn(clientTransport, serverTransport);
@@ -455,6 +465,8 @@ describe.each(testMatrix())(
455465
expect(serverSessStart).toHaveBeenCalledTimes(1);
456466
expect(clientSessStop).toHaveBeenCalledTimes(0);
457467
expect(serverSessStop).toHaveBeenCalledTimes(0);
468+
expect(clientSessStopping).toHaveBeenCalledTimes(0);
469+
expect(serverSessStopping).toHaveBeenCalledTimes(0);
458470

459471
// clean disconnect + reconnect within grace period
460472
closeAllConnections(clientTransport);
@@ -471,6 +483,8 @@ describe.each(testMatrix())(
471483
await waitFor(() => expect(serverSessStart).toHaveBeenCalledTimes(1));
472484
await waitFor(() => expect(clientSessStop).toHaveBeenCalledTimes(0));
473485
await waitFor(() => expect(serverSessStop).toHaveBeenCalledTimes(0));
486+
await waitFor(() => expect(clientSessStopping).toHaveBeenCalledTimes(0));
487+
await waitFor(() => expect(serverSessStopping).toHaveBeenCalledTimes(0));
474488

475489
// by this point the client should have reconnected
476490
// session > c----------| (connected)
@@ -486,6 +500,8 @@ describe.each(testMatrix())(
486500
expect(clientSessStop).toHaveBeenCalledTimes(0);
487501
expect(serverSessStart).toHaveBeenCalledTimes(1);
488502
expect(serverSessStop).toHaveBeenCalledTimes(0);
503+
expect(clientSessStopping).toHaveBeenCalledTimes(0);
504+
expect(serverSessStopping).toHaveBeenCalledTimes(0);
489505

490506
// disconnect session entirely
491507
// session > c------------x | (disconnected)
@@ -500,6 +516,8 @@ describe.each(testMatrix())(
500516
await waitFor(() => expect(serverSessStart).toHaveBeenCalledTimes(1));
501517
await waitFor(() => expect(clientSessStop).toHaveBeenCalledTimes(1));
502518
await waitFor(() => expect(serverSessStop).toHaveBeenCalledTimes(1));
519+
await waitFor(() => expect(clientSessStopping).toHaveBeenCalledTimes(1));
520+
await waitFor(() => expect(serverSessStopping).toHaveBeenCalledTimes(1));
503521

504522
await testFinishesCleanly({
505523
clientTransports: [clientTransport],
@@ -591,6 +609,57 @@ describe.each(testMatrix())(
591609
serverTransport,
592610
});
593611
});
612+
613+
test('listening on session disconnect and manually reconnecting works', async () => {
614+
const clientTransport = getClientTransport('client');
615+
const serverTransport = getServerTransport();
616+
clientTransport.connect(serverTransport.clientId);
617+
618+
addPostTestCleanup(async () => {
619+
await cleanupTransports([clientTransport, serverTransport]);
620+
});
621+
622+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
623+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
624+
625+
const onSessionDisconnect = vi.fn();
626+
const onSessionConnect = vi.fn();
627+
const sessionStatusListener = (evt: EventMap['sessionStatus']) => {
628+
if (evt.status === 'created') {
629+
onSessionConnect();
630+
}
631+
632+
if (evt.status === 'closed') {
633+
onSessionDisconnect();
634+
clientTransport.connect(serverTransport.clientId);
635+
}
636+
};
637+
638+
clientTransport.addEventListener('sessionStatus', sessionStatusListener);
639+
addPostTestCleanup(async () => {
640+
clientTransport.removeEventListener(
641+
'sessionStatus',
642+
sessionStatusListener,
643+
);
644+
});
645+
646+
expect(onSessionDisconnect).toHaveBeenCalledTimes(0);
647+
expect(onSessionDisconnect).toHaveBeenCalledTimes(0);
648+
649+
// cause a session disconnect
650+
clientTransport.hardDisconnect();
651+
652+
await waitFor(() => expect(onSessionDisconnect).toHaveBeenCalledTimes(1));
653+
await waitFor(() => expect(onSessionConnect).toHaveBeenCalledTimes(1));
654+
655+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
656+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
657+
658+
await testFinishesCleanly({
659+
clientTransports: [clientTransport],
660+
serverTransport,
661+
});
662+
});
594663
},
595664
);
596665

@@ -883,7 +952,7 @@ describe.each(testMatrix())(
883952

884953
const onSessionDisconnect = vi.fn();
885954
const sessionStatusListener = (evt: EventMap['sessionStatus']) => {
886-
if (evt.status === 'disconnect') {
955+
if (evt.status === 'closed') {
887956
onSessionDisconnect();
888957
}
889958
};
@@ -953,10 +1022,10 @@ describe.each(testMatrix())(
9531022
const serverSessStop = vi.fn();
9541023
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
9551024
switch (evt.status) {
956-
case 'connect':
1025+
case 'created':
9571026
serverSessStart();
9581027
break;
959-
case 'disconnect':
1028+
case 'closed':
9601029
serverSessStop();
9611030
break;
9621031
}
@@ -1045,10 +1114,10 @@ describe.each(testMatrix())(
10451114
const clientSessStop = vi.fn();
10461115
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
10471116
switch (evt.status) {
1048-
case 'connect':
1117+
case 'created':
10491118
clientSessStart();
10501119
break;
1051-
case 'disconnect':
1120+
case 'closed':
10521121
clientSessStop();
10531122
break;
10541123
}
@@ -1177,10 +1246,10 @@ describe.each(testMatrix())(
11771246
const clientSessStop = vi.fn();
11781247
const clientSessHandler = (evt: EventMap['sessionStatus']) => {
11791248
switch (evt.status) {
1180-
case 'connect':
1249+
case 'created':
11811250
clientSessStart();
11821251
break;
1183-
case 'disconnect':
1252+
case 'closed':
11841253
clientSessStop();
11851254
break;
11861255
}
@@ -1190,10 +1259,10 @@ describe.each(testMatrix())(
11901259
const serverSessStop = vi.fn();
11911260
const serverSessHandler = (evt: EventMap['sessionStatus']) => {
11921261
switch (evt.status) {
1193-
case 'connect':
1262+
case 'created':
11941263
serverSessStart();
11951264
break;
1196-
case 'disconnect':
1265+
case 'closed':
11971266
serverSessStop();
11981267
break;
11991268
}

0 commit comments

Comments
 (0)