Skip to content

Commit c22bc50

Browse files
authored
allow ctx.cancel to be returned in all procs (#297)
## Why we want to be able to cancel a procedure from within a handler (e.g. in the upload case, if the client unexpectedly disconnects, we probably want to just cancel instead of returning some bogus result) ## What changed union the cancel type into all procedure handlers, we don't need to change the client as the client already has cancel in the reader type <!-- Describe the changes you made in this pull request or pointers for the reviewer --> ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 3fd9a59 commit c22bc50

File tree

8 files changed

+99
-31
lines changed

8 files changed

+99
-31
lines changed

__tests__/e2e.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,43 @@ describe.each(testMatrix())(
530530
});
531531
});
532532

533+
test('upload server cancel', async () => {
534+
// setup
535+
const clientTransport = getClientTransport('client');
536+
const serverTransport = getServerTransport();
537+
const services = {
538+
uploadable: UploadableServiceSchema,
539+
};
540+
const server = createServer(serverTransport, services);
541+
const client = createClient<typeof services>(
542+
clientTransport,
543+
serverTransport.clientId,
544+
);
545+
546+
addPostTestCleanup(async () => {
547+
await cleanupTransports([clientTransport, serverTransport]);
548+
});
549+
550+
// test
551+
const { reqWritable, finalize } = client.uploadable.cancellableAdd.upload(
552+
{},
553+
);
554+
reqWritable.write({ n: 9 });
555+
reqWritable.write({ n: 1 });
556+
557+
const result = await finalize();
558+
expect(result).toStrictEqual({
559+
ok: false,
560+
payload: { code: CANCEL_CODE, message: "can't add more than 10" },
561+
});
562+
563+
await testFinishesCleanly({
564+
clientTransports: [clientTransport],
565+
serverTransport,
566+
server,
567+
});
568+
});
569+
533570
test('upload with init message', async () => {
534571
// setup
535572
const clientTransport = getClientTransport('client');

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.205.2",
4+
"version": "0.206.0",
55
"type": "module",
66
"exports": {
77
".": {

router/context.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { Span } from '@opentelemetry/api';
22
import { TransportClientId } from '../transport/message';
33
import { SessionId } from '../transport/sessionStateMachine/common';
4+
import { ErrResult } from './result';
5+
import { CancelResultSchema } from './errors';
6+
import { Static } from '@sinclair/typebox';
47

58
/**
69
* ServiceContext exist for the purpose of declaration merging
@@ -75,7 +78,7 @@ export type ProcedureHandlerContext<State> = ServiceContext & {
7578
* Cancelling is not the same as closing procedure calls gracefully, please refer to
7679
* the river documentation to understand the difference between the two concepts.
7780
*/
78-
cancel: () => void;
81+
cancel: (message?: string) => ErrResult<Static<typeof CancelResultSchema>>;
7982
/**
8083
* This signal is a standard [AbortSignal](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)
8184
* triggered when the procedure invocation is done. This signal tracks the invocation/request finishing

router/errors.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ export function castTypeboxValueErrors(
7272
return result;
7373
}
7474

75+
/**
76+
* A schema for cancel payloads sent from the client
77+
*/
78+
export const CancelResultSchema = Type.Object({
79+
code: Type.Literal(CANCEL_CODE),
80+
message: Type.String(),
81+
});
82+
7583
/**
7684
* {@link ReaderErrorSchema} is the schema for all the built-in river errors that
7785
* can be emitted to a reader (request reader on the server, and response reader
@@ -96,10 +104,7 @@ export const ReaderErrorSchema = Type.Union([
96104
}),
97105
),
98106
}),
99-
Type.Object({
100-
code: Type.Literal(CANCEL_CODE),
101-
message: Type.String(),
102-
}),
107+
CancelResultSchema,
103108
]) satisfies ProcedureErrorSchemaType;
104109

105110
/**

router/procedures.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ import { Static, TNever, TSchema, Type } from '@sinclair/typebox';
33
import { ProcedureHandlerContext } from './context';
44
import { Result } from './result';
55
import { Readable, Writable } from './streams';
6-
import { ProcedureErrorSchemaType, ReaderErrorSchema } from './errors';
6+
import {
7+
CancelResultSchema,
8+
ProcedureErrorSchemaType,
9+
ReaderErrorSchema,
10+
} from './errors';
711

812
/**
913
* Brands a type to prevent it from being directly constructed.
@@ -35,6 +39,8 @@ export type ValidProcType =
3539
*/
3640
export type PayloadType = TSchema;
3741

42+
export type Cancellable<T> = T | Static<typeof CancelResultSchema>;
43+
3844
/**
3945
* Procedure for a single message in both directions (1:1).
4046
*
@@ -57,7 +63,7 @@ export interface RpcProcedure<
5763
handler(param: {
5864
ctx: ProcedureHandlerContext<State>;
5965
reqInit: Static<RequestInit>;
60-
}): Promise<Result<Static<ResponseData>, Static<ResponseErr>>>;
66+
}): Promise<Result<Static<ResponseData>, Cancellable<Static<ResponseErr>>>>;
6167
}
6268

6369
/**
@@ -90,7 +96,7 @@ export interface UploadProcedure<
9096
Static<RequestData>,
9197
Static<typeof ReaderErrorSchema>
9298
>;
93-
}): Promise<Result<Static<ResponseData>, Static<ResponseErr>>>;
99+
}): Promise<Result<Static<ResponseData>, Cancellable<Static<ResponseErr>>>>;
94100
}
95101

96102
/**
@@ -115,7 +121,9 @@ export interface SubscriptionProcedure<
115121
handler(param: {
116122
ctx: ProcedureHandlerContext<State>;
117123
reqInit: Static<RequestInit>;
118-
resWritable: Writable<Result<Static<ResponseData>, Static<ResponseErr>>>;
124+
resWritable: Writable<
125+
Result<Static<ResponseData>, Cancellable<Static<ResponseErr>>>
126+
>;
119127
}): Promise<void | undefined>;
120128
}
121129

@@ -149,7 +157,9 @@ export interface StreamProcedure<
149157
Static<RequestData>,
150158
Static<typeof ReaderErrorSchema>
151159
>;
152-
resWritable: Writable<Result<Static<ResponseData>, Static<ResponseErr>>>;
160+
resWritable: Writable<
161+
Result<Static<ResponseData>, Cancellable<Static<ResponseErr>>>
162+
>;
153163
}): Promise<void | undefined>;
154164
}
155165

router/server.ts

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Static, Type } from '@sinclair/typebox';
1+
import { Static } from '@sinclair/typebox';
22
import { PayloadType, AnyProcedure } from './procedures';
33
import {
44
ReaderErrorSchema,
@@ -7,9 +7,9 @@ import {
77
CANCEL_CODE,
88
INVALID_REQUEST_CODE,
99
BaseErrorSchemaType,
10-
ErrResultSchema,
1110
ValidationErrors,
1211
castTypeboxValueErrors,
12+
CancelResultSchema,
1313
} from './errors';
1414
import {
1515
AnyService,
@@ -53,16 +53,6 @@ import { SessionBoundSendFn } from '../transport/transport';
5353

5454
type StreamId = string;
5555

56-
/**
57-
* A schema for cancel payloads sent from the client
58-
*/
59-
const CancelResultSchema = ErrResultSchema(
60-
Type.Object({
61-
code: Type.Literal(CANCEL_CODE),
62-
message: Type.String(),
63-
}),
64-
);
65-
6656
/**
6757
* Represents a server with a set of services. Use {@link createServer} to create it.
6858
* @template Services - The type of services provided by the server.
@@ -293,9 +283,9 @@ class RiverServer<Services extends AnyServiceSchemaMap>
293283
}
294284

295285
if (isStreamCancelBackwardsCompat(msg.controlFlags, protocolVersion)) {
296-
let cancelResult: Static<typeof CancelResultSchema>;
286+
let cancelResult: ErrResult<Static<typeof CancelResultSchema>>;
297287
if (Value.Check(CancelResultSchema, msg.payload)) {
298-
cancelResult = msg.payload;
288+
cancelResult = Err(msg.payload);
299289
} else {
300290
// If the payload is unexpected, then we just construct our own cancel result
301291
cancelResult = Err({
@@ -576,11 +566,15 @@ class RiverServer<Services extends AnyServiceSchemaMap>
576566
sessionId,
577567
metadata: sessionMetadata,
578568
span,
579-
cancel: () => {
580-
onServerCancel({
569+
cancel: (message?: string) => {
570+
const errRes = {
581571
code: CANCEL_CODE,
582-
message: 'cancelled by server procedure handler',
583-
});
572+
message: message ?? 'cancelled by server procedure handler',
573+
} as const;
574+
575+
onServerCancel(errRes);
576+
577+
return Err(errRes);
584578
},
585579
signal: finishedController.signal,
586580
});

testUtil/fixtures/services.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,25 @@ export const UploadableServiceSchema = ServiceSchema.define({
287287
return Ok({ result: `${reqInit.prefix} ${result}` });
288288
},
289289
}),
290+
291+
cancellableAdd: Procedure.upload({
292+
requestInit: Type.Object({}),
293+
requestData: Type.Object({ n: Type.Number() }),
294+
responseData: Type.Object({ result: Type.Number() }),
295+
async handler({ ctx, reqReadable }) {
296+
let result = 0;
297+
for await (const req of reqReadable) {
298+
const n = unwrapOrThrow(req).n;
299+
if (result + n >= 10) {
300+
return ctx.cancel("can't add more than 10");
301+
}
302+
303+
result += n;
304+
}
305+
306+
return Ok({ result: result });
307+
},
308+
}),
290309
});
291310

292311
const RecursivePayload = Type.Recursive((This) =>

0 commit comments

Comments
 (0)