Skip to content

Commit 3c74f1d

Browse files
Translating a little more of PROTOCOL into code
1 parent 2ddb3c7 commit 3c74f1d

File tree

3 files changed

+45
-6
lines changed

3 files changed

+45
-6
lines changed

src/replit_river/rpc.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@
4646
GenericRpcHandlerBuilder = Callable[
4747
[str, Channel[Any], Channel[Any]], Coroutine[None, None, None]
4848
]
49-
ACK_BIT = 0b00001
50-
STREAM_OPEN_BIT = 0b00010
49+
ACK_BIT_TYPE = Literal[0b00001]
50+
ACK_BIT: ACK_BIT_TYPE = 0b00001
51+
STREAM_OPEN_BIT_TYPE = Literal[0b00010]
52+
STREAM_OPEN_BIT: STREAM_OPEN_BIT_TYPE = 0b00010
5153

5254
# these codes are retriable
5355
# if the server sends a response with one of these codes,

src/replit_river/v2/client_session.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
from collections.abc import AsyncIterable
44
from datetime import timedelta
5-
from typing import Any, AsyncGenerator, Callable, Coroutine
5+
from typing import Any, AsyncGenerator, Callable, Coroutine, Literal
66

77
import nanoid # type: ignore
88
import websockets
@@ -37,8 +37,10 @@
3737
from replit_river.session import Session
3838
from replit_river.transport_options import MAX_MESSAGE_BUFFER_SIZE, TransportOptions
3939

40-
STREAM_CANCEL_BIT = 0b00100 # Synonymous with the cancel bit in v2
41-
STREAM_CLOSED_BIT = 0b01000 # Synonymous with the cancel bit in v2
40+
STREAM_CANCEL_BIT_TYPE = Literal[0b00100]
41+
STREAM_CANCEL_BIT: STREAM_CANCEL_BIT_TYPE = 0b00100
42+
STREAM_CLOSED_BIT_TYPE = Literal[0b01000]
43+
STREAM_CLOSED_BIT: STREAM_CLOSED_BIT_TYPE = 0b01000
4244

4345

4446
logger = logging.getLogger(__name__)

src/replit_river/v2/schema.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
from typing import Any, Literal, NotRequired, TypeAlias, TypedDict
22

3-
from replit_river.rpc import ExpectedSessionState
3+
from grpc.aio import BaseError
4+
5+
from replit_river.rpc import ACK_BIT, ACK_BIT_TYPE, ExpectedSessionState
6+
from replit_river.v2.client_session import STREAM_CANCEL_BIT, STREAM_CANCEL_BIT_TYPE
47

58

69
class ControlClose(TypedDict):
@@ -33,7 +36,39 @@ class ControlHandshakeResponse(TypedDict):
3336
type: Literal["HANDSHAKE_RESP"]
3437
status: HandshakeOK | HandshakeError
3538

39+
# This is sent when the server encounters an internal error
40+
# i.e. an invariant has been violated
41+
class BaseErrorStructure(TypedDict):
42+
# This should be a defined literal to make sure errors are easily differentiated
43+
# code: str # Supplied by implementations
44+
# This can be any string
45+
message: str
46+
# Any extra metadata
47+
extra: NotRequired[Any]
48+
49+
# When a client sends a malformed request. This can be
50+
# for a variety of reasons which would be included
51+
# in the message.
52+
class InvalidRequestError(BaseErrorStructure):
53+
code: Literal['INVALID_REQUEST']
54+
55+
# This is sent when an exception happens in the handler of a stream.
56+
class UncaughtError(BaseErrorStructure):
57+
code: Literal['UNCAUGHT_ERROR']
58+
59+
# This is sent when one side wishes to cancel the stream
60+
# abruptly from user-space. Handling this is up to the procedure
61+
# implementation or the caller.
62+
class CancelError(BaseErrorStructure):
63+
code: Literal['CANCEL']
64+
65+
ProtocolError: TypeAlias = UncaughtError | InvalidRequestError | CancelError;
3666

3767
Control: TypeAlias = (
3868
ControlClose | ControlAck | ControlHandshakeRequest | ControlHandshakeResponse
3969
)
70+
71+
ValidPairings = (
72+
tuple[ACK_BIT_TYPE, ControlAck] |
73+
tuple[STREAM_CANCEL_BIT_TYPE, ProtocolError]
74+
)

0 commit comments

Comments
 (0)