Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
558 changes: 261 additions & 297 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ keywords = [

[tool.poetry.dependencies]
python = "^3.9"
apify-client = ">=1.9.1"
apify-client = ">=1.9.2"
apify-shared = ">=1.2.1"
crawlee = "~0.5.1"
cryptography = ">=42.0.0"
Expand Down
53 changes: 50 additions & 3 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
EventSystemInfoData,
)

from apify._charging import ChargeResult, ChargingManager
from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
from apify._crypto import decrypt_input_secrets, load_private_key
Expand Down Expand Up @@ -97,6 +98,8 @@ def __init__(
)
)

self._charging_manager = ChargingManager(self._configuration, self._apify_client)

self._is_initialized = False

@ignore_docs
Expand Down Expand Up @@ -227,6 +230,10 @@ async def init(self) -> None:
# https://github.com/apify/apify-sdk-python/issues/146

await self._event_manager.__aenter__()
self.log.debug('Event manager initialized')

await self._charging_manager.init()
self.log.debug('Charging manager initialized')

self._is_initialized = True
_ActorType._is_any_instance_initialized = True
Expand Down Expand Up @@ -452,19 +459,46 @@ async def open_request_queue(
storage_client=storage_client,
)

async def push_data(self, data: dict | list[dict]) -> None:
@overload
async def push_data(self, data: dict | list[dict]) -> None: ...
@overload
async def push_data(self, data: dict | list[dict], charged_event_name: str) -> ChargeResult: ...
async def push_data(self, data: dict | list[dict], charged_event_name: str | None = None) -> ChargeResult | None:
"""Store an object or a list of objects to the default dataset of the current Actor run.

Args:
data: The data to push to the default dataset.
charged_event_name: If provided and if the Actor uses the pay-per-event pricing model,
the method will attempt to charge for the event for each pushed item.
"""
self._raise_if_not_initialized()

if not data:
return
return None

data = data if isinstance(data, list) else [data]

max_charged_count = (
self._charging_manager.calculate_max_event_charge_count_within_limit(charged_event_name)
if charged_event_name is not None
else None
)

dataset = await self.open_dataset()
await dataset.push_data(data)

if max_charged_count is not None and len(data) > max_charged_count:
# Push as many items as we can charge for
await dataset.push_data(data[:max_charged_count])
else:
await dataset.push_data(data)

if charged_event_name:
return await self._charging_manager.charge(
event_name=charged_event_name,
count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data),
)

return None

async def get_input(self) -> Any:
"""Get the Actor input value from the default key-value store associated with the current Actor run."""
Expand Down Expand Up @@ -513,6 +547,19 @@ async def set_value(
key_value_store = await self.open_key_value_store()
return await key_value_store.set_value(key, value, content_type=content_type)

def get_charging_manager(self) -> ChargingManager:
"""Retrieve the charging manager to access granular pricing information."""
self._raise_if_not_initialized()
return self._charging_manager

async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
"""Charge for a specified number of events - sub-operations of the Actor.

This is relevant only for the pay-per-event pricing model.
"""
self._raise_if_not_initialized()
return await self._charging_manager.charge(event_name, count)

@overload
def on(
self, event_name: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]
Expand Down
286 changes: 286 additions & 0 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
from __future__ import annotations

import math
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import TYPE_CHECKING, Union

from pydantic import TypeAdapter

from apify._models import ActorRun, PricingModel
from apify._utils import docs_group
from apify.log import logger
from apify.storages import Dataset

if TYPE_CHECKING:
from apify_client import ApifyClientAsync

from apify._configuration import Configuration


run_validator: TypeAdapter[ActorRun | None] = TypeAdapter(Union[ActorRun, None])


@docs_group('Classes')
class ChargingManager:
LOCAL_CHARGING_LOG_DATASET_NAME = 'charging_log'

def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None:
self._max_total_charge_usd = configuration.max_total_charge_usd or Decimal('inf')
self._is_at_home = configuration.is_at_home
self._actor_run_id = configuration.actor_run_id
self._purge_charging_log_dataset = configuration.purge_on_start
self._pricing_model: PricingModel | None = None

if configuration.test_pay_per_event:
if self._is_at_home:
raise ValueError(
'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
'in a local development environment'
)

self._pricing_model = 'PAY_PER_EVENT'

self._client = client
self._charging_log_dataset: Dataset | None = None

self._charging_state: dict[str, ChargingStateItem] | None = None
self._pricing_info: dict[str, PricingInfoItem] = {}

self._not_ppe_warning_printed = False

async def init(self) -> None:
"""Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually."""
self._charging_state = {}

if self._is_at_home:
# Running on the Apify platform - fetch pricing info for the current run.

if self._actor_run_id is None:
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')

run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())
if run is None:
raise RuntimeError('Actor run not found')

if run.pricing_info is not None:
self._pricing_model = run.pricing_info.pricing_model

if run.pricing_info.pricing_model == 'PAY_PER_EVENT':
for event_name, event_pricing in run.pricing_info.pricing_per_event.actor_charge_events.items():
self._pricing_info[event_name] = PricingInfoItem(
price=event_pricing.event_price_usd,
title=event_pricing.event_title,
)

self._max_total_charge_usd = run.options.max_total_charge_usd or self._max_total_charge_usd

for event_name, count in (run.charged_event_counts or {}).items():
price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
self._charging_state[event_name] = ChargingStateItem(
charge_count=count,
total_charged_amount=count * price,
)

if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT':
# We are not running on the Apify platform, but PPE is enabled for testing - open a dataset that
# will contain a log of all charge calls for debugging purposes.

