Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/04-upgrading/upgrading_to_v20.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ Attributes suffixed with `_millis` were renamed to remove said suffix and have t

## Actor

The `Actor.main` method has been removed as it brings no benefits compared to using `async with Actor`.
- The `Actor.main` method has been removed as it brings no benefits compared to using `async with Actor`.
- The `Actor.add_webhook`, `Actor.start`, `Actor.call` and `Actor.call_task` methods now accept instances of the `apify.Webhook` model instead of an untyped `dict`.
- The `Actor.start`, `Actor.call`, `Actor.call_task`, `Actor.set_status_message` and `Actor.abort` methods now return instances of the `ActorRun` model instead of an untyped `dict`.

## Scrapy integration

Expand Down
13 changes: 12 additions & 1 deletion src/apify/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
from importlib import metadata

from apify_shared.consts import WebhookEventType
from crawlee.events._types import Event

from apify._actor import Actor
from apify._configuration import Configuration
from apify._models import Webhook
from apify._proxy_configuration import ProxyConfiguration, ProxyInfo

__version__ = metadata.version('apify')

__all__ = ['Actor', 'Event', 'Configuration', 'ProxyConfiguration', 'ProxyInfo', '__version__']
__all__ = [
'Actor',
'Event',
'Configuration',
'ProxyConfiguration',
'ProxyInfo',
'Webhook',
'WebhookEventType',
'__version__',
]
92 changes: 59 additions & 33 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
from typing_extensions import Self

from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars, WebhookEventType
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee.events._types import Event, EventPersistStateData

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
from apify._crypto import decrypt_input_secrets, load_private_key
from apify._models import ActorRun
from apify._platform_event_manager import EventManager, LocalEventManager, PlatformEventManager
from apify._proxy_configuration import ProxyConfiguration
from apify._utils import get_system_info, is_running_in_ipython
Expand All @@ -32,6 +33,8 @@

from crawlee.proxy_configuration import _NewUrlFunction

from apify._models import Webhook


MainReturnType = TypeVar('MainReturnType')

Expand Down Expand Up @@ -533,8 +536,8 @@ async def start(
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
wait_for_finish: int | None = None,
webhooks: list[dict] | None = None,
) -> dict:
webhooks: list[Webhook] | None = None,
) -> ActorRun:
"""Run an Actor on the Apify platform.

Unlike `Actor.call`, this method just starts the run without waiting for finish.
Expand All @@ -555,10 +558,6 @@ async def start(
webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with
the Actor run which can be used to receive a notification, e.g. when the Actor finished or failed.
If you already have a webhook set up for the Actor or task, you do not have to add it again here.
Each webhook is represented by a dictionary containing these items:
* `event_types`: list of `WebhookEventType` values which trigger the webhook
* `request_url`: URL to which to send the webhook HTTP request
* `payload_template` (optional): Optional template for the request payload

Returns:
Info about the started Actor run
Expand All @@ -567,24 +566,33 @@ async def start(

client = self.new_client(token=token) if token else self._apify_client

return await client.actor(actor_id).start(
if webhooks:
serialized_webhooks = [
hook.model_dump(by_alias=True, exclude_unset=True, exclude_defaults=True) for hook in webhooks
]
else:
serialized_webhooks = None

api_result = await client.actor(actor_id).start(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
wait_for_finish=wait_for_finish,
webhooks=webhooks,
webhooks=serialized_webhooks,
)

return ActorRun.model_validate(api_result)

async def abort(
self,
run_id: str,
*,
token: str | None = None,
status_message: str | None = None,
gracefully: bool | None = None,
) -> dict:
) -> ActorRun:
"""Abort given Actor run on the Apify platform using the current user account.

