Skip to content

Commit 3dfb40e

Browse files
committed
Implement ChargingManager
1 parent 588db5c commit 3dfb40e

File tree

4 files changed

+278
-10
lines changed

4 files changed

+278
-10
lines changed

src/apify/_actor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ async def push_data(self, data: dict | list[dict], event_name: str | None = None
471471
data = data if isinstance(data, list) else [data]
472472

473473
max_charged_count = (
474-
self._charging_manager.calculate_max_event_charge_within_limit(event_name)
474+
self._charging_manager.calculate_max_event_charge_count_within_limit(event_name)
475475
if event_name is not None
476476
else None
477477
)

src/apify/_charging.py

Lines changed: 217 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,240 @@
11
from __future__ import annotations
22

3+
import math
34
from dataclasses import dataclass
4-
from typing import TYPE_CHECKING
5+
from datetime import datetime, timezone
6+
from decimal import Decimal
7+
from typing import TYPE_CHECKING, Union
8+
9+
from pydantic import TypeAdapter
10+
11+
from apify._models import ActorRun, PricingModel
12+
from apify.log import logger
13+
from apify.storages import Dataset
514

615
if TYPE_CHECKING:
716
from apify_client import ApifyClientAsync
817

918
from apify._configuration import Configuration
1019

1120

21+
run_validator: TypeAdapter[ActorRun | None] = TypeAdapter(Union[ActorRun, None])
22+
23+
1224
class ChargingManager:
25+
LOCAL_CHARGING_LOG_DATASET_NAME = 'charging_log'
26+
1327
def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None:
14-
pass
28+
self._max_total_charge_usd = configuration.max_total_charge_usd or Decimal('inf')
29+
self._is_at_home = configuration.is_at_home
30+
self._actor_run_id = configuration.actor_run_id
31+
self._purge_charging_log_dataset = configuration.purge_on_start
32+
self._pricing_model: PricingModel | None = None
33+
34+
if configuration.test_pay_per_event:
35+
if self._is_at_home:
36+
raise ValueError(
37+
'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
38+
'in a local development environment'
39+
)
40+
41+
self._pricing_model = 'PAY_PER_EVENT'
42+
43+
self._client = client
44+
self._charging_log_dataset: Dataset | None = None
45+
46+
self._charging_state: dict[str, ChargingStateItem] | None = None
47+
self._pricing_info: dict[str, PricingInfoItem] = {}
48+
49+
self._not_ppe_warning_printed = False
1550

1651
async def init(self) -> None:
17-
pass
52+
self._charging_state = {}
53+
54+
if self._is_at_home:
55+
if self._actor_run_id is None:
56+
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')
57+
58+
run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())
59+
if run is None:
60+
raise RuntimeError('Actor run not found')
61+
62+
if run.pricing_info is not None:
63+
self._pricing_model = run.pricing_info.pricing_model
64+
65+
if run.pricing_info.pricing_model == 'PAY_PER_EVENT':
66+
for event_name, event_pricing in run.pricing_info.pricing_per_event.actor_charge_events.items():
67+
self._pricing_info[event_name] = PricingInfoItem(
68+
price=event_pricing.event_price_usd,
69+
title=event_pricing.event_title,
70+
)
71+
72+
self._max_total_charge_usd = run.options.max_total_charge_usd or self._max_total_charge_usd
73+
74+
for event_name, count in (run.charged_event_counts or {}).items():
75+
price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
76+
self._charging_state[event_name] = ChargingStateItem(
77+
charge_count=count,
78+
total_charged_amount=count * price,
79+
)
80+
81+
if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT':
82+
if self._purge_charging_log_dataset:
83+
dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)
84+
await dataset.drop()
85+
86+
self._charging_log_dataset = await Dataset.open(name=self.LOCAL_CHARGING_LOG_DATASET_NAME)
1887

1988
async def charge(self, event_name: str, count: int = 1) -> ChargeResult:
20-
pass
89+
if self._charging_state is None:
90+
raise RuntimeError('Charging manager is not initialized')
91+
92+
def calculate_chargeable() -> dict[str, int | None]:
93+
return {
94+
event_name: self.calculate_max_event_charge_count_within_limit(event_name)
95+
for event_name in self._pricing_info
96+
}
97+
98+
if self._pricing_model != 'PAY_PER_EVENT':
99+
if not self._not_ppe_warning_printed:
100+
logger.warning(
101+
'Ignored attempt to charge for an event - the Actor does not use the pay-per-event pricing'
102+
)
103+
self._not_ppe_warning_printed = True
104+
105+
return ChargeResult(
106+
event_charge_limit_reached=False,
107+
charged_count=0,
108+
chargeable_within_limit=calculate_chargeable(),
109+
)
110+
111+
# START OF CRITICAL SECTION - no awaits here
112+
charged_count = min(count, self.calculate_max_event_charge_count_within_limit(event_name) or count)
113+
114+
if charged_count == 0:
115+
return ChargeResult(
116+
event_charge_limit_reached=True,
117+
charged_count=0,
118+
chargeable_within_limit=calculate_chargeable(),
119+
)
21120

