|
24 | 24 | EventSystemInfoData,
|
25 | 25 | )
|
26 | 26 |
|
| 27 | +from apify._charging import ChargeResult, ChargingManager |
27 | 28 | from apify._configuration import Configuration
|
28 | 29 | from apify._consts import EVENT_LISTENERS_TIMEOUT
|
29 | 30 | from apify._crypto import decrypt_input_secrets, load_private_key
|
@@ -97,6 +98,8 @@ def __init__(
|
97 | 98 | )
|
98 | 99 | )
|
99 | 100 |
|
| 101 | + self._charging_manager = ChargingManager(self._configuration, self._apify_client) |
| 102 | + |
100 | 103 | self._is_initialized = False
|
101 | 104 |
|
102 | 105 | @ignore_docs
|
@@ -221,6 +224,10 @@ async def init(self) -> None:
|
221 | 224 | # https://github.com/apify/apify-sdk-python/issues/146
|
222 | 225 |
|
223 | 226 | await self._event_manager.__aenter__()
|
| 227 | + self.log.debug('Event manager initialized') |
| 228 | + |
| 229 | + await self._charging_manager.init() |
| 230 | + self.log.debug('Charging manager initialized') |
224 | 231 |
|
225 | 232 | self._is_initialized = True
|
226 | 233 |
|
@@ -445,19 +452,45 @@ async def open_request_queue(
|
445 | 452 | storage_client=storage_client,
|
446 | 453 | )
|
447 | 454 |
|
448 |
| - async def push_data(self, data: dict | list[dict]) -> None: |
| 455 | + @overload |
| 456 | + async def push_data(self, data: dict | list[dict]) -> None: ... |
| 457 | + @overload |
| 458 | + async def push_data(self, data: dict | list[dict], event_name: str) -> ChargeResult: ... |
| 459 | + async def push_data(self, data: dict | list[dict], event_name: str | None = None) -> ChargeResult | None: |
449 | 460 | """Store an object or a list of objects to the default dataset of the current Actor run.
|
450 | 461 |
|
451 | 462 | Args:
|
452 | 463 | data: The data to push to the default dataset.
|
| 464 | + event_name: If provided, the method will attempt to charge for the event for each pushed item. |
453 | 465 | """
|
454 | 466 | self._raise_if_not_initialized()
|
455 | 467 |
|
456 | 468 | if not data:
|
457 |
| - return |
| 469 | + return None |
| 470 | + |
| 471 | + data = data if isinstance(data, list) else [data] |
| 472 | + |
| 473 | + max_charged_count = ( |
| 474 | + self._charging_manager.calculate_max_event_charge_within_limit(event_name) |
| 475 | + if event_name is not None |
| 476 | + else None |
| 477 | + ) |
458 | 478 |
|
459 | 479 | dataset = await self.open_dataset()
|
460 |
| - await dataset.push_data(data) |
| 480 | + |
| 481 | + if max_charged_count is not None and len(data) > max_charged_count: |
| 482 | + # Push as many items as we can charge for |
| 483 | + await dataset.push_data(data[:max_charged_count]) |
| 484 | + else: |
| 485 | + await dataset.push_data(data) |
| 486 | + |
| 487 | + if event_name: |
| 488 | + return await self._charging_manager.charge( |
| 489 | + event_name=event_name, |
| 490 | + count=min(max_charged_count, len(data)) if max_charged_count is not None else len(data), |
| 491 | + ) |
| 492 | + |
| 493 | + return None |
461 | 494 |
|
462 | 495 | async def get_input(self) -> Any:
|
463 | 496 | """Get the Actor input value from the default key-value store associated with the current Actor run."""
|
@@ -506,6 +539,15 @@ async def set_value(
|
506 | 539 | key_value_store = await self.open_key_value_store()
|
507 | 540 | return await key_value_store.set_value(key, value, content_type=content_type)
|
508 | 541 |
|
| 542 | + def get_charging_manager(self) -> ChargingManager: |
| 543 | + """Retrieve the charging manager to access granular pricing information.""" |
| 544 | + self._raise_if_not_initialized() |
| 545 | + return self._charging_manager |
| 546 | + |
| 547 | + async def charge(self, event_name: str, count: int = 1) -> ChargeResult: |
| 548 | + self._raise_if_not_initialized() |
| 549 | + return await self._charging_manager.charge(event_name, count) |
| 550 | + |
509 | 551 | @overload
|
510 | 552 | def on(
|
511 | 553 | self, event_name: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]
|
|
0 commit comments