Skip to content

Commit ff13eeb

Browse files
feat/river schema v2 compatibility (#106)
Why === River Schema v2 compatibility so we can start getting rid of the compatibility layer. What changed ============ Client --- Significantly restructured the `river.v2.*`. - Nearly all `asyncio.Lock`'s have been removed - Background tasks are now cleaned up correctly, which should reduce the amount of noise around tasks being GC'd while they're still running - Move websocket state ownership from Client transport down into Client Session. This significantly simplifies transparent reconnects, since we don't need to coordinate state shifting. - There are four background tasks: - Heartbeat checker. This keeps track of whether we've gotten any heartbeats from the server recently. - "Close session" checker. This keeps track of whether we've exceeded the grace period for transparent reconnects. - Buffered message sender. One downside with the previous implementation was that we had [a race](https://us5.datadoghq.com/logs?query=%40river.tags%3Ainvariant-violation%20-%40river.extras.badASN%3Atrue%20service%3Apid2%20status%3Awarn%20%22received%20out%20of%20order%20msg%20closing%20connection%20got%20seq%22%20%22wanted%20seq%22&agg_m=count&agg_m_source=base&agg_t=count&cols=host%2Cservice&fromUser=true&messageDisplay=inline&refresh_mode=sliding&storage=hot&stream_sort=desc&viz=stream&from_ts=1742751250088&to_ts=1743010450088&live=true) on sending messages over the websocket, since we had no coordination between calls to `send_message` when called concurrently, so we'd end up with `seq` bound and incremented but then messages would become out-of-order by the time the `ws.send` method returned. Now we just atomically append to a [`deque`](https://docs.python.org/3/library/collections.html#collections.deque) as we push onto the send buffer, and when we get a confirmation from the ws library we move the message over to an ack buffer (also a `deque`) waiting for incoming messages from the server to allow us to drain. Codegen --- - `init` is now required for all method types, `input` is now optional. This simplifies `rpc` and `stream` codegen. - Fixed some bugs where `init` was being used in `input` position and vice versa. - Swapping `f"'{foo}'"` encoding for `f"{repr(foo)}"`, giving greater safety post-generation (to avoid situations where `f"'{None}'"` gets rendered as `"None"`. `f"{repr(None)}"` would render as `None` which would fail typechecks.) Test plan ========= Manually ran codegen against v2 generated schemas, everything typechecks.
1 parent b72caf8 commit ff13eeb

File tree

99 files changed

+5468
-698
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+5468
-698
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ dependencies = [
2828
"nanoid>=2.0.0",
2929
"protobuf>=5.28.3",
3030
"pydantic-core>=2.20.1",
31-
"websockets>=12.0",
31+
"websockets>=13.0,<14",
3232
"opentelemetry-sdk>=1.28.2",
3333
"opentelemetry-api>=1.28.2",
3434
]

scripts/parity.sh

Lines changed: 0 additions & 40 deletions
This file was deleted.

scripts/parity/check_parity.py

Lines changed: 0 additions & 295 deletions
This file was deleted.

src/replit_river/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from . import v2
12
from .client import Client
23
from .error_schema import RiverError
34
from .rpc import (
@@ -20,4 +21,5 @@
2021
"subscription_method_handler",
2122
"upload_method_handler",
2223
"stream_method_handler",
24+
"v2",
2325
]

src/replit_river/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,11 @@ def _trace_procedure(
226226
span_handle = _SpanHandle(span)
227227
try:
228228
yield span_handle
229+
span_handle.set_status(StatusCode.OK)
229230
except GeneratorExit:
230231
# This error indicates the caller is done with the async generator
231232
# but messages are still left. This is okay, we do not consider it an error.
233+
span_handle.set_status(StatusCode.OK)
232234
raise
233235
except RiverException as e:
234236
span.record_exception(e, escaped=True)
@@ -239,7 +241,6 @@ def _trace_procedure(
239241
span_handle.set_status(StatusCode.ERROR, f"{type(e).__name__}: {e}")
240242
raise e
241243
finally:
242-
span_handle.set_status(StatusCode.OK)
243244
span.end()
244245

245246

0 commit comments

Comments
 (0)