Commit 7b5ee07

Fixing integration test test_push_large_data_chunks_over_9mb

1 parent 98b76c5 commit 7b5ee07
File tree

1 file changed: +84 -1 lines changed
src/apify/storage_clients/_apify/_dataset_client.py

Lines changed: 84 additions & 1 deletion
@@ -7,6 +7,8 @@
 from typing_extensions import override

 from apify_client import ApifyClientAsync
+from crawlee._utils.byte_size import ByteSize
+from crawlee._utils.file import json_dumps
 from crawlee.storage_clients._base import DatasetClient
 from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

@@ -15,6 +17,7 @@
     from datetime import datetime

     from apify_client.clients import DatasetClientAsync
+    from crawlee._types import JsonSerializable
     from crawlee.configuration import Configuration

 logger = getLogger(__name__)
@@ -23,6 +26,15 @@
 class ApifyDatasetClient(DatasetClient):
     """An Apify platform implementation of the dataset client."""

+    _MAX_PAYLOAD_SIZE = ByteSize.from_mb(9)
+    """Maximum size for a single payload."""
+
+    _SAFETY_BUFFER_PERCENT = 0.01 / 100  # 0.01%
+    """Percentage buffer to reduce payload limit slightly for safety."""
+
+    _EFFECTIVE_LIMIT_SIZE = _MAX_PAYLOAD_SIZE - (_MAX_PAYLOAD_SIZE * _SAFETY_BUFFER_PERCENT)
+    """Calculated payload limit considering safety buffer."""
+
     def __init__(
         self,
         *,
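
For context, the safety buffer trims 0.01% off the 9 MB ceiling. A minimal arithmetic sketch (not part of the commit), assuming ByteSize.from_mb treats 1 MB as 1024 * 1024 bytes:

    MAX_PAYLOAD_BYTES = 9 * 1024 * 1024  # 9 MB = 9_437_184 bytes (assumption: MB means MiB here)
    SAFETY_BUFFER_PERCENT = 0.01 / 100  # 0.01%
    EFFECTIVE_LIMIT_BYTES = MAX_PAYLOAD_BYTES - MAX_PAYLOAD_BYTES * SAFETY_BUFFER_PERCENT
    print(EFFECTIVE_LIMIT_BYTES)  # 9436240.2816 -> roughly 944 bytes of headroom below 9 MB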
@@ -135,8 +147,22 @@ async def drop(self) -> None:

     @override
     async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
+        async def payloads_generator() -> AsyncIterator[str]:
+            for index, item in enumerate(data):
+                yield await self._check_and_serialize(item, index)
+
         async with self._lock:
-            await self._api_client.push_items(items=data)
+            # Handle lists
+            if isinstance(data, list):
+                # Invoke client in series to preserve the order of data
+                async for items in self._chunk_by_size(payloads_generator()):
+                    await self._api_client.push_items(items=items)
+
+            # Handle singular items
+            else:
+                items = await self._check_and_serialize(data)
+                await self._api_client.push_items(items=items)

         await self._update_metadata()

     @override
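
A hedged usage sketch of the new behaviour: a list whose serialized size exceeds the ~9 MB limit is now split into several ordered push_items calls instead of being sent as one oversized request. How a ready-to-use ApifyDatasetClient instance is obtained is outside this diff, so `dataset_client` below is purely illustrative:

    # Hypothetical: `dataset_client` is an already-initialized ApifyDatasetClient.
    items = [{'url': f'https://example.com/{i}', 'blob': 'x' * 100_000} for i in range(200)]
    # Roughly 20 MB of JSON in total; pushed as multiple chunks, each under ~9 MB, preserving item order.
    await dataset_client.push_data(items)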
@@ -205,3 +231,60 @@ async def _update_metadata(self) -> None:
         """Update the dataset metadata file with current information."""
         metadata = await self._api_client.get()
         self._metadata = DatasetMetadata.model_validate(metadata)
+
+    @classmethod
+    async def _check_and_serialize(cls, item: JsonSerializable, index: int | None = None) -> str:
+        """Serialize a given item to JSON, checking its serializability and size against a limit.
+
+        Args:
+            item: The item to serialize.
+            index: Index of the item, used for error context.
+
+        Returns:
+            Serialized JSON string.
+
+        Raises:
+            ValueError: If the item is not JSON serializable or exceeds the size limit.
+        """
+        s = ' ' if index is None else f' at index {index} '
+
+        try:
+            payload = await json_dumps(item)
+        except Exception as exc:
+            raise ValueError(f'Data item{s}is not serializable to JSON.') from exc
+
+        payload_size = ByteSize(len(payload.encode('utf-8')))
+        if payload_size > cls._EFFECTIVE_LIMIT_SIZE:
+            raise ValueError(f'Data item{s}is too large (size: {payload_size}, limit: {cls._EFFECTIVE_LIMIT_SIZE})')
+
+        return payload
+
+    async def _chunk_by_size(self, items: AsyncIterator[str]) -> AsyncIterator[str]:
+        """Yield chunks of JSON arrays composed of input strings, respecting a size limit.
+
+        Groups an iterable of JSON string payloads into larger JSON arrays, ensuring the total size
+        of each array does not exceed `_EFFECTIVE_LIMIT_SIZE`. Each output is a JSON array string that
+        contains as many payloads as possible without breaching the size threshold, maintaining the
+        order of the original payloads. Assumes individual items are below the size limit.
+
+        Args:
+            items: Iterable of JSON string payloads.
+
+        Yields:
+            Strings representing JSON arrays of payloads, each staying within the size limit.
+        """
+        last_chunk_size = ByteSize(2)  # Add 2 bytes for [] wrapper.
+        current_chunk = []
+
+        async for payload in items:
+            payload_size = ByteSize(len(payload.encode('utf-8')))
+
+            if last_chunk_size + payload_size <= self._EFFECTIVE_LIMIT_SIZE:
+                current_chunk.append(payload)
+                last_chunk_size += payload_size + ByteSize(1)  # Add 1 byte for ',' separator.
+            else:
+                yield f'[{",".join(current_chunk)}]'
+                current_chunk = [payload]
+                last_chunk_size = payload_size + ByteSize(2)  # Add 2 bytes for [] wrapper.
+
+        yield f'[{",".join(current_chunk)}]'
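
The byte accounting in _chunk_by_size is easier to follow in a stripped-down, synchronous form. The sketch below is not part of the commit; it swaps ByteSize for plain ints and uses a tiny limit so the chunk boundaries are visible:

    def chunk_by_size(payloads: list[str], limit: int) -> list[str]:
        """Group JSON payload strings into JSON-array strings of at most `limit` bytes."""
        chunk_size = 2  # Account for the '[' and ']' wrapper.
        chunk: list[str] = []
        chunks: list[str] = []
        for payload in payloads:
            size = len(payload.encode('utf-8'))
            if chunk_size + size <= limit:
                chunk.append(payload)
                chunk_size += size + 1  # +1 for the ',' separator.
            else:
                chunks.append(f'[{",".join(chunk)}]')
                chunk = [payload]
                chunk_size = size + 2
        chunks.append(f'[{",".join(chunk)}]')
        return chunks

    print(chunk_by_size(['{"a":1}', '{"b":2}', '{"c":3}'], limit=20))
    # ['[{"a":1},{"b":2}]', '[{"c":3}]']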
