Skip to content

Commit 90252dd

Browse files
Taking a stand on request nullability in river v2
1 parent 028a49a commit 90252dd

File tree

3 files changed

+23
-49
lines changed

3 files changed

+23
-49
lines changed

src/replit_river/codegen/client.py

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -978,32 +978,9 @@ async def {name}(
978978
]
979979
)
980980
elif protocol_version == "v2.0":
981-
assert init_meta, "Protocol v2 requires init to be defined"
982-
_, init_type, render_init_method = init_meta
983-
current_chunks.extend(
984-
[
985-
reindent(
986-
" ",
987-
f"""\
988-
async def {name}(
989-
self,
990-
init: {render_type_expr(init_type)},
991-
) -> { # TODO(dstewart) This should just be output_type
992-
render_type_expr(output_or_error_type)
993-
}:
994-
return await self.client.send_upload(
995-
{repr(schema_name)},
996-
{repr(name)},
997-
init,
998-
None,
999-
{reindent(" ", render_init_method)},
1000-
None,
1001-
{reindent(" ", parse_output_method)},
1002-
{reindent(" ", parse_error_method)},
1003-
)
1004-
""",
1005-
)
1006-
]
981+
raise ValueError(
982+
"It is expected that protocol v2 uploads have both init and input "
983+
"defined, otherwise it's no different than rpc",
1007984
)
1008985
else:
1009986
assert_never(protocol_version)

src/replit_river/v2/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ async def send_upload[I, R, A](
7575
service_name: str,
7676
procedure_name: str,
7777
init: I,
78-
request: AsyncIterable[R] | None,
78+
request: AsyncIterable[R],
7979
init_serializer: Callable[[I], Any],
80-
request_serializer: Callable[[R], Any] | None,
80+
request_serializer: Callable[[R], Any],
8181
response_deserializer: Callable[[Any], A],
8282
error_deserializer: Callable[[Any], RiverError],
8383
) -> A:

src/replit_river/v2/session.py

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -661,9 +661,9 @@ async def send_upload[I, R, A](
661661
service_name: str,
662662
procedure_name: str,
663663
init: I,
664-
request: AsyncIterable[R] | None,
664+
request: AsyncIterable[R],
665665
init_serializer: Callable[[I], Any],
666-
request_serializer: Callable[[R], Any] | None,
666+
request_serializer: Callable[[R], Any],
667667
response_deserializer: Callable[[Any], A],
668668
error_deserializer: Callable[[Any], RiverError],
669669
span: Span,
@@ -684,25 +684,22 @@ async def send_upload[I, R, A](
684684

685685
async with self._with_stream(stream_id, 1) as (backpressure_waiter, output):
686686
try:
687-
if request:
688-
assert request_serializer, "send_stream missing request_serializer"
689-
690-
# If this request is not closed and the session is killed, we should
691-
# throw exception here
692-
async for item in request:
693-
# Block for backpressure
694-
await backpressure_waiter.wait()
695-
if output.closed():
696-
logger.debug("Stream is closed, avoid sending the rest")
697-
break
698-
await self._send_message(
699-
stream_id=stream_id,
700-
service_name=service_name,
701-
procedure_name=procedure_name,
702-
control_flags=0,
703-
payload=request_serializer(item),
704-
span=span,
705-
)
687+
# If this request is not closed and the session is killed, we should
688+
# throw exception here
689+
async for item in request:
690+
# Block for backpressure
691+
await backpressure_waiter.wait()
692+
if output.closed():
693+
logger.debug("Stream is closed, avoid sending the rest")
694+
break
695+
await self._send_message(
696+
stream_id=stream_id,
697+
service_name=service_name,
698+
procedure_name=procedure_name,
699+
control_flags=0,
700+
payload=request_serializer(item),
701+
span=span,
702+
)
706703
except WebsocketClosedException as e:
707704
raise RiverServiceException(
708705
ERROR_CODE_STREAM_CLOSED, str(e), service_name, procedure_name

0 commit comments

Comments
 (0)