22-
def calculate_max_event_charge_within_limit(self, event_name: str) -> int:
23-
pass
121+
pricing_info = self._pricing_info.get(
122+
event_name,
123+
PricingInfoItem(
124+
price=Decimal()
125+
if self._is_at_home
126+
else Decimal(
127+
'1'
128+
), # Use a nonzero price for local development so that the maximum budget can be reached,
129+
title=f"Unknown event '{event_name}'",
130+
),
131+
)
132+
133+
self._charging_state.setdefault(event_name, ChargingStateItem(0, Decimal()))
134+
self._charging_state[event_name].charge_count += charged_count
135+
self._charging_state[event_name].total_charged_amount += charged_count * pricing_info.price
136+
137+
# END OF CRITICAL SECTION
138+
if self._is_at_home:
139+
if self._actor_run_id is None:
140+
raise RuntimeError('Actor run ID not configured')
141+
142+
if event_name is self._pricing_info:
143+
await self._client.run(self._actor_run_id).charge(event_name, charged_count)
144+
else:
145+
logger.warning(f"Attempting to charge for an unknown event '{event_name}'")
146+
147+
if self._charging_log_dataset:
148+
await self._charging_log_dataset.push_data(
149+
{
150+
'event_name': event_name,
151+
'event_title': pricing_info.title,
152+
'event_price_usd': round(pricing_info.price, 3),
153+
'charged_count': charged_count,
154+
'timestamp': datetime.now(timezone.utc).isoformat(),
155+
}
156+
)
157+
158+
if charged_count < count:
159+
subject = 'instance' if count == 1 else 'instances'
160+
logger.info(
161+
f"Charging {count} ${subject} of '{event_name}' event would exceed max_total_charge_usd "
162+
'- only {charged_count} events were charged'
163+
)
164+
165+
max_charge_count = self.calculate_max_event_charge_count_within_limit(event_name)
166+
167+
return ChargeResult(
168+
event_charge_limit_reached=max_charge_count is not None and max_charge_count <= 0,
169+
charged_count=charged_count,
170+
chargeable_within_limit=calculate_chargeable(),
171+
)
172+
173+
def calculate_total_charged_amount(self) -> Decimal:
174+
if self._charging_state is None:
175+
raise RuntimeError('Charging manager is not initialized')
176+
177+
return sum(
178+
(item.total_charged_amount for item in self._charging_state.values()),
179+
start=Decimal(),
180+
)
181+
182+
def calculate_max_event_charge_count_within_limit(self, event_name: str) -> int | None:
183+
if self._charging_state is None:
184+
raise RuntimeError('Charging manager is not initialized')
185+
186+
pricing_info = self._pricing_info.get(event_name)
187+
188+
if pricing_info is not None:
189+
price = pricing_info.price
190+
elif not self._is_at_home:
191+
price = Decimal('1') # Use a nonzero price for local development so that the maximum budget can be reached
192+
else:
193+
price = Decimal()
194+
195+
if not price:
196+
return None
197+
198+
return math.floor((self._max_total_charge_usd - self.calculate_total_charged_amount()) / price)
199+
200+
def get_pricing_info(self) -> ActorPricingInfo:
201+
if self._charging_state is None:
202+
raise RuntimeError('Charging manager is not initialized')
203+
204+
return ActorPricingInfo(
205+
pricing_model=self._pricing_model,
206+
is_pay_per_event=self._pricing_model == 'PAY_PER_EVENT',
207+
max_total_charge_usd=self._max_total_charge_usd
208+
if self._max_total_charge_usd is not None
209+
else Decimal('inf'),
210+
per_event_prices={
211+
event_name: pricing_info.price for event_name, pricing_info in self._pricing_info.items()
212+
},
213+
)
24214

25215

26216
@dataclass(frozen=True)
27217
class ChargeResult:
28218
event_charge_limit_reached: bool
29219
charged_count: int
30-
chargeable_within_limit: int
220+
chargeable_within_limit: dict[str, int | None]
221+
222+
223+
@dataclass
224+
class ChargingStateItem:
225+
charge_count: int
226+
total_charged_amount: Decimal
227+
228+
229+
@dataclass
230+
class PricingInfoItem:
231+
price: Decimal
232+
title: str
233+
234+
235+
@dataclass
236+
class ActorPricingInfo:
237+
pricing_model: PricingModel | None
238+
max_total_charge_usd: Decimal
239+
is_pay_per_event: bool
240+
per_event_prices: dict[str, Decimal]

