Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ keywords = [
"scraping",
]
dependencies = [
"apify-client>=2.0.0,<3.0.0",
"apify-client>=2.1.0,<3.0.0",
"apify-shared>=2.0.0,<3.0.0",
"crawlee==1.0.0rc1",
"cachetools>=5.5.0",
Expand Down
54 changes: 7 additions & 47 deletions src/apify/storage_clients/_apify/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

from typing_extensions import override

from apify_client import ApifyClientAsync
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.file import json_dumps
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

from apify.storage_clients._apify._utils import create_apify_client, resolve_storage_id

if TYPE_CHECKING:
from collections.abc import AsyncIterator

Expand Down Expand Up @@ -39,7 +40,6 @@ def __init__(
self,
*,
api_client: DatasetClientAsync,
api_public_base_url: str,
lock: asyncio.Lock,
) -> None:
"""Initialize a new instance.
Expand All @@ -49,9 +49,6 @@ def __init__(
self._api_client = api_client
"""The Apify dataset client for API operations."""

self._api_public_base_url = api_public_base_url
"""The public base URL for accessing the key-value store records."""

self._lock = lock
"""A lock to ensure that only one operation is performed at a time."""

Expand Down Expand Up @@ -90,52 +87,16 @@ async def open(
are provided, or if neither `id` nor `name` is provided and no default storage ID is available in
the configuration.
"""
token = configuration.token
if not token:
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')

api_url = configuration.api_base_url
if not api_url:
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')

api_public_base_url = configuration.api_public_base_url
if not api_public_base_url:
raise ValueError(
'Apify storage client requires a valid API public base URL in Configuration '
f'(api_public_base_url={api_public_base_url}).'
)

# Create Apify client with the provided token and API URL.
apify_client_async = ApifyClientAsync(
token=token,
api_url=api_url,
max_retries=8,
min_delay_between_retries_millis=500,
timeout_secs=360,
)
apify_client_async = create_apify_client(configuration=configuration)
apify_datasets_client = apify_client_async.datasets()

# If both id and name are provided, raise an error.
if id and name:
raise ValueError('Only one of "id" or "name" can be specified, not both.')

# If id is provided, get the storage by ID.
if id and name is None:
apify_dataset_client = apify_client_async.dataset(dataset_id=id)

# If name is provided, get or create the storage by name.
if name and id is None:
id = DatasetMetadata.model_validate(
async def id_getter() -> str:
return DatasetMetadata.model_validate(
await apify_datasets_client.get_or_create(name=name),
).id
apify_dataset_client = apify_client_async.dataset(dataset_id=id)

# If both id and name are None, try to get the default storage ID from environment variables.
# The default storage ID environment variable is set by the Apify platform. It also contains
# a new storage ID after Actor's reboot or migration.
if id is None and name is None:
id = configuration.default_dataset_id
apify_dataset_client = apify_client_async.dataset(dataset_id=id)
id = await resolve_storage_id(id, name, default_id=configuration.default_dataset_id, id_getter=id_getter())
apify_dataset_client = apify_client_async.dataset(dataset_id=id)

# Fetch its metadata.
metadata = await apify_dataset_client.get()
Expand All @@ -154,7 +115,6 @@ async def open(

return cls(
api_client=apify_dataset_client,
api_public_base_url=api_public_base_url,
lock=asyncio.Lock(),
)

Expand Down
65 changes: 19 additions & 46 deletions src/apify/storage_clients/_apify/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from typing_extensions import override
from yarl import URL

from apify_client import ApifyClientAsync
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreRecord, KeyValueStoreRecordMetadata

from ._models import ApifyKeyValueStoreMetadata, KeyValueStoreListKeysPage
from ._utils import create_apify_client, resolve_storage_id
from apify._crypto import create_hmac_signature

if TYPE_CHECKING:
Expand All @@ -31,7 +31,6 @@ def __init__(
self,
*,
api_client: KeyValueStoreClientAsync,
api_public_base_url: str,
lock: asyncio.Lock,
) -> None:
"""Initialize a new instance.
Expand All @@ -41,9 +40,6 @@ def __init__(
self._api_client = api_client
"""The Apify KVS client for API operations."""

self._api_public_base_url = api_public_base_url
"""The public base URL for accessing the key-value store records."""

self._lock = lock
"""A lock to ensure that only one operation is performed at a time."""

Expand Down Expand Up @@ -82,55 +78,29 @@ async def open(
are provided, or if neither `id` nor `name` is provided and no default storage ID is available
in the configuration.
"""
token = configuration.token
if not token:
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')

api_url = configuration.api_base_url
if not api_url:
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')

api_public_base_url = configuration.api_public_base_url
if not api_public_base_url:
raise ValueError(
'Apify storage client requires a valid API public base URL in Configuration '
f'(api_public_base_url={api_public_base_url}).'
)

# Create Apify client with the provided token and API URL.
apify_client_async = ApifyClientAsync(
token=token,
api_url=api_url,
max_retries=8,
min_delay_between_retries_millis=500,
timeout_secs=360,
)
apify_client_async = create_apify_client(configuration=configuration)
apify_kvss_client = apify_client_async.key_value_stores()

# If both id and name are provided, raise an error.
if id and name:
raise ValueError('Only one of "id" or "name" can be specified, not both.')

# If id is provided, get the storage by ID.
if id and name is None:
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)

# If name is provided, get or create the storage by name.
if name and id is None:
id = ApifyKeyValueStoreMetadata.model_validate(
async def id_getter() -> str:
return ApifyKeyValueStoreMetadata.model_validate(
await apify_kvss_client.get_or_create(name=name),
).id
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)

# If both id and name are None, try to get the default storage ID from environment variables.
# The default storage ID environment variable is set by the Apify platform. It also contains
# a new storage ID after Actor's reboot or migration.
if id is None and name is None:
id = configuration.default_key_value_store_id
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=id)
def client_creator(id: str) -> KeyValueStoreClientAsync:
return apify_client_async.key_value_store(key_value_store_id=id)

# Fetch its metadata.
metadata = await apify_kvs_client.get()
id = await resolve_storage_id(
id, name, default_id=configuration.default_key_value_store_id, id_getter=id_getter()
)
# Create the resource client
apify_rq_client = client_creator(id=id)

# Verify that the storage exists by fetching its metadata.
metadata = await apify_rq_client.get()

# If metadata is None, it means the storage does not exist, so we create it.
if metadata is None:
Expand All @@ -146,7 +116,6 @@ async def open(

return cls(
api_client=apify_kvs_client,
api_public_base_url=api_public_base_url,
lock=asyncio.Lock(),
)

Expand Down Expand Up @@ -231,7 +200,11 @@ async def get_public_url(self, key: str) -> str:
raise ValueError('resource_id cannot be None when generating a public URL')

public_url = (
URL(self._api_public_base_url) / 'v2' / 'key-value-stores' / self._api_client.resource_id / 'records' / key
URL(self._api_client.root_client.public_base_url)
/ 'key-value-stores'
/ self._api_client.resource_id
/ 'records'
/ key
)
metadata = await self.get_metadata()

Expand Down
56 changes: 12 additions & 44 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from cachetools import LRUCache
from typing_extensions import override

from apify_client import ApifyClientAsync
from crawlee._utils.crypto import crypto_random_object_id
from crawlee.storage_clients._base import RequestQueueClient
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead
from ._utils import create_apify_client, resolve_storage_id
from apify import Request

if TYPE_CHECKING:
Expand Down Expand Up @@ -66,6 +66,7 @@ def __init__(
self,
*,
api_client: RequestQueueClientAsync,
lock: asyncio.Lock,
metadata: RequestQueueMetadata,
) -> None:
"""Initialize a new instance.
Expand All @@ -90,7 +91,7 @@ def __init__(
self._should_check_for_forefront_requests = False
"""Whether to check for forefront requests in the next list_head call."""

self._fetch_lock = asyncio.Lock()
self._fetch_lock = lock
"""Fetch lock to minimize race conditions when communicating with API."""

async def _get_metadata_estimate(self) -> RequestQueueMetadata:
Expand Down Expand Up @@ -160,51 +161,17 @@ async def open(
are provided, or if neither `id` nor `name` is provided and no default storage ID is available
in the configuration.
"""
token = configuration.token
if not token:
raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')

api_url = configuration.api_base_url
if not api_url:
raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')

api_public_base_url = configuration.api_public_base_url
if not api_public_base_url:
raise ValueError(
'Apify storage client requires a valid API public base URL in Configuration '
f'(api_public_base_url={api_public_base_url}).'
)

# Create Apify client with the provided token and API URL.
apify_client_async = ApifyClientAsync(
token=token,
api_url=api_url,
max_retries=8,
min_delay_between_retries_millis=500,
timeout_secs=360,
)
apify_client_async = create_apify_client(configuration=configuration)
apify_rqs_client = apify_client_async.request_queues()

match (id, name):
case (None, None):
# If both id and name are None, try to get the default storage ID from environment variables.
# The default storage ID environment variable is set by the Apify platform. It also contains
# a new storage ID after Actor's reboot or migration.
id = configuration.default_request_queue_id
case (None, name):
# If only name is provided, get or create the storage by name.
id = RequestQueueMetadata.model_validate(
await apify_rqs_client.get_or_create(name=name),
).id
case (_, None):
# If only id is provided, use it.
pass
case (_, _):
# If both id and name are provided, raise an error.
raise ValueError('Only one of "id" or "name" can be specified, not both.')
if id is None:
raise RuntimeError('Unreachable code')
async def id_getter() -> str:
return RequestQueueMetadata.model_validate(await apify_rqs_client.get_or_create(name=name)).id

id = await resolve_storage_id(
id, name, default_id=configuration.default_request_queue_id, id_getter=id_getter()
)

# Create the resource client
# Use suitable client_key to make `hadMultipleClients` response of Apify API useful.
# It should persist across migrated or resurrected Actor runs on the Apify platform.
_api_max_client_key_length = 32
Expand Down Expand Up @@ -233,6 +200,7 @@ async def open(

return cls(
api_client=apify_rq_client,
lock=asyncio.Lock(),
metadata=metadata_model,
)

Expand Down
65 changes: 65 additions & 0 deletions src/apify/storage_clients/_apify/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from apify_client import ApifyClientAsync

if TYPE_CHECKING:
from collections.abc import Coroutine

from apify import Configuration


def create_apify_client(configuration: Configuration) -> ApifyClientAsync:
    """Build an `ApifyClientAsync` from the connection settings stored in `configuration`.

    Args:
        configuration: Object exposing `token`, `api_base_url` and `api_public_base_url`.

    Returns:
        A client configured with the token, both URLs and fixed retry/timeout settings.

    Raises:
        ValueError: If the token, the API URL or the public API base URL is missing or empty.
    """
    # The settings are validated one by one, in this order, so the first missing one is reported.
    if not (token := configuration.token):
        raise ValueError(f'Apify storage client requires a valid token in Configuration (token={token}).')

    if not (api_url := configuration.api_base_url):
        raise ValueError(f'Apify storage client requires a valid API URL in Configuration (api_url={api_url}).')

    if not (public_url := configuration.api_public_base_url):
        raise ValueError(
            'Apify storage client requires a valid API public base URL in Configuration '
            f'(api_public_base_url={public_url}).'
        )

    # All required settings are present; hand them over together with the retry and timeout policy.
    return ApifyClientAsync(
        token=token,
        api_url=api_url,
        api_public_url=public_url,
        max_retries=8,
        min_delay_between_retries_millis=500,
        timeout_secs=360,
    )


async def resolve_storage_id(
    id: str | None, name: str | None, default_id: str, id_getter: Coroutine[None, None, str]
) -> str:
    """Work out which storage ID to use from the `id` / `name` arguments.

    Args:
        id: Explicit storage ID, or `None`.
        name: Storage name, or `None`.
        default_id: ID used when neither `id` nor `name` is given.
        id_getter: Coroutine object that yields the ID for `name`. It is awaited only when
            just `name` is given, and it is closed before this function returns or raises.

    Returns:
        `id` when only the ID is given, the awaited `id_getter` result when only the name is given,
        and `default_id` when neither is given.

    Raises:
        ValueError: If both `id` and `name` are given.
        RuntimeError: If the selected ID turns out to be `None`.
    """
    try:
        # Supplying both identifiers is ambiguous, so it is rejected up front.
        if id is not None and name is not None:
            raise ValueError('Only one of "id" or "name" can be specified, not both.')

        if id is not None:
            # Only the ID was given; use it unchanged.
            resolved = id
        elif name is not None:
            # Only the name was given; the getter supplies the matching ID.
            resolved = await id_getter
        else:
            # Nothing was given; fall back to the caller-supplied default ID.
            resolved = default_id

        if resolved is None:
            raise RuntimeError('Unreachable code')
        return resolved
    finally:
        # The coroutine object was created by the caller up front, so it is closed on every path,
        # including the ones that never awaited it.
        id_getter.close()
Loading
Loading