Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/04-upgrading/upgrading_to_v20.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ Attributes suffixed with `_millis` were renamed to remove said suffix and have t

## Actor

The `Actor.main` method has been removed as it brings no benefits compared to using `async with Actor`.
- The `Actor.main` method has been removed as it brings no benefits compared to using `async with Actor`.
- The `Actor.add_webhook`, `Actor.start`, `Actor.call` and `Actor.start_task` methods now accept instances of the `apify.Webhook` model instead of an untyped `dict`.
- `Actor.start`, `Actor.call`, `Actor.start_task`, `Actor.set_status_message` and `Actor.abort` return instances of the `ActorRun` model instead of an untyped `dict`.

## Scrapy integration

Expand Down
13 changes: 12 additions & 1 deletion src/apify/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
from importlib import metadata

from apify_shared.consts import WebhookEventType
from crawlee.events._types import Event

from apify._actor import Actor
from apify._configuration import Configuration
from apify._models import Webhook
from apify._proxy_configuration import ProxyConfiguration, ProxyInfo

__version__ = metadata.version('apify')

__all__ = ['Actor', 'Event', 'Configuration', 'ProxyConfiguration', 'ProxyInfo', '__version__']
__all__ = [
'Actor',
'Event',
'Configuration',
'ProxyConfiguration',
'ProxyInfo',
'Webhook',
'WebhookEventType',
'__version__',
]
117 changes: 66 additions & 51 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
from typing_extensions import Self

from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars, WebhookEventType
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee.events._types import Event, EventPersistStateData

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
from apify._crypto import decrypt_input_secrets, load_private_key
from apify._models import ActorRun
from apify._platform_event_manager import EventManager, LocalEventManager, PlatformEventManager
from apify._proxy_configuration import ProxyConfiguration
from apify._utils import get_system_info, is_running_in_ipython
Expand All @@ -32,6 +33,8 @@

from crawlee.proxy_configuration import _NewUrlFunction

from apify._models import Webhook


MainReturnType = TypeVar('MainReturnType')

Expand Down Expand Up @@ -533,8 +536,8 @@ async def start(
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
wait_for_finish: int | None = None,
webhooks: list[dict] | None = None,
) -> dict:
webhooks: list[Webhook] | None = None,
) -> ActorRun:
"""Run an Actor on the Apify platform.

Unlike `Actor.call`, this method just starts the run without waiting for finish.
Expand All @@ -555,10 +558,6 @@ async def start(
webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with
the Actor run which can be used to receive a notification, e.g. when the Actor finished or failed.
If you already have a webhook set up for the Actor or task, you do not have to add it again here.
Each webhook is represented by a dictionary containing these items:
* `event_types`: list of `WebhookEventType` values which trigger the webhook
* `request_url`: URL to which to send the webhook HTTP request
* `payload_template` (optional): Optional template for the request payload

Returns:
Info about the started Actor run
Expand All @@ -567,14 +566,20 @@ async def start(

client = self.new_client(token=token) if token else self._apify_client

return await client.actor(actor_id).start(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
wait_for_finish=wait_for_finish,
webhooks=webhooks,
return ActorRun.model_validate(
await client.actor(actor_id).start(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
wait_for_finish=wait_for_finish,
webhooks=[
hook.model_dump(by_alias=True, exclude_unset=True, exclude_defaults=True) for hook in webhooks
]
if webhooks
else None,
)
)

async def abort(
Expand All @@ -584,7 +589,7 @@ async def abort(
token: str | None = None,
status_message: str | None = None,
gracefully: bool | None = None,
) -> dict:
) -> ActorRun:
"""Abort given Actor run on the Apify platform using the current user account.

