Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 24 additions & 45 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ interface ControlAck {

interface ControlHandshakeRequest {
type: 'HANDSHAKE_REQ';
protocolVersion: 'v2.0';
protocolVersion: 'v0' | 'v1' | 'v1.1' | 'v2.0';
sessionId: string;
expectedSessionState: {
nextExpectedSeq: number; // integer
Expand All @@ -223,7 +223,22 @@ interface ControlHandshakeRequest {

interface ControlHandshakeResponse {
type: 'HANDSHAKE_RESP';
status: { ok: true; sessionId: string } | { ok: false; reason: string };
status:
| {
ok: true;
sessionId: string;
}
| {
ok: false;
reason: string;
code: // retriable
| 'SESSION_STATE_MISMATCH'
// fatal
| 'MALFORMED_HANDSHAKE_META'
| 'MALFORMED_HANDSHAKE'
| 'PROTOCOL_VERSION_MISMATCH'
| 'REJECTED_BY_CUSTOM_HANDLER';
};
}

type Control =
Expand All @@ -238,7 +253,7 @@ type Control =
## Streams

Streams tie together a series of messages into a single logical 'stream' of communication associated with a single remote procedure invocation.
For example, in the case of a `stream` RPC, the client will send a series of messages with the same `streamId`, and the server must respond with a series of messages with the same `streamId`.
For example, in the case of a `stream` procedure type, the client will send a series of messages with the same `streamId`, and the server must respond with a series of messages with the same `streamId`.

### Starting streams

Expand Down Expand Up @@ -307,7 +322,7 @@ Then, depending on whether this is a client or server, the message must undergo
For an incoming message to be considered valid on the client, the transport message MUST fulfill the following criteria:

- It should have a `streamId` that the client recognizes. That is, there MUST already be a message listener waiting for messages on the `streamId` of the original request message (recall that streams are only initiated by clients).
- If a server sends an `ProtocolError` message the client MUST NOT send any further messages to the server for that stream including a control messages.
- If a server sends a `ProtocolError` message the client MUST NOT send any further messages to the server for that stream including a control message.

If the message is invalid, the client MUST silently discard the message.
Otherwise, this is a normal message. Unwrap the payload and return it to the caller of the original procedure.
Expand Down Expand Up @@ -533,11 +548,11 @@ The process differs slightly between the client and server:
│ connect success ──────────────┤ connect failure
▼ │
3. SessionHandshaking │
│ handshake success ┌────── connection drop
│ handshake success ┌──────╪─ connection drop
5. WaitingForHandshake │ handshake failure ─────┤ │
│ handshake success ▼ │ │ connection drop
├───────────────────────► 4. SessionConnected │ │ heartbeat misses
│ │ invalid message ─────────────┘
│ │ invalid message ─────────────┘
│ ▼ │
└───────────────────────► x. Destroy Session ◄─────┘
handshake failure
Expand All @@ -555,43 +570,7 @@ Handshake messages are identical to normal transport messages except with:
2. `ack: 0`
3. no control flags

The handshake request payload schema is the following:

```ts
type HandshakeRequest = {
type: 'HANDSHAKE_REQ';
protocolVersion: 'v0' | 'v1' | 'v1.1' | 'v2.0';
sessionId: string;
expectedSessionState: {
nextExpectedSeq: number;
nextSentSeq: number;
};
};
```

The handshake response payload schema is the following:

```ts
type HandshakeResponse = {
type: 'HANDSHAKE_RESP';
status:
| {
ok: true;
sessionId: string;
}
| {
ok: false;
reason: string;
code: // retriable
| 'SESSION_STATE_MISMATCH'
// fatal
| 'MALFORMED_HANDSHAKE_META'
| 'MALFORMED_HANDSHAKE'
| 'PROTOCOL_VERSION_MISMATCH'
| 'REJECTED_BY_CUSTOM_HANDLER';
};
};
```
The full handshake request and response payload schemas are defined above in the `Control` payload documentation.

The server will send an error response if either:

Expand All @@ -616,7 +595,7 @@ Though this is very [TCP](https://jzhao.xyz/thoughts/TCP) inspired, River has th
The send buffer is a queue of messages that have been sent but not yet acknowledged by the other side.
When a message is sent (including `Control` messages like explicit acks), it is added to the send buffer.

All messages have an `ack` and the `ack` corresponds to the number of messages the other side has processed.
All messages have an `ack` field, and the `ack` value corresponds to the number of messages the other side has processed.
When receiving message a valid message (see the 'Handling Messages for Streams' section for the definition of 'valid'), sessions should ensure that the incoming message `msg.seq` MUST match the session's `session.ack`.
This helps to ensure exactly once delivery and ensures that duplicate and out-of-order messages don't mistakingly update the session's bookkeeping.

Expand All @@ -641,7 +620,7 @@ The client and server should both have a grace period `sessionDisconnectGraceMs`

It is important to note that this implies that there are two types of 'reconnects' in River:

1. Transparent reconnects: the connection dropped and reconnected but the session metadata is in-tact so resending the buffered messages will restore order. At the application level, nothing happened.
1. Transparent reconnects: the connection dropped and reconnected but the session metadata is intact so resending the buffered messages will restore order. At the application level, nothing happened.
2. Hard reconnect: the other transport has lost all state and current transport should invalidate all state and start from scratch.

The TypeScript implementation of the transport explicitly emits `connectionStatus` events for transparent reconnects and `sessionStatus` events for hard reconnects which the client and server can listen to.
Expand Down