src/apify/_configuration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
from datetime import datetime, timedelta
4+
from decimal import Decimal
45
from logging import getLogger
56
from typing import Annotated, Any
67

@@ -212,7 +213,7 @@ class Configuration(CrawleeConfiguration):
212213
] = None
213214

214215
max_total_charge_usd: Annotated[
215-
float | None,
216+
Decimal | None,
216217
Field(
217218
alias='actor_max_total_charge_usd',
218219
description='For pay-per-event Actors, the user-set limit on total charges. Do not exceed this limit',

src/apify/_models.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from __future__ import annotations
22

33
from datetime import datetime, timedelta
4-
from typing import Annotated
4+
from decimal import Decimal
5+
from typing import TYPE_CHECKING, Annotated, Literal
56

67
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field
78

@@ -11,6 +12,9 @@
1112

1213
from apify._utils import docs_group
1314

15+
if TYPE_CHECKING:
16+
from typing_extensions import TypeAlias
17+
1418

1519
@docs_group('Data structures')
1620
class Webhook(BaseModel):
@@ -67,6 +71,7 @@ class ActorRunOptions(BaseModel):
6771
timeout: Annotated[timedelta, Field(alias='timeoutSecs')]
6872
memory_mbytes: Annotated[int, Field(alias='memoryMbytes')]
6973
disk_mbytes: Annotated[int, Field(alias='diskMbytes')]
74+
max_total_charge_usd: Annotated[Decimal | None, Field(alias='maxTotalChargeUsd')] = None
7075

7176

7277
@docs_group('Data structures')
@@ -115,3 +120,55 @@ class ActorRun(BaseModel):
115120
usage: Annotated[ActorRunUsage | None, Field(alias='usage')] = None
116121
usage_total_usd: Annotated[float | None, Field(alias='usageTotalUsd')] = None
117122
usage_usd: Annotated[ActorRunUsage | None, Field(alias='usageUsd')] = None
123+
pricing_info: Annotated[
124+
FreeActorPricingInfo
125+
| FlatPricePerMonthActorPricingInfo
126+
| PricePerDatasetItemActorPricingInfo
127+
| PayPerEventActorPricingInfo
128+
| None,
129+
Field(alias='pricingInfo', discriminator='pricing_model'),
130+
] = None
131+
charged_event_counts: Annotated[
132+
dict[str, int] | None,
133+
Field(alias='chargedEventCounts'),
134+
] = None
135+
136+
137+
class FreeActorPricingInfo(BaseModel):
138+
pricing_model: Annotated[Literal['FREE'], Field(alias='pricingModel')]
139+
140+
141+
class FlatPricePerMonthActorPricingInfo(BaseModel):
142+
pricing_model: Annotated[Literal['FLAT_PRICE_PER_MONTH'], Field(alias='pricingModel')]
143+
trial_minutes: Annotated[int | None, Field(alias='trialMinutes')]
144+
price_per_unit_usd: Annotated[Decimal, Field(alias='pricePerUnitUsd')]
145+
146+
147+
class PricePerDatasetItemActorPricingInfo(BaseModel):
148+
pricing_model: Annotated[Literal['PRICE_PER_DATASET_ITEM'], Field(alias='pricingModel')]
149+
unit_name: Annotated[str | None, Field(alias='unitName')]
150+
price_per_unit_usd: Annotated[Decimal, Field(alias='pricePerUnitUsd')]
151+
152+
153+
class ActorChargeEvent(BaseModel):
154+
event_price_usd: Annotated[Decimal, Field(alias='eventPriceUsd')]
155+
event_title: Annotated[str, Field(alias='eventTitle')]
156+
event_description: Annotated[str | None, Field(alias='eventDescription')] = None
157+
158+
159+
class PricingPerEvent(BaseModel):
160+
actor_charge_events: Annotated[dict[str, ActorChargeEvent], Field(alias='actorChargeEvents')]
161+
162+
163+
class PayPerEventActorPricingInfo(BaseModel):
164+
pricing_model: Annotated[Literal['PAY_PER_EVENT'], Field(alias='pricingModel')]
165+
pricing_per_event: Annotated[PricingPerEvent, Field(alias='pricingPerEvent')]
166+
minimal_max_total_charge_usd: Annotated[Decimal | None, Field(alias='minimalMaxTotalChargeUsd')]
167+
168+
169+
PricingModel: TypeAlias = Literal[
170+
'FREE',
171+
'FLAT_PRICE_PER_MONTH',
172+
'PRICE_PER_DATASET_ITEM',
173+
'PAY_PER_EVENT',
174+
]

0 commit comments

Comments
 (0)