Merged
Changes from 15 commits
5 changes: 1 addition & 4 deletions docs/03_concepts/code/03_rq.py
@@ -20,13 +20,10 @@ async def main() -> None:

# If you try to add an existing request again, it will not do anything
add_request_info = await queue.add_request(
Request.from_url('http://different-example.com/5')
Request.from_url('http://example.com/5')
)
Actor.log.info(f'Add request info: {add_request_info}')

processed_request = await queue.get_request(add_request_info.id)
Actor.log.info(f'Processed request: {processed_request}')

# Finally, process the queue until all requests are handled
while not await queue.is_finished():
# Fetch the next unhandled request in the queue
4 changes: 2 additions & 2 deletions docs/04_upgrading/upgrading_to_v3.md
@@ -1,6 +1,6 @@
---
id: upgrading-to-v2
title: Upgrading to v2
id: upgrading-to-v3
title: Upgrading to v3
---

This page summarizes the breaking changes between Apify Python SDK v2.x and v3.0.
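The frontmatter fix above matches the migration this PR implements: request-queue entries are now addressed by `unique_key` rather than by the platform-assigned `id`. A minimal before/after sketch, inferred from the docs and test changes elsewhere in this diff (not quoted from the upgrading guide itself):

```python
from apify import Actor, Request


async def main() -> None:
    async with Actor:
        queue = await Actor.open_request_queue()
        added = await queue.add_request(Request.from_url('http://example.com/1'))

        # v2.x: requests were looked up by their platform-assigned ID
        # request = await queue.get_request(added.id)

        # v3.0: requests are looked up by their unique key
        request = await queue.get_request(added.unique_key)
        Actor.log.info(f'Fetched request: {request}')
```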
3 changes: 1 addition & 2 deletions src/apify/scrapy/requests.py
@@ -122,7 +122,7 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ

# Update the meta field with the meta field from the apify_request
meta = scrapy_request.meta or {}
meta.update({'apify_request_id': apify_request.id, 'apify_request_unique_key': apify_request.unique_key})
meta.update({'apify_request_unique_key': apify_request.unique_key})
# scrapy_request.meta is a property, so we have to set it like this
scrapy_request._meta = meta # noqa: SLF001

@@ -134,7 +134,6 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ
url=apify_request.url,
method=apify_request.method,
meta={
'apify_request_id': apify_request.id,
'apify_request_unique_key': apify_request.unique_key,
},
)
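With `apify_request_id` no longer propagated, downstream Scrapy code should rely solely on `apify_request_unique_key`. A hedged sketch of a spider callback reading the remaining meta key (the spider name and URL are illustrative):

```python
from scrapy import Spider
from scrapy.http import Response


class ExampleSpider(Spider):
    name = 'example'
    start_urls = ['https://example.com']

    def parse(self, response: Response, **kwargs: object) -> None:
        # Only the unique key survives the conversion now; 'apify_request_id' was removed.
        unique_key = response.meta.get('apify_request_unique_key')
        self.logger.info('Handling request with unique_key=%s', unique_key)
```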
4 changes: 2 additions & 2 deletions src/apify/storage_clients/_apify/_models.py
@@ -94,8 +94,8 @@ class CachedRequest(BaseModel):
Only internal structure.
"""

id: str
"""The ID of the request."""
unique_key: str
"""Unique key of the request."""

was_already_handled: bool
"""Whether the request was already handled."""
278 changes: 149 additions & 129 deletions src/apify/storage_clients/_apify/_request_queue_client.py

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion tests/integration/_utils.py
@@ -5,5 +5,13 @@

def generate_unique_resource_name(label: str) -> str:
"""Generates a unique resource name, which will contain the given label."""
name_template = 'python-sdk-tests-{}-generated-{}'
template_length = len(name_template.format('', ''))
api_name_limit = 63
generated_random_id_length = 8
label_length_limit = api_name_limit - template_length - generated_random_id_length

label = label.replace('_', '-')
return f'python-sdk-tests-{label}-generated-{crypto_random_object_id(8)}'
assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}'

return name_template.format(label, crypto_random_object_id(generated_random_id_length))
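For reference, the template contributes 28 characters and the random suffix 8, so a label may be at most 63 - 28 - 8 = 27 characters after underscores are replaced with dashes. A standalone sanity check of that budget (values restated here; the suffix is a placeholder):

```python
# Mirrors the length budget enforced in generate_unique_resource_name.
name_template = 'python-sdk-tests-{}-generated-{}'
template_length = len(name_template.format('', ''))  # 28
label_length_limit = 63 - template_length - 8  # 27

label = 'request_queue'.replace('_', '-')  # 'request-queue', 13 characters
assert len(label) <= label_length_limit
print(name_template.format(label, 'aB3xYz9Q'))  # 49-character name, within the 63-character limit
```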
16 changes: 15 additions & 1 deletion tests/integration/conftest.py
@@ -18,13 +18,15 @@

import apify._actor
from ._utils import generate_unique_resource_name
from apify import Actor
from apify._models import ActorRun

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator, Mapping
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Iterator, Mapping
from decimal import Decimal

from apify_client.clients.resource_clients import ActorClientAsync
from crawlee.storages import RequestQueue

_TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
_API_URL_ENV_VAR = 'APIFY_INTEGRATION_TESTS_API_URL'
@@ -109,6 +111,18 @@ def apify_client_async(apify_token: str) -> ApifyClientAsync:
return ApifyClientAsync(apify_token, api_url=api_url)


@pytest.fixture
async def request_queue_force_cloud(apify_token: str, monkeypatch: pytest.MonkeyPatch) -> AsyncGenerator[RequestQueue]:
"""Create an instance of the Apify request queue on the platform and drop it when the test is finished."""
request_queue_name = generate_unique_resource_name('request_queue')
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)

async with Actor:
rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
yield rq
await rq.drop()


@pytest.fixture(scope='session')
def sdk_wheel_path(tmp_path_factory: pytest.TempPathFactory, testrun_uid: str) -> Path:
"""Build the package wheel if it hasn't been built yet, and return the path to the wheel."""
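A minimal sketch of how a test can consume the new fixture (mirroring the tests added later in this PR; the specific assertions are illustrative):

```python
from crawlee import Request
from crawlee.storages import RequestQueue


async def test_uses_cloud_queue(request_queue_force_cloud: RequestQueue) -> None:
    # The fixture yields a named request queue on the Apify platform and drops it after the test.
    await request_queue_force_cloud.add_request(Request.from_url('http://example.com/1'))
    assert not await request_queue_force_cloud.is_finished()
```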
95 changes: 88 additions & 7 deletions tests/integration/test_actor_request_queue.py
@@ -11,6 +11,7 @@

from ._utils import generate_unique_resource_name
from apify import Actor, Request
from apify._models import ActorRun

if TYPE_CHECKING:
from collections.abc import AsyncGenerator
@@ -85,6 +86,7 @@ async def test_force_cloud(
) -> None:
request_queue_id = (await apify_named_rq.get_metadata()).id
request_info = await apify_named_rq.add_request(Request.from_url('http://example.com'))
assert request_info.id is not None
request_queue_client = apify_client_async.request_queue(request_queue_id)

request_queue_details = await request_queue_client.get()
@@ -99,18 +101,17 @@
async def test_request_queue_is_finished(
apify_named_rq: RequestQueue,
) -> None:
request_queue = await Actor.open_request_queue(name=apify_named_rq.name, force_cloud=True)
await request_queue.add_request(Request.from_url('http://example.com'))
assert not await request_queue.is_finished()
await apify_named_rq.add_request(Request.from_url('http://example.com'))
assert not await apify_named_rq.is_finished()

request = await request_queue.fetch_next_request()
request = await apify_named_rq.fetch_next_request()
assert request is not None
assert not await request_queue.is_finished(), (
assert not await apify_named_rq.is_finished(), (
'RequestQueue should not be finished unless the request is marked as handled.'
)

await request_queue.mark_request_as_handled(request)
assert await request_queue.is_finished()
await apify_named_rq.mark_request_as_handled(request)
assert await apify_named_rq.is_finished()


async def test_request_queue_deduplication(
@@ -317,3 +318,83 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic
Actor.log.info(stats_after)

assert (stats_after['writeCount'] - stats_before['writeCount']) == 1


async def test_request_queue_had_multiple_clients_platform(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
"""Test that `RequestQueue` clients created with different `client_key` appear as distinct clients."""

async def main() -> None:
from apify_client import ApifyClientAsync

async with Actor:
rq_1 = await Actor.open_request_queue()
await rq_1.fetch_next_request()

# Access with a client created explicitly with `client_key=None` should appear as a distinct client
api_client = ApifyClientAsync(token=Actor.configuration.token).request_queue(
request_queue_id=rq_1.id, client_key=None
)
await api_client.list_head()

assert (await rq_1.get_metadata()).had_multiple_clients is True

actor = await make_actor(label='rq-had-multiple-clients', main_func=main)
run_result = await run_actor(actor)

assert run_result.status == 'SUCCEEDED'


async def test_request_queue_not_had_multiple_clients_platform(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
"""Test that same `RequestQueue` created from Actor does not act as multiple clients."""

async def main() -> None:
async with Actor:
rq_1 = await Actor.open_request_queue()
# Two calls to the API to create a situation where an unset `client_key` could flip `had_multiple_clients` to True
await rq_1.fetch_next_request()
await rq_1.fetch_next_request()

assert (await rq_1.get_metadata()).had_multiple_clients is False

actor = await make_actor(label='rq-not-had-multiple-clients', main_func=main)
run_result = await run_actor(actor)

assert run_result.status == 'SUCCEEDED'


async def test_request_queue_not_had_multiple_clients_platform_resurrection(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
apify_client_async: ApifyClientAsync,
) -> None:
"""Test `RequestQueue` created from Actor does not act as multiple clients even after resurrection."""

async def main() -> None:
async with Actor:
rq_1 = await Actor.open_request_queue()
assert (await rq_1.get_metadata()).had_multiple_clients is False, 'Not accessed yet, should be False'

await rq_1.fetch_next_request()

assert (await rq_1.get_metadata()).had_multiple_clients is False, (
'Accessed with the same client, should be False'
)

actor = await make_actor(label='rq-clients-resurrection', main_func=main)
run_result = await run_actor(actor)
assert run_result.status == 'SUCCEEDED'

# Resurrect the run; the RequestQueue should still use the same client key and thus not have multiple clients.
run_client = apify_client_async.run(run_id=run_result.id)
# Redirect logs even from the resurrected run
streamed_log = await run_client.get_streamed_log(from_start=False)
await run_client.resurrect()
async with streamed_log:
run_result = ActorRun.model_validate(await run_client.wait_for_finish(wait_secs=600))
assert run_result.status == 'SUCCEEDED'
115 changes: 99 additions & 16 deletions tests/integration/test_request_queue.py
@@ -1,12 +1,18 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

import pytest

from crawlee import Request

from apify import Actor

if TYPE_CHECKING:
from apify_client import ApifyClientAsync
from crawlee.storages import RequestQueue

from .conftest import MakeActorFunction, RunActorFunction


@@ -399,38 +405,35 @@ async def main() -> None:
assert run_result.status == 'SUCCEEDED'


async def test_get_request_by_id(
async def test_get_request_by_unique_key(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
"""Test retrieving specific requests by their ID."""
"""Test retrieving specific requests by their unique_key."""

async def main() -> None:
async with Actor:
rq = await Actor.open_request_queue()
Actor.log.info('Request queue opened')

# Add a request and get its ID
# Add a request and get its unique_key
add_result = await rq.add_request('https://example.com/test')
request_id = add_result.id
Actor.log.info(f'Request added with ID: {request_id}')
request_unique_key = add_result.unique_key
Actor.log.info(f'Request added with unique_key: {request_unique_key}')

# Retrieve the request by ID
retrieved_request = await rq.get_request(request_id)
# Retrieve the request by unique_key
retrieved_request = await rq.get_request(request_unique_key)
assert retrieved_request is not None, f'retrieved_request={retrieved_request}'
assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}'
assert retrieved_request.id == request_id, (
f'retrieved_request.id={retrieved_request.id}',
f'request_id={request_id}',
)
Actor.log.info('Request retrieved successfully by ID')
assert retrieved_request.unique_key == request_unique_key, (f'{request_unique_key=}',)
Actor.log.info('Request retrieved successfully by unique_key')

# Test with non-existent ID
non_existent_request = await rq.get_request('non-existent-id')
# Test with non-existent unique_key
non_existent_request = await rq.get_request('non-existent-unique_key')
assert non_existent_request is None, f'non_existent_request={non_existent_request}'
Actor.log.info('Non-existent ID correctly returned None')
Actor.log.info('Non-existent unique_key correctly returned None')

actor = await make_actor(label='rq-get-by-id-test', main_func=main)
actor = await make_actor(label='rq-get-by-unique-key-test', main_func=main)
run_result = await run_actor(actor)
assert run_result.status == 'SUCCEEDED'

@@ -1195,3 +1198,83 @@ async def consumer() -> int:
actor = await make_actor(label='rq-performance-pattern-test', main_func=main)
run_result = await run_actor(actor)
assert run_result.status == 'SUCCEEDED'


async def test_request_queue_enhanced_metadata(
request_queue_force_cloud: RequestQueue,
apify_client_async: ApifyClientAsync,
) -> None:
"""Test metadata tracking.

Scenarios with multiple clients are not guaranteed to give correct results without a delay, but with multiple clients
and a single producer, the counts should be reliable on the producer side."""

for i in range(1, 10):
await request_queue_force_cloud.add_request(Request.from_url(f'http://example.com/{i}'))
# Reliable information as the API response is enhanced with local metadata estimation.
assert (await request_queue_force_cloud.get_metadata()).total_request_count == i

# Access with a client created explicitly with `client_key=None` should appear as a distinct client
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None)
await api_client.list_head()

# The presence of another non-producing client should not affect the metadata
for i in range(10, 20):
await request_queue_force_cloud.add_request(Request.from_url(f'http://example.com/{i}'))
# Reliable information as the API response is enhanced with local metadata estimation.
assert (await request_queue_force_cloud.get_metadata()).total_request_count == i


async def test_request_queue_metadata_another_client(
request_queue_force_cloud: RequestQueue,
apify_client_async: ApifyClientAsync,
) -> None:
"""Test metadata tracking. The delayed metadata should be reliable even when changed by another client."""
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None)
await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'}))

# Wait to be sure that the API has updated the global metadata
await asyncio.sleep(10)

assert (await request_queue_force_cloud.get_metadata()).total_request_count == 1


async def test_request_queue_had_multiple_clients(
request_queue_force_cloud: RequestQueue,
apify_client_async: ApifyClientAsync,
) -> None:
"""Test that `RequestQueue` correctly detects multiple clients.

Clients created with different `client_key` should appear as distinct clients."""
await request_queue_force_cloud.fetch_next_request()

# Access with a client created explicitly with `client_key=None` should appear as a distinct client
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id, client_key=None)
await api_client.list_head()

# Check that it is correctly in the RequestQueueClient metadata
assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is True

# Check that it is correctly in the API
api_response = await api_client.get()
assert api_response
assert api_response['hadMultipleClients'] is True


async def test_request_queue_not_had_multiple_clients(
request_queue_force_cloud: RequestQueue, apify_client_async: ApifyClientAsync
) -> None:
"""Test that same `RequestQueue` created from Actor does not act as multiple clients."""

# Two calls to the API to create a situation where differing `client_key` values could set `had_multiple_clients` to True
await request_queue_force_cloud.fetch_next_request()
await request_queue_force_cloud.fetch_next_request()

# Check that it is correctly in the RequestQueueClient metadata
assert (await request_queue_force_cloud.get_metadata()).had_multiple_clients is False

# Check that it is correctly in the API
api_client = apify_client_async.request_queue(request_queue_id=request_queue_force_cloud.id)
api_response = await api_client.get()
assert api_response
assert api_response['hadMultipleClients'] is False
2 changes: 0 additions & 2 deletions tests/unit/scrapy/requests/test_to_apify_request.py
@@ -66,7 +66,6 @@ def test_with_id_and_unique_key(spider: Spider) -> None:
url='https://example.com',
method='GET',
meta={
'apify_request_id': 'abc123',
'apify_request_unique_key': 'https://example.com',
'userData': {'some_user_data': 'hello'},
},
@@ -77,7 +76,6 @@ def test_with_id_and_unique_key(spider: Spider) -> None:

assert apify_request.url == 'https://example.com'
assert apify_request.method == 'GET'
assert apify_request.id == 'abc123'
assert apify_request.unique_key == 'https://example.com'

user_data = apify_request.user_data