if self._purge_charging_log_dataset:
dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)
await dataset.drop()

self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)

async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
"""Charge for a specified number of events - sub-operations of the Actor.
This is relevant only for the pay-per-event pricing model.
"""
if self._charging_state is None:
raise RuntimeError('Charging manager is not initialized')

def calculate_chargeable() -> dict[str, int | None]:
"""Calculate the maximum number of events of each type that can be charged within the current budget."""
return {
event_name: self.calculate_max_event_charge_count_within_limit(event_name)
for event_name in self._pricing_info
}

# For runs that do not use the pay-per-event pricing model, just print a warning and return
if self._pricing_model != 'PAY_PER_EVENT':
if not self._not_ppe_warning_printed:
logger.warning(
'Ignored attempt to charge for an event - the Actor does not use the pay-per-event pricing'
)
self._not_ppe_warning_printed = True

return ChargeResult(
event_charge_limit_reached=False,
charged_count=0,
chargeable_within_limit=calculate_chargeable(),
)

# START OF CRITICAL SECTION - no awaits here

# Determine the maximum amount of events that can be charged within the budget
charged_count = min(count, self.calculate_max_event_charge_count_within_limit(event_name) or count)

if charged_count == 0:
return ChargeResult(
event_charge_limit_reached=True,
charged_count=0,
chargeable_within_limit=calculate_chargeable(),
)

pricing_info = self._pricing_info.get(
event_name,
PricingInfoItem(
price=Decimal()
if self._is_at_home
else Decimal(
'1'
), # Use a nonzero price for local development so that the maximum budget can be reached,
title=f"Unknown event '{event_name}'",
),
)

# Update the charging state
self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal()))
self._charging_state[event_name].charge_count += charged_count
self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price

# END OF CRITICAL SECTION

# If running on the platform, call the charge endpoint
if self._is_at_home:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not configured')

if event_name in self._pricing_info:
await self._client.run(self._actor_run_id).charge(event_name, charged_count)
else:
logger.warning(f"Attempting to charge for an unknown event '{event_name}'")

# Log the charged operation (if enabled)
if self._charging_log_dataset:
await self._charging_log_dataset.push_data(
{
'event_name': event_name,
'event_title': pricing_info.title,
'event_price_usd': round(pricing_info.price, 3),
'charged_count': charged_count,
'timestamp': datetime.now(timezone.utc).isoformat(),
}
)

# If it is not possible to charge the full amount, log that fact
if charged_count < count:
subject = 'instance' if count == 1 else 'instances'
logger.info(
f"Charging {count} ${subject} of '{event_name}' event would exceed max_total_charge_usd "
'- only {charged_count} events were charged'
)

max_charge_count = self.calculate_max_event_charge_count_within_limit(event_name)

return ChargeResult(
event_charge_limit_reached=max_charge_count is not None and max_charge_count <= 0,
charged_count=charged_count,
chargeable_within_limit=calculate_chargeable(),
)

def calculate_total_charged_amount(self) -> Decimal:
"""Calculate the total amount of money charged for pay-per-event events so far."""
if self._charging_state is None:
raise RuntimeError('Charging manager is not initialized')

return sum(
(item.total_charged_amount for item in self._charging_state.values()),
start=Decimal(),
)

def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None:
"""Calculate how many instances of an event can be charged before we reach the configured limit."""
if self._charging_state is None:
raise RuntimeError('Charging manager is not initialized')

pricing_info = self._pricing_info.get(event_name)

if pricing_info is not None:
price = pricing_info.price
elif not self._is_at_home:
price = Decimal('1') # Use a nonzero price for local development so that the maximum budget can be reached
else:
price = Decimal()

if not price:
return None

result = (self._max_total_charge_usd - self.calculate_total_charged_amount()) / price
return math.floor(result) if result.is_finite() else None

def get_pricing_info(self) -> ActorPricingInfo:
"""Retrieve detailed information about the effective pricing of the current Actor run.
This can be used for instance when your code needs to support multiple pricing models in transition periods.
"""
if self._charging_state is None:
raise RuntimeError('Charging manager is not initialized')

return ActorPricingInfo(
pricing_model=self._pricing_model,
is_pay_per_event=self._pricing_model == 'PAY_PER_EVENT',
max_total_charge_usd=self._max_total_charge_usd
if self._max_total_charge_usd is not None
else Decimal('inf'),
per_event_prices={
event_name: pricing_info.price for event_name, pricing_info in self._pricing_info.items()
},
)


@docs_group('Data structures')
@dataclass(frozen=True)
class ChargeResult:
"""Result of the `ChargingManager.charge` method."""

event_charge_limit_reached: bool
"""If true, no more events of this type can be charged within the limit."""

charged_count: int
"""Total amount of charged events - may be lower than the requested amount."""

chargeable_within_limit: dict[str, int | None]
"""How many events of each known type can still be charged within the limit."""


@docs_group('Data structures')
@dataclass
class ActorPricingInfo:
"""Result of the `ChargingManager.get_pricing_info` method."""

pricing_model: PricingModel | None
"""The currently effective pricing model."""

max_total_charge_usd: Decimal
"""A configured limit for the total charged amount - if you exceed it, you won't receive more money than this."""

is_pay_per_event: bool
"""A shortcut - true if the Actor runs with the pay-per-event pricing model."""

per_event_prices: dict[str, Decimal]
"""Price of every known event type."""


@dataclass
class ChargingStateItem:
charge_count: int
total_charged_amount: Decimal


@dataclass
class PricingInfoItem:
price: Decimal
title: str
Loading