Skip to content

Commit 2f005b5

Browse files
authored
more invariant fixes (#306)
## Why + What changed - fix `got stream cancel without a valid protocol error` - fix `session state has been consumed and is no longer valid: getting loggingMetadata on consumed state` (logging after protocol error + session deletion) ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 38dc0d7 commit 2f005b5

File tree

8 files changed

+18
-42
lines changed

8 files changed

+18
-42
lines changed

router/client.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,11 @@ import { Value } from '@sinclair/typebox/value';
3232
import { PayloadType, ValidProcType } from './procedures';
3333
import {
3434
BaseErrorSchemaType,
35-
ErrResultSchema,
3635
CANCEL_CODE,
37-
ReaderErrorSchema,
36+
ReaderErrorResultSchema,
3837
UNEXPECTED_DISCONNECT_CODE,
3938
} from './errors';
4039

41-
const ReaderErrResultSchema = ErrResultSchema(ReaderErrorSchema);
42-
4340
interface CallOptions {
4441
signal?: AbortSignal;
4542
}
@@ -384,22 +381,22 @@ function handleProc(
384381
cleanClose = false;
385382

386383
span.addEvent('received cancel');
387-
let cancelResult: Static<typeof ReaderErrResultSchema>;
384+
let cancelResult: Static<typeof ReaderErrorResultSchema>;
388385

389-
if (Value.Check(ReaderErrResultSchema, msg.payload)) {
386+
if (Value.Check(ReaderErrorResultSchema, msg.payload)) {
390387
cancelResult = msg.payload;
391388
} else {
392389
cancelResult = Err({
393390
code: CANCEL_CODE,
394391
message: 'stream cancelled with invalid payload',
395392
});
396-
transport.log?.error(
393+
transport.log?.warn(
397394
'got stream cancel without a valid protocol error',
398395
{
399396
clientId: transport.clientId,
400397
transportMessage: msg,
401398
validationErrors: [
402-
...Value.Errors(ReaderErrResultSchema, msg.payload),
399+
...Value.Errors(ReaderErrorResultSchema, msg.payload),
403400
],
404401
},
405402
);

router/context.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Span } from '@opentelemetry/api';
22
import { TransportClientId } from '../transport/message';
33
import { SessionId } from '../transport/sessionStateMachine/common';
44
import { ErrResult } from './result';
5-
import { CancelResultSchema } from './errors';
5+
import { CancelErrorSchema } from './errors';
66
import { Static } from '@sinclair/typebox';
77

88
/**
@@ -78,7 +78,7 @@ export type ProcedureHandlerContext<State> = ServiceContext & {
7878
* Cancelling is not the same as closing procedure calls gracefully, please refer to
7979
* the river documentation to understand the difference between the two concepts.
8080
*/
81-
cancel: (message?: string) => ErrResult<Static<typeof CancelResultSchema>>;
81+
cancel: (message?: string) => ErrResult<Static<typeof CancelErrorSchema>>;
8282
/**
8383
* This signal is a standard [AbortSignal](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)
8484
* triggered when the procedure invocation is done. This signal tracks the invocation/request finishing

router/errors.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,13 @@ export function castTypeboxValueErrors(
7575
/**
7676
* A schema for cancel payloads sent from the client
7777
*/
78-
export const CancelResultSchema = Type.Object({
78+
export const CancelErrorSchema = Type.Object({
7979
code: Type.Literal(CANCEL_CODE),
8080
message: Type.String(),
8181
});
8282

83+
export const CancelResultSchema = ErrResultSchema(CancelErrorSchema);
84+
8385
/**
8486
* {@link ReaderErrorSchema} is the schema for all the built-in river errors that
8587
* can be emitted to a reader (request reader on the server, and response reader
@@ -104,9 +106,11 @@ export const ReaderErrorSchema = Type.Union([
104106
}),
105107
),
106108
}),
107-
CancelResultSchema,
109+
CancelErrorSchema,
108110
]) satisfies ProcedureErrorSchemaType;
109111

112+
export const ReaderErrorResultSchema = ErrResultSchema(ReaderErrorSchema);
113+
110114
/**
111115
* Represents an acceptable schema to pass to a procedure.
112116
* Just a type of a schema, not an actual schema.

router/procedures.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { ProcedureHandlerContext } from './context';
44
import { Result } from './result';
55
import { Readable, Writable } from './streams';
66
import {
7-
CancelResultSchema,
7+
CancelErrorSchema,
88
ProcedureErrorSchemaType,
99
ReaderErrorSchema,
1010
} from './errors';
@@ -39,7 +39,7 @@ export type ValidProcType =
3939
*/
4040
export type PayloadType = TSchema;
4141

42-
export type Cancellable<T> = T | Static<typeof CancelResultSchema>;
42+
export type Cancellable<T> = T | Static<typeof CancelErrorSchema>;
4343

4444
/**
4545
* Procedure for a single message in both directions (1:1).

router/server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,9 @@ class RiverServer<Services extends AnyServiceSchemaMap>
283283
}
284284

285285
if (isStreamCancelBackwardsCompat(msg.controlFlags, protocolVersion)) {
286-
let cancelResult: ErrResult<Static<typeof CancelResultSchema>>;
286+
let cancelResult: Static<typeof CancelResultSchema>;
287287
if (Value.Check(CancelResultSchema, msg.payload)) {
288-
cancelResult = Err(msg.payload);
288+
cancelResult = msg.payload;
289289
} else {
290290
// If the payload is unexpected, then we just construct our own cancel result
291291
cancelResult = Err({

transport/client.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -319,17 +319,6 @@ export abstract class ClientTransport<
319319

320320
const res = connectedSession.sendBufferedMessages();
321321
if (!res.ok) {
322-
this.log?.error(`failed to send buffered messages: ${res.reason}`, {
323-
...connectedSession.loggingMetadata,
324-
transportMessage: msg,
325-
});
326-
327-
this.protocolError({
328-
type: ProtocolError.MessageSendFailure,
329-
message: res.reason,
330-
});
331-
this.deleteSession(connectedSession, { unhealthy: true });
332-
333322
return;
334323
}
335324

transport/server.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -545,20 +545,6 @@ export abstract class ServerTransport<
545545

546546
const bufferSendRes = connectedSession.sendBufferedMessages();
547547
if (!bufferSendRes.ok) {
548-
this.log?.error(
549-
`failed to send buffered messages: ${bufferSendRes.reason}`,
550-
{
551-
...connectedSession.loggingMetadata,
552-
transportMessage: msg,
553-
},
554-
);
555-
556-
this.protocolError({
557-
type: ProtocolError.MessageSendFailure,
558-
message: bufferSendRes.reason,
559-
});
560-
this.deleteSession(connectedSession, { unhealthy: true });
561-
562548
return;
563549
}
564550

transport/transport.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ export abstract class Transport<ConnType extends Connection> {
260260

261261
// common listeners
262262
protected onSessionGracePeriodElapsed(session: Session<ConnType>) {
263-
this.log?.warn(
263+
this.log?.info(
264264
`session to ${session.to} grace period elapsed, closing`,
265265
session.loggingMetadata,
266266
);

0 commit comments

Comments
 (0)