|
4 | 4 |
|
5 | 5 | import asyncio |
6 | 6 | import contextvars |
| 7 | +import dataclasses |
7 | 8 | import inspect |
8 | 9 | import logging |
9 | 10 | import threading |
@@ -415,6 +416,61 @@ def decorator( |
415 | 416 | return decorator(fn.__name__, description, fn, bypass_async_check=True) |
416 | 417 |
|
417 | 418 |
|
| 419 | +@dataclass(frozen=True) |
| 420 | +class DynamicWorkflowConfig: |
| 421 | + """Returned by functions using the :py:func:`dynamic_config` decorator, see it for more.""" |
| 422 | + |
| 423 | + failure_exception_types: Optional[Sequence[Type[BaseException]]] = None |
| 424 | + """The types of exceptions that, if a workflow-thrown exception extends, will cause the |
| 425 | + workflow/update to fail instead of suspending the workflow via task failure. These are applied |
| 426 | + in addition to ones set on the worker constructor. If ``Exception`` is set, it effectively will |
| 427 | + fail a workflow/update in all user exception cases. |
| 428 | +
|
| 429 | + Always overrides the equivalent parameter on :py:func:`defn` if set not-None. |
| 430 | +
|
| 431 | + WARNING: This setting is experimental. |
| 432 | + """ |
| 433 | + versioning_behavior: temporalio.common.VersioningBehavior = ( |
| 434 | + temporalio.common.VersioningBehavior.UNSPECIFIED |
| 435 | + ) |
| 436 | + """Specifies the versioning behavior to use for this workflow. |
| 437 | +
|
| 438 | + Always overrides the equivalent parameter on :py:func:`defn`. |
| 439 | +
|
| 440 | + WARNING: This setting is experimental. |
| 441 | + """ |
| 442 | + |
| 443 | + |
| 444 | +def dynamic_config( |
| 445 | + fn: MethodSyncNoParam[SelfType, DynamicWorkflowConfig], |
| 446 | +) -> MethodSyncNoParam[SelfType, DynamicWorkflowConfig]: |
| 447 | + """Decorator to allow configuring a dynamic workflow's behavior. |
| 448 | +
|
| 449 | + Because dynamic workflows may conceptually represent more than one workflow type, it may be |
| 450 | + desirable to have different settings for fields that would normally be passed to |
| 451 | + :py:func:`defn`, but vary based on the workflow type name or other information available in |
| 452 | + the workflow's context. This function will be called after the workflow's :py:func:`init`, |
| 453 | + if it has one, but before the workflow's :py:func:`run` method. |
| 454 | +
|
| 455 | + The method must only take self as a parameter, and any values set in the class it returns will |
| 456 | + override those provided to :py:func:`defn`. |
| 457 | +
|
| 458 | + Cannot be specified on non-dynamic workflows. |
| 459 | +
|
| 460 | + Args: |
| 461 | + fn: The function to decorate. |
| 462 | + """ |
| 463 | + if inspect.iscoroutinefunction(fn): |
| 464 | + raise ValueError("Workflow dynamic_config method must be synchronous") |
| 465 | + params = list(inspect.signature(fn).parameters.values()) |
| 466 | + if len(params) != 1: |
| 467 | + raise ValueError("Workflow dynamic_config method must only take self parameter") |
| 468 | + |
| 469 | + # Add marker attribute |
| 470 | + setattr(fn, "__temporal_workflow_dynamic_config", True) |
| 471 | + return fn |
| 472 | + |
| 473 | + |
418 | 474 | @dataclass(frozen=True) |
419 | 475 | class Info: |
420 | 476 | """Information about the running workflow. |
@@ -1449,6 +1505,7 @@ class _Definition: |
1449 | 1505 | arg_types: Optional[List[Type]] = None |
1450 | 1506 | ret_type: Optional[Type] = None |
1451 | 1507 | versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None |
| 1508 | + dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None |
1452 | 1509 |
|
1453 | 1510 | @staticmethod |
1454 | 1511 | def from_class(cls: Type) -> Optional[_Definition]: |
@@ -1513,6 +1570,7 @@ def _apply_to_class( |
1513 | 1570 | # Collect run fn and all signal/query/update fns |
1514 | 1571 | init_fn: Optional[Callable[..., None]] = None |
1515 | 1572 | run_fn: Optional[Callable[..., Awaitable[Any]]] = None |
| 1573 | + dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None |
1516 | 1574 | seen_run_attr = False |
1517 | 1575 | signals: Dict[Optional[str], _SignalDefinition] = {} |
1518 | 1576 | queries: Dict[Optional[str], _QueryDefinition] = {} |
@@ -1560,6 +1618,17 @@ def _apply_to_class( |
1560 | 1618 | queries[query_defn.name] = query_defn |
1561 | 1619 | elif name == "__init__" and hasattr(member, "__temporal_workflow_init"): |
1562 | 1620 | init_fn = member |
| 1621 | + elif hasattr(member, "__temporal_workflow_dynamic_config"): |
| 1622 | + if workflow_name: |
| 1623 | + issues.append( |
| 1624 | + "@workflow.dynamic_config can only be used in dynamic workflows, but " |
| 1625 | + f"workflow class {workflow_name} ({cls.__name__}) is not dynamic" |
| 1626 | + ) |
| 1627 | + if dynamic_config_fn: |
| 1628 | + issues.append( |
| 1629 | + "@workflow.dynamic_config can only be defined once per workflow" |
| 1630 | + ) |
| 1631 | + dynamic_config_fn = member |
1563 | 1632 | elif isinstance(member, UpdateMethodMultiParam): |
1564 | 1633 | update_defn = member._defn |
1565 | 1634 | if update_defn.name in updates: |
@@ -1643,6 +1712,7 @@ def _apply_to_class( |
1643 | 1712 | sandboxed=sandboxed, |
1644 | 1713 | failure_exception_types=failure_exception_types, |
1645 | 1714 | versioning_behavior=versioning_behavior, |
| 1715 | + dynamic_config_fn=dynamic_config_fn, |
1646 | 1716 | ) |
1647 | 1717 | setattr(cls, "__temporal_workflow_definition", defn) |
1648 | 1718 | setattr(run_fn, "__temporal_workflow_definition", defn) |
|
0 commit comments