The user account is determined by the `APIFY_TOKEN` environment variable.
Expand All @@ -607,7 +612,7 @@ async def abort(
if status_message:
await client.run(run_id).update(status_message=status_message)

return await client.run(run_id).abort(gracefully=gracefully)
return ActorRun.model_validate(await client.run(run_id).abort(gracefully=gracefully))

async def call(
self,
Expand All @@ -619,9 +624,9 @@ async def call(
build: str | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[Webhook] | None = None,
wait: timedelta | None = None,
) -> dict | None:
) -> ActorRun | None:
"""Start an Actor on the Apify Platform and wait for it to finish before returning.

It waits indefinitely, unless the wait argument is provided.
Expand Down Expand Up @@ -650,14 +655,20 @@ async def call(

client = self.new_client(token=token) if token else self._apify_client

return await client.actor(actor_id).call(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
webhooks=webhooks,
wait_secs=int(wait.total_seconds()) if wait is not None else None,
return ActorRun.model_validate(
await client.actor(actor_id).call(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
webhooks=[
hook.model_dump(by_alias=True, exclude_defaults=True, exclude_unset=True) for hook in webhooks
]
if webhooks
else [],
wait_secs=int(wait.total_seconds()) if wait is not None else None,
)
)

async def call_task(
Expand All @@ -668,10 +679,10 @@ async def call_task(
build: str | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[Webhook] | None = None,
wait: timedelta | None = None,
token: str | None = None,
) -> dict | None:
) -> ActorRun | None:
"""Start an Actor task on the Apify Platform and wait for it to finish before returning.

It waits indefinitely, unless the wait argument is provided.
Expand Down Expand Up @@ -703,13 +714,19 @@ async def call_task(

client = self.new_client(token=token) if token else self._apify_client

return await client.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
webhooks=webhooks,
wait_secs=int(wait.total_seconds()) if wait is not None else None,
return ActorRun.model_validate(
await client.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
webhooks=[
hook.model_dump(by_alias=True, exclude_defaults=True, exclude_unset=True) for hook in webhooks
]
if webhooks
else [],
wait_secs=int(wait.total_seconds()) if wait is not None else None,
)
)

async def metamorph(
Expand Down Expand Up @@ -796,14 +813,12 @@ async def reboot(

async def add_webhook(
self,
webhook: Webhook,
*,
event_types: list[WebhookEventType],
request_url: str,
payload_template: str | None = None,
ignore_ssl_errors: bool | None = None,
do_not_retry: bool | None = None,
idempotency_key: str | None = None,
) -> dict | None:
) -> None:
"""Create an ad-hoc webhook for the current Actor run.

This webhook lets you receive a notification when the Actor run finished or failed.
Expand All @@ -814,9 +829,7 @@ async def add_webhook(
For more information about Apify Actor webhooks, please see the [documentation](https://docs.apify.com/webhooks).

Args:
event_types: List of event types that should trigger the webhook. At least one is required.
request_url: URL that will be invoked once the webhook is triggered.
payload_template: Specification of the payload that will be sent to request_url
webhook: The webhook to be added
ignore_ssl_errors: Whether the webhook should ignore SSL errors returned by request_url
do_not_retry: Whether the webhook should retry sending the payload to request_url upon failure.
idempotency_key: A unique identifier of a webhook. You can use it to ensure that you won't create
Expand All @@ -829,17 +842,17 @@ async def add_webhook(

if not self.is_at_home():
self.log.error('Actor.add_webhook() is only supported when running on the Apify platform.')
return None
return

# If is_at_home() is True, config.actor_run_id is always set
if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

return await self._apify_client.webhooks().create(
await self._apify_client.webhooks().create(
actor_run_id=self._configuration.actor_run_id,
event_types=event_types,
request_url=request_url,
payload_template=payload_template,
event_types=webhook.event_types,
request_url=webhook.request_url,
payload_template=webhook.payload_template,
ignore_ssl_errors=ignore_ssl_errors,
do_not_retry=do_not_retry,
idempotency_key=idempotency_key,
Expand All @@ -850,7 +863,7 @@ async def set_status_message(
status_message: str,
*,
is_terminal: bool | None = None,
) -> dict | None:
) -> ActorRun | None:
"""Set the status message for the current Actor run.

Args:
Expand All @@ -871,8 +884,10 @@ async def set_status_message(
if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

return await self._apify_client.run(self._configuration.actor_run_id).update(
status_message=status_message, is_status_message_terminal=is_terminal
return ActorRun.model_validate(
await self._apify_client.run(self._configuration.actor_run_id).update(
status_message=status_message, is_status_message_terminal=is_terminal
)
)

async def create_proxy_configuration(
Expand Down
108 changes: 108 additions & 0 deletions src/apify/_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# ruff: noqa: TCH001 TCH002 TCH003 (Pydantic)
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Annotated

from pydantic import BaseModel, BeforeValidator, ConfigDict, Field

from apify_shared.consts import ActorJobStatus, MetaOrigin, WebhookEventType
from crawlee._utils.models import timedelta_ms
from crawlee._utils.urls import validate_http_url


class Webhook(BaseModel):
__model_config__ = ConfigDict(populate_by_name=True)

event_types: Annotated[
list[WebhookEventType],
Field(description='Event types that should trigger the webhook'),
]
request_url: Annotated[
str,
Field(description='URL that the webhook should call'),
BeforeValidator(validate_http_url),
]
payload_template: Annotated[
str | None,
Field(description='Template for the payload sent by the webook'),
] = None


class ActorRunMeta(BaseModel):
__model_config__ = ConfigDict(populate_by_name=True)

origin: Annotated[MetaOrigin, Field()]


class ActorRunStats(BaseModel):
__model_config__ = ConfigDict(populate_by_name=True)

input_body_len: Annotated[int, Field(alias='inputBodyLen')]
restart_count: Annotated[int, Field(alias='restartCount')]
resurrect_count: Annotated[int, Field(alias='resurrectCount')]
mem_avg_bytes: Annotated[float | None, Field(alias='memAvgBytes')] = None
mem_max_bytes: Annotated[int | None, Field(alias='memMaxBytes')] = None
mem_current_bytes: Annotated[int | None, Field(alias='memCurrentBytes')] = None
cpu_avg_usage: Annotated[float | None, Field(alias='cpuAvgUsage')] = None
cpu_max_usage: Annotated[float | None, Field(alias='cpuMaxUsage')] = None
cpu_current_usage: Annotated[float | None, Field(alias='cpuCurrentUsage')] = None
net_rx_bytes: Annotated[int | None, Field(alias='netRxBytes')] = None
net_tx_bytes: Annotated[int | None, Field(alias='netTxBytes')] = None
duration: Annotated[timedelta_ms | None, Field(alias='durationMillis')] = None
run_time: Annotated[timedelta | None, Field(alias='runTimeSecs')] = None
metamorph: Annotated[int | None, Field(alias='metamorph')] = None
compute_units: Annotated[float, Field(alias='computeUnits')]


class ActorRunOptions(BaseModel):
__model_config__ = ConfigDict(populate_by_name=True)
build: str
timeout: Annotated[timedelta, Field(alias='timeoutSecs')]
memory_mbytes: Annotated[int, Field(alias='memoryMbytes')]
disk_mbytes: Annotated[int, Field(alias='diskMbytes')]


class ActorRunUsage(BaseModel):
__model_config__ = ConfigDict(populate_by_name=True)
actor_compute_units: Annotated[float | None, Field(alias='ACTOR_COMPUTE_UNITS')] = None
dataset_reads: Annotated[float | None, Field(alias='DATASET_READS')] = None
dataset_writes: Annotated[float | None, Field(alias='DATASET_WRITES')] = None
key_value_store_reads: Annotated[float | None, Field(alias='KEY_VALUE_STORE_READS')] = None
key_value_store_writes: Annotated[float | None, Field(alias='KEY_VALUE_STORE_WRITES')] = None
key_value_store_lists: Annotated[float | None, Field(alias='KEY_VALUE_STORE_LISTS')] = None
request_queue_reads: Annotated[float | None, Field(alias='REQUEST_QUEUE_READS')] = None
request_queue_writes: Annotated[float | None, Field(alias='REQUEST_QUEUE_WRITES')] = None
data_transfer_internal_gbytes: Annotated[float | None, Field(alias='DATA_TRANSFER_INTERNAL_GBYTES')] = None
data_transfer_external_gbytes: Annotated[float | None, Field(alias='DATA_TRANSFER_EXTERNAL_GBYTES')] = None
proxy_residential_transfer_gbytes: Annotated[float | None, Field(alias='PROXY_RESIDENTIAL_TRANSFER_GBYTES')] = None
proxy_serps: Annotated[float | None, Field(alias='PROXY_SERPS')] = None


class ActorRun(BaseModel):
__model_config__ = ConfigDict(populate_by_name=True)

id: Annotated[str, Field(alias='id')]
act_id: Annotated[str, Field(alias='actId')]
user_id: Annotated[str, Field(alias='userId')]
actor_task_id: Annotated[str | None, Field(alias='actorTaskId')] = None
started_at: Annotated[datetime, Field(alias='startedAt')]
finished_at: Annotated[datetime | None, Field(alias='finishedAt')] = None
status: Annotated[ActorJobStatus, Field(alias='status')]
status_message: Annotated[str | None, Field(alias='statusMessage')] = None
is_status_message_terminal: Annotated[bool | None, Field(alias='isStatusMessageTerminal')] = None
meta: Annotated[ActorRunMeta, Field(alias='meta')]
stats: Annotated[ActorRunStats, Field(alias='stats')]
options: Annotated[ActorRunOptions, Field(alias='options')]
build_id: Annotated[str, Field(alias='buildId')]
exit_code: Annotated[int | None, Field(alias='exitCode')] = None
default_key_value_store_id: Annotated[str, Field(alias='defaultKeyValueStoreId')]
default_dataset_id: Annotated[str, Field(alias='defaultDatasetId')]
default_request_queue_id: Annotated[str, Field(alias='defaultRequestQueueId')]
build_number: Annotated[str | None, Field(alias='buildNumber')] = None
container_url: Annotated[str, Field(alias='containerUrl')]
is_container_server_ready: Annotated[bool | None, Field(alias='isContainerServerReady')] = None
git_branch_name: Annotated[str | None, Field(alias='gitBranchName')] = None
usage: Annotated[ActorRunUsage | None, Field(alias='usage')] = None
usage_total_usd: Annotated[float | None, Field(alias='usageTotalUsd')] = None
usage_usd: Annotated[ActorRunUsage | None, Field(alias='usageUsd')] = None
Loading