The user account is determined by the `APIFY_TOKEN` environment variable.
Expand All @@ -607,7 +615,9 @@ async def abort(
if status_message:
await client.run(run_id).update(status_message=status_message)

return await client.run(run_id).abort(gracefully=gracefully)
api_result = await client.run(run_id).abort(gracefully=gracefully)

return ActorRun.model_validate(api_result)

async def call(
self,
Expand All @@ -619,9 +629,9 @@ async def call(
build: str | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[Webhook] | None = None,
wait: timedelta | None = None,
) -> dict | None:
) -> ActorRun | None:
"""Start an Actor on the Apify Platform and wait for it to finish before returning.

It waits indefinitely, unless the wait argument is provided.
Expand Down Expand Up @@ -650,16 +660,25 @@ async def call(

client = self.new_client(token=token) if token else self._apify_client

return await client.actor(actor_id).call(
if webhooks:
serialized_webhooks = [
hook.model_dump(by_alias=True, exclude_unset=True, exclude_defaults=True) for hook in webhooks
]
else:
serialized_webhooks = None

api_result = await client.actor(actor_id).call(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
webhooks=webhooks,
webhooks=serialized_webhooks,
wait_secs=int(wait.total_seconds()) if wait is not None else None,
)

return ActorRun.model_validate(api_result)

async def call_task(
self,
task_id: str,
Expand All @@ -668,10 +687,10 @@ async def call_task(
build: str | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[Webhook] | None = None,
wait: timedelta | None = None,
token: str | None = None,
) -> dict | None:
) -> ActorRun | None:
"""Start an Actor task on the Apify Platform and wait for it to finish before returning.

It waits indefinitely, unless the wait argument is provided.
Expand Down Expand Up @@ -703,15 +722,24 @@ async def call_task(

client = self.new_client(token=token) if token else self._apify_client

return await client.task(task_id).call(
if webhooks:
serialized_webhooks = [
hook.model_dump(by_alias=True, exclude_unset=True, exclude_defaults=True) for hook in webhooks
]
else:
serialized_webhooks = None

api_result = await client.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
webhooks=webhooks,
webhooks=serialized_webhooks,
wait_secs=int(wait.total_seconds()) if wait is not None else None,
)

return ActorRun.model_validate(api_result)

async def metamorph(
self,
target_actor_id: str,
Expand Down Expand Up @@ -796,14 +824,12 @@ async def reboot(

async def add_webhook(
self,
webhook: Webhook,
*,
event_types: list[WebhookEventType],
request_url: str,
payload_template: str | None = None,
ignore_ssl_errors: bool | None = None,
do_not_retry: bool | None = None,
idempotency_key: str | None = None,
) -> dict | None:
) -> None:
"""Create an ad-hoc webhook for the current Actor run.

This webhook lets you receive a notification when the Actor run finished or failed.
Expand All @@ -814,9 +840,7 @@ async def add_webhook(
For more information about Apify Actor webhooks, please see the [documentation](https://docs.apify.com/webhooks).

Args:
event_types: List of event types that should trigger the webhook. At least one is required.
request_url: URL that will be invoked once the webhook is triggered.
payload_template: Specification of the payload that will be sent to request_url
webhook: The webhook to be added
ignore_ssl_errors: Whether the webhook should ignore SSL errors returned by request_url
do_not_retry: Whether the webhook should retry sending the payload to request_url upon failure.
idempotency_key: A unique identifier of a webhook. You can use it to ensure that you won't create
Expand All @@ -829,17 +853,17 @@ async def add_webhook(

if not self.is_at_home():
self.log.error('Actor.add_webhook() is only supported when running on the Apify platform.')
return None
return

# If is_at_home() is True, config.actor_run_id is always set
if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

return await self._apify_client.webhooks().create(
await self._apify_client.webhooks().create(
actor_run_id=self._configuration.actor_run_id,
event_types=event_types,
request_url=request_url,
payload_template=payload_template,
event_types=webhook.event_types,
request_url=webhook.request_url,
payload_template=webhook.payload_template,
ignore_ssl_errors=ignore_ssl_errors,
do_not_retry=do_not_retry,
idempotency_key=idempotency_key,
Expand All @@ -850,7 +874,7 @@ async def set_status_message(
status_message: str,
*,
is_terminal: bool | None = None,
) -> dict | None:
) -> ActorRun | None:
"""Set the status message for the current Actor run.

Args:
Expand All @@ -871,10 +895,12 @@ async def set_status_message(
if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')

return await self._apify_client.run(self._configuration.actor_run_id).update(
api_result = await self._apify_client.run(self._configuration.actor_run_id).update(
status_message=status_message, is_status_message_terminal=is_terminal
)

return ActorRun.model_validate(api_result)

async def create_proxy_configuration(
self,
*,
Expand Down
110 changes: 110 additions & 0 deletions src/apify/_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# ruff: noqa: TCH001 TCH002 TCH003 (Pydantic)
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Annotated

from pydantic import BaseModel, BeforeValidator, ConfigDict, Field

from apify_shared.consts import ActorJobStatus, MetaOrigin, WebhookEventType
from crawlee._utils.models import timedelta_ms
from crawlee._utils.urls import validate_http_url


class Webhook(BaseModel):
    """Description of an ad-hoc webhook attached to an Actor run.

    Holds the triggering event types, the target URL and an optional payload template.
    """

    # Pydantic v2 only honours the attribute named `model_config`; a dunder-wrapped
    # name is treated as an ordinary class attribute and the settings are ignored.
    model_config = ConfigDict(populate_by_name=True)

    # Events that cause the webhook to fire.
    event_types: Annotated[
        list[WebhookEventType],
        Field(description='Event types that should trigger the webhook'),
    ]
    # Target URL; passed through `validate_http_url` before standard validation.
    request_url: Annotated[
        str,
        Field(description='URL that the webhook should call'),
        BeforeValidator(validate_http_url),
    ]
    # Optional payload template; omitted when left as None.
    # NOTE(review): the description below contains the typo "webook"; it is kept as-is
    # because the text is emitted in the generated schema.
    payload_template: Annotated[
        str | None,
        Field(description='Template for the payload sent by the webook'),
    ] = None


class ActorRunMeta(BaseModel):
    """Metadata section of an Actor run (the `meta` field of `ActorRun`)."""

    # Pydantic v2 reads its configuration from `model_config`; the previous
    # dunder-wrapped attribute name was silently ignored.
    model_config = ConfigDict(populate_by_name=True)

    # Origin of the run, expressed as a `MetaOrigin` enum member.
    origin: Annotated[MetaOrigin, Field()]


class ActorRunStats(BaseModel):
    """Statistics section of an Actor run (the `stats` field of `ActorRun`).

    Fields are populated from camelCase keys via aliases. Thanks to `populate_by_name`,
    they can also be supplied using the snake_case attribute names.
    """

    # Pydantic v2 reads its configuration from `model_config`; with the previous
    # dunder-wrapped name `populate_by_name` never took effect.
    model_config = ConfigDict(populate_by_name=True)

    # Counters that are always required.
    input_body_len: Annotated[int, Field(alias='inputBodyLen')]
    restart_count: Annotated[int, Field(alias='restartCount')]
    resurrect_count: Annotated[int, Field(alias='resurrectCount')]

    # Memory figures; optional.
    mem_avg_bytes: Annotated[float | None, Field(alias='memAvgBytes')] = None
    mem_max_bytes: Annotated[int | None, Field(alias='memMaxBytes')] = None
    mem_current_bytes: Annotated[int | None, Field(alias='memCurrentBytes')] = None

    # CPU figures; optional.
    cpu_avg_usage: Annotated[float | None, Field(alias='cpuAvgUsage')] = None
    cpu_max_usage: Annotated[float | None, Field(alias='cpuMaxUsage')] = None
    cpu_current_usage: Annotated[float | None, Field(alias='cpuCurrentUsage')] = None

    # Network figures; optional.
    net_rx_bytes: Annotated[int | None, Field(alias='netRxBytes')] = None
    net_tx_bytes: Annotated[int | None, Field(alias='netTxBytes')] = None

    # Durations: `durationMillis` goes through crawlee's `timedelta_ms` type, while
    # `runTimeSecs` relies on pydantic's numeric-to-timedelta coercion (seconds).
    duration: Annotated[timedelta_ms | None, Field(alias='durationMillis')] = None
    run_time: Annotated[timedelta | None, Field(alias='runTimeSecs')] = None

    metamorph: Annotated[int | None, Field(alias='metamorph')] = None
    compute_units: Annotated[float, Field(alias='computeUnits')]


class ActorRunOptions(BaseModel):
    """Options section of an Actor run (the `options` field of `ActorRun`)."""

    # Pydantic v2 reads its configuration from `model_config`; with the previous
    # dunder-wrapped name `populate_by_name` never took effect.
    model_config = ConfigDict(populate_by_name=True)

    build: str
    # Populated from `timeoutSecs`; pydantic coerces a bare number to a timedelta in seconds.
    timeout: Annotated[timedelta, Field(alias='timeoutSecs')]
    memory_mbytes: Annotated[int, Field(alias='memoryMbytes')]
    disk_mbytes: Annotated[int, Field(alias='diskMbytes')]


class ActorRunUsage(BaseModel):
    """Per-resource usage breakdown of an Actor run.

    Used for both the `usage` and `usage_usd` fields of `ActorRun`. Every entry is optional
    and is populated from an UPPER_SNAKE_CASE key via its alias.
    """

    # Pydantic v2 reads its configuration from `model_config`; with the previous
    # dunder-wrapped name `populate_by_name` never took effect.
    model_config = ConfigDict(populate_by_name=True)

    actor_compute_units: Annotated[float | None, Field(alias='ACTOR_COMPUTE_UNITS')] = None

    # Storage operations.
    dataset_reads: Annotated[float | None, Field(alias='DATASET_READS')] = None
    dataset_writes: Annotated[float | None, Field(alias='DATASET_WRITES')] = None
    key_value_store_reads: Annotated[float | None, Field(alias='KEY_VALUE_STORE_READS')] = None
    key_value_store_writes: Annotated[float | None, Field(alias='KEY_VALUE_STORE_WRITES')] = None
    key_value_store_lists: Annotated[float | None, Field(alias='KEY_VALUE_STORE_LISTS')] = None
    request_queue_reads: Annotated[float | None, Field(alias='REQUEST_QUEUE_READS')] = None
    request_queue_writes: Annotated[float | None, Field(alias='REQUEST_QUEUE_WRITES')] = None

    # Data transfer and proxy usage.
    data_transfer_internal_gbytes: Annotated[float | None, Field(alias='DATA_TRANSFER_INTERNAL_GBYTES')] = None
    data_transfer_external_gbytes: Annotated[float | None, Field(alias='DATA_TRANSFER_EXTERNAL_GBYTES')] = None
    proxy_residential_transfer_gbytes: Annotated[float | None, Field(alias='PROXY_RESIDENTIAL_TRANSFER_GBYTES')] = None
    proxy_serps: Annotated[float | None, Field(alias='PROXY_SERPS')] = None


class ActorRun(BaseModel):
    """Typed representation of an Actor run.

    Fields are populated from camelCase keys via aliases. Thanks to `populate_by_name`,
    they can also be supplied using the snake_case attribute names.
    """

    # Pydantic v2 reads its configuration from `model_config`; with the previous
    # dunder-wrapped name `populate_by_name` never took effect.
    model_config = ConfigDict(populate_by_name=True)

    # Identity of the run and its owners.
    id: Annotated[str, Field(alias='id')]
    act_id: Annotated[str, Field(alias='actId')]
    user_id: Annotated[str, Field(alias='userId')]
    actor_task_id: Annotated[str | None, Field(alias='actorTaskId')] = None

    # Lifecycle and status.
    started_at: Annotated[datetime, Field(alias='startedAt')]
    finished_at: Annotated[datetime | None, Field(alias='finishedAt')] = None
    status: Annotated[ActorJobStatus, Field(alias='status')]
    status_message: Annotated[str | None, Field(alias='statusMessage')] = None
    is_status_message_terminal: Annotated[bool | None, Field(alias='isStatusMessageTerminal')] = None

    # Nested sections.
    meta: Annotated[ActorRunMeta, Field(alias='meta')]
    stats: Annotated[ActorRunStats, Field(alias='stats')]
    options: Annotated[ActorRunOptions, Field(alias='options')]

    build_id: Annotated[str, Field(alias='buildId')]
    exit_code: Annotated[int | None, Field(alias='exitCode')] = None

    # Default storages associated with the run.
    default_key_value_store_id: Annotated[str, Field(alias='defaultKeyValueStoreId')]
    default_dataset_id: Annotated[str, Field(alias='defaultDatasetId')]
    default_request_queue_id: Annotated[str, Field(alias='defaultRequestQueueId')]

    build_number: Annotated[str | None, Field(alias='buildNumber')] = None
    container_url: Annotated[str, Field(alias='containerUrl')]
    is_container_server_ready: Annotated[bool | None, Field(alias='isContainerServerReady')] = None
    git_branch_name: Annotated[str | None, Field(alias='gitBranchName')] = None

    # Usage breakdowns; `usage` and `usage_usd` share the `ActorRunUsage` shape.
    usage: Annotated[ActorRunUsage | None, Field(alias='usage')] = None
    usage_total_usd: Annotated[float | None, Field(alias='usageTotalUsd')] = None
    usage_usd: Annotated[ActorRunUsage | None, Field(alias='usageUsd')] = None
Loading