Skip to content

Commit a5b93d5

Browse files
committed
incremental: avoid double loop with stream from sync iterables
Replicates graphql/graphql-js@d245e65
1 parent c4c4142 commit a5b93d5

File tree

3 files changed

+32
-25
lines changed

3 files changed

+32
-25
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ disallow_untyped_defs = false
275275
[tool.pyright]
276276
# silence pyright since we're using mypy already
277277
reportArgumentType = false
278+
reportAttributeAccessIssue = false
278279
reportIncompatibleVariableOverride = false
279280
reportInvalidTypeForm = false
280281
reportMissingModuleSource = false

src/graphql/execution/execute.py

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,11 +1717,13 @@ def first_sync_stream_items(
17171717
item_type: GraphQLOutputType,
17181718
) -> StreamItemsRecord:
17191719
"""Get the first sync stream items."""
1720-
is_awaitable = self.is_awaitable
1721-
path = stream_record.path
1722-
initial_path = Path(path, initial_index, None)
17231720

17241721
async def await_result() -> StreamItemsResult:
1722+
is_awaitable = self.is_awaitable
1723+
prepend_next_stream_items = self.prepend_next_stream_items
1724+
path = stream_record.path
1725+
initial_path = Path(path, initial_index, None)
1726+
17251727
result = self.complete_stream_items(
17261728
stream_record,
17271729
initial_path,
@@ -1731,8 +1733,8 @@ async def await_result() -> StreamItemsResult:
17311733
info,
17321734
item_type,
17331735
)
1734-
results = [result]
1735-
append_result = results.append
1736+
first_stream_items = StreamItemsRecord(stream_record, result)
1737+
current_stream_items = first_stream_items
17361738
current_index = initial_index
17371739
errored_synchronously = False
17381740
for item in iterator:
@@ -1752,33 +1754,27 @@ async def await_result() -> StreamItemsResult:
17521754
info,
17531755
item_type,
17541756
)
1755-
append_result(result)
17561757

1757-
current_index = len(results) - 1
1758+
next_stream_items = StreamItemsRecord(stream_record, result)
1759+
current_stream_items.result = prepend_next_stream_items(
1760+
current_stream_items.result, next_stream_items
1761+
)
1762+
current_stream_items = next_stream_items
1763+
17581764
# If a non-reconcilable stream items result was encountered,
17591765
# then the stream terminates in error. Otherwise, add a stream terminator.
1760-
prepend_next_stream_items = self.prepend_next_stream_items
1761-
current_result = (
1762-
results[current_index]
1763-
if errored_synchronously
1764-
else prepend_next_stream_items(
1765-
results[current_index],
1766+
if not errored_synchronously:
1767+
current_stream_items.result = prepend_next_stream_items(
1768+
current_stream_items.result,
17661769
StreamItemsRecord(
17671770
stream_record, TerminatingStreamItemsResult(stream_record)
17681771
),
17691772
)
1770-
)
17711773

1772-
while current_index > 0:
1773-
current_index -= 1
1774-
current_result = prepend_next_stream_items(
1775-
results[current_index],
1776-
StreamItemsRecord(stream_record, current_result),
1777-
)
1778-
1779-
if is_awaitable(current_result):
1780-
return await current_result # type: ignore
1781-
return current_result # type: ignore
1774+
result = first_stream_items.result
1775+
if is_awaitable(result):
1776+
return await result # type: ignore
1777+
return result # type: ignore
17821778

17831779
return StreamItemsRecord(stream_record, await_result())
17841780

src/graphql/execution/incremental_publisher.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,12 +938,22 @@ class NonReconcilableStreamItemsResult(NamedTuple):
938938
]
939939

940940

941-
class StreamItemsRecord(NamedTuple):
941+
class StreamItemsRecord:
942942
"""Stream items record"""
943943

944+
__slots__ = "result", "stream_record"
945+
944946
stream_record: SubsequentResultRecord
945947
result: AwaitableOrValue[StreamItemsResult]
946948

949+
def __init__(
950+
self,
951+
stream_record: SubsequentResultRecord,
952+
result: AwaitableOrValue[StreamItemsResult],
953+
) -> None:
954+
self.stream_record = stream_record
955+
self.result = result
956+
947957

948958
IncrementalDataRecord = Union[DeferredGroupedFieldSetRecord, StreamItemsRecord]
949959

0 commit comments

Comments
 (0)