Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions temporalio/worker/workflow_sandbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@
RestrictedWorkflowAccessError,
SandboxMatcher,
SandboxRestrictions,
UnintentionalPassthroughError,
)
from ._runner import SandboxedWorkflowRunner

__all__ = [
"RestrictedWorkflowAccessError",
"UnintentionalPassthroughError",
"SandboxedWorkflowRunner",
"SandboxMatcher",
"SandboxRestrictions",
Expand Down
35 changes: 35 additions & 0 deletions temporalio/worker/workflow_sandbox/_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
RestrictedWorkflowAccessError,
RestrictionContext,
SandboxRestrictions,
UnintentionalPassthroughError,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -200,6 +201,17 @@ def _import(

# Check module restrictions and passthrough modules
if full_name not in sys.modules:
# Issue a warning if appropriate
if (
self.restriction_context.in_activation
and self._is_import_notification_policy_applied(
temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_DYNAMIC_IMPORT
)
):
warnings.warn(
f"Module {full_name} was imported after initial workflow load."
)

# Make sure not an entirely invalid module
self._assert_valid_module(full_name)

Expand Down Expand Up @@ -282,13 +294,36 @@ def module_configured_passthrough(self, name: str) -> bool:
break
return True

def _is_import_notification_policy_applied(
self, policy: temporalio.workflow.SandboxImportNotificationPolicy
) -> bool:
override_policy = (
temporalio.workflow.unsafe.current_import_notification_policy_override()
)
if override_policy:
return policy in override_policy

return policy in self.restrictions.import_notification_policy

def _maybe_passthrough_module(self, name: str) -> Optional[types.ModuleType]:
# If imports not passed through and all modules are not passed through
# and name not in passthrough modules, check parents
if (
not temporalio.workflow.unsafe.is_imports_passed_through()
and not self.module_configured_passthrough(name)
):
if self._is_import_notification_policy_applied(
temporalio.workflow.SandboxImportNotificationPolicy.RAISE_ON_UNINTENTIONAL_PASSTHROUGH
):
raise UnintentionalPassthroughError(name)

if self._is_import_notification_policy_applied(
temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_UNINTENTIONAL_PASSTHROUGH
):
warnings.warn(
f"Module {name} was not intentionally passed through to the sandbox."
)

return None
# Do the pass through
with self._unapplied():
Expand Down
31 changes: 31 additions & 0 deletions temporalio/worker/workflow_sandbox/_restrictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
except ImportError:
HAVE_PYDANTIC = False

import temporalio.exceptions
import temporalio.workflow

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,6 +83,21 @@ def default_message(qualified_name: str) -> str:
)


class UnintentionalPassthroughError(temporalio.exceptions.TemporalError):
"""Error that occurs when a workflow unintentionally passes an import to the sandbox when
the import notification policy includes :py:attr:`temporalio.workflow.SandboxImportNotificationPolicy.RAISE_ON_NON_PASSTHROUGH`.

Attributes:
qualified_name: Fully qualified name of what was passed through to the sandbox.
"""

def __init__(self, qualified_name: str) -> None:
"""Create an unintentional passthrough error."""
super().__init__(
f"Module {qualified_name} was not intentionally passed through to the sandbox."
)


@dataclass(frozen=True)
class SandboxRestrictions:
"""Set of restrictions that can be applied to a sandbox."""
Expand Down Expand Up @@ -110,6 +126,11 @@ class methods (including __init__, etc). The check compares the against the
fully qualified path to the item.
"""

import_notification_policy: temporalio.workflow.SandboxImportNotificationPolicy
Copy link
Member

@cretz cretz Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should set the default here, it's a backwards incompatible change to add a new required field here (and then not set it in our tests in the global/general area)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Making this change now.

"""
The import notification policy to use when an import is triggered during workflow loading or execution. See :py:class:`temporalio.workflow.SandboxImportNotificationPolicy` for options.
"""

passthrough_all_modules: bool = False
"""
Pass through all modules, do not sandbox any modules. This is the equivalent
Expand Down Expand Up @@ -170,6 +191,12 @@ def with_passthrough_all_modules(self) -> SandboxRestrictions:
"""
return dataclasses.replace(self, passthrough_all_modules=True)

def with_import_notification_policy(
self, policy: temporalio.workflow.SandboxImportNotificationPolicy
) -> SandboxRestrictions:
"""Create a new restriction set with the given import notification policy as the :py:attr:`import_policy`."""
return dataclasses.replace(self, import_notification_policy=policy)


# We intentionally use specific fields instead of generic "matcher" callbacks
# for optimization reasons.
Expand Down Expand Up @@ -305,10 +332,12 @@ def access_matcher(
if not child_matcher:
return None
matcher = child_matcher

if not context.is_runtime and matcher.only_runtime:
return None
if not matcher.match_self:
return None

return matcher

def match_access(
Expand Down Expand Up @@ -798,6 +827,7 @@ def _public_callables(parent: Any, *, exclude: Set[str] = set()) -> Set[str]:
passthrough_modules=SandboxRestrictions.passthrough_modules_default,
invalid_modules=SandboxMatcher.none,
invalid_module_members=SandboxRestrictions.invalid_module_members_default,
import_notification_policy=temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_DYNAMIC_IMPORT,
)


Expand All @@ -819,6 +849,7 @@ def unwrap_if_proxied(v: Any) -> Any:
def __init__(self) -> None:
"""Create a restriction context."""
self.is_runtime = False
self.in_activation = False


@dataclass
Expand Down
2 changes: 2 additions & 0 deletions temporalio/worker/workflow_sandbox/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def activate(
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion:
self.importer.restriction_context.is_runtime = True
self.importer.restriction_context.in_activation = True
try:
self._run_code(
"with __temporal_importer.applied():\n"
Expand All @@ -169,6 +170,7 @@ def activate(
return self.globals_and_locals.pop("__temporal_completion") # type: ignore
finally:
self.importer.restriction_context.is_runtime = False
self.importer.restriction_context.in_activation = False

def _run_code(self, code: str, **extra_globals: Any) -> None:
for k, v in extra_globals.items():
Expand Down
45 changes: 44 additions & 1 deletion temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum, IntEnum
from enum import Enum, Flag, IntEnum, auto
from functools import partial
from random import Random
from typing import (
Expand Down Expand Up @@ -1410,6 +1410,20 @@ async def wait_condition(
_sandbox_unrestricted = threading.local()
_in_sandbox = threading.local()
_imports_passed_through = threading.local()
_sandbox_import_notification_policy_override = threading.local()


class SandboxImportNotificationPolicy(Flag):
"""Defines the behavior taken when modules are imported into the sandbox after the workflow is initially loaded or unintentionally missing from the passthrough list."""

SILENT = auto()
"""Allow imports that do not violate sandbox restrictions and no warnings are generated."""
WARN_ON_DYNAMIC_IMPORT = auto()
"""Allows dynamic imports that do not violate sandbox restrictions but issues a warning when an import is triggered in the sandbox after initial workflow load."""
WARN_ON_UNINTENTIONAL_PASSTHROUGH = auto()
"""Allows imports that do not violate sandbox restrictions but issues a warning when an import is triggered in the sandbox that was unintentionally passed through."""
RAISE_ON_UNINTENTIONAL_PASSTHROUGH = auto()
"""Raise an error when an import is triggered in the sandbox that was unintentionally passed through."""


class unsafe:
Expand Down Expand Up @@ -1498,6 +1512,35 @@ def imports_passed_through() -> Iterator[None]:
finally:
_imports_passed_through.value = False

@staticmethod
def current_import_notification_policy_override() -> (
Optional[SandboxImportNotificationPolicy]
):
"""Gets the current import notification policy override if one is set."""
applied_policy = getattr(
_sandbox_import_notification_policy_override,
"value",
None,
)
return applied_policy

@staticmethod
@contextmanager
def sandbox_import_notification_policy(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, remind me, did we decide not to have a kind of with workflow.intentional_import_reload(): type of thing yet? I know you can do it here with the policy altering, but I think I remember we decided to not do the nicer sugared form at this time, just confirming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought here was that this could be combined with the pre-defined policies to achieve a similar effect. It's a bit more wordy but feels okay to me to use something like

with workflow.unsafe.sandbox_import_notification_policy(
    SandboxRestrictions.import_notification_policy_silent
):
    #...

That being said, I think it makes just as much sense to have a method like you're suggesting that applies the correct policy. I put a little time into thinking about this before opening the PR but couldn't settle on any wording that would make the behavior being applied immediately obvious. Very happy to add any options you'd recommend and/or remove this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave off a with workflow.intentional_import_reload() helper for now, mainly because I don't think it'd be common

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Along these same lines, do you still think it's worth removing the various static helpers per your other comment? I think either way makes sense. IMO they're helpful, but as you pointed out are pretty straightforward to reason about even without them.

Copy link
Member

@cretz cretz Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think it's still worth removing them. Having static, wordy helper constants just to help you set an enum flag seems a bit off. The ergonomics of setting an enum flag are good enough on their own IMO.

policy: SandboxImportNotificationPolicy,
) -> Iterator[None]:
"""Context manager to apply the given import notification policy."""
original_policy = _sandbox_import_notification_policy_override.value = getattr(
_sandbox_import_notification_policy_override,
"value",
None,
)
_sandbox_import_notification_policy_override.value = policy
try:
yield None
finally:
_sandbox_import_notification_policy_override.value = original_policy


class LoggerAdapter(logging.LoggerAdapter):
"""Adapter that adds details to the log about the running workflow.
Expand Down
Loading
Loading