The entry-point that consumers of the SDK interact with is the DurableContext.
- Core Methods:
set_logger,step,invoke,map,parallel,run_in_child_context,wait,create_callback,wait_for_callback,wait_for_condition - Thread Safety: Uses
OrderedCounterfor generating sequential step IDs - State Management: Delegates to
ExecutionStatefor checkpointing
- Map/Parallel: Both inherit from
ConcurrentExecutorabstract base class - Thread Pool: Uses
ThreadPoolExecutorfor concurrent execution - State Tracking:
ExecutableWithStatemanages individual task lifecycle - Completion Logic:
ExecutionCounterstracks success/failure criteria - Suspension:
TimerSchedulerhandles timed suspensions and resumptions
- Modular Configs: Separate config classes for each operation type
- Completion Control:
CompletionConfigdefines success/failure criteria - Serialization:
SerDesinterface for custom serialization
- Separation of Concerns: Each operation has dedicated handler function
- Checkpointing: All operations integrate with execution state checkpointing
- Error Handling: Consistent error handling and retry logic across operations
classDiagram
class DurableContext {
-ExecutionState state
-Any lambda_context
-str _parent_id
-OrderedCounter _step_counter
-LogInfo _log_info
-Logger logger
+set_logger(LoggerInterface new_logger)
+step(Callable func, str name, StepConfig config) T
+invoke(str function_name, P payload, str name, InvokeConfig config) R
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+run_in_child_context(Callable func, str name, ChildConfig config) T
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
+wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any
+wait_for_condition(Callable check, WaitForConditionConfig config, str name) T
}
class DurableContextProtocol {
<<interface>>
+step(Callable func, str name, StepConfig config) T
+run_in_child_context(Callable func, str name, ChildConfig config) T
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
}
class OrderedCounter {
-OrderedLock _lock
-int _counter
+increment() int
+decrement() int
+get_current() int
}
class ExecutionState {
+str durable_execution_arn
+get_checkpoint_result(str operation_id) CheckpointedResult
+create_checkpoint(OperationUpdate operation_update)
}
class Logger {
+LoggerInterface logger
+LogInfo info
+with_log_info(LogInfo info) Logger
+from_log_info(LoggerInterface logger, LogInfo info) Logger
}
DurableContext ..|> DurableContextProtocol : implements
DurableContext --> ExecutionState : uses
DurableContext --> OrderedCounter : contains
DurableContext --> Logger : contains
The DurableContext calls operation handlers, which contain the execution logic for each operation.
classDiagram
class DurableContext {
+step(Callable func, str name, StepConfig config) T
+invoke(str function_name, P payload, str name, InvokeConfig config) R
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+run_in_child_context(Callable func, str name, ChildConfig config) T
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
+wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any
+wait_for_condition(Callable check, WaitForConditionConfig config, str name) T
}
class step_handler {
<<function>>
+step_handler(Callable func, ExecutionState state, OperationIdentifier op_id, StepConfig config, Logger logger) T
}
class invoke_handler {
<<function>>
+invoke_handler(str function_name, P payload, ExecutionState state, OperationIdentifier op_id, InvokeConfig config) R
}
class map_handler {
<<function>>
+map_handler(Sequence items, Callable func, MapConfig config, ExecutionState state, Callable run_in_child_context) BatchResult
}
class parallel_handler {
<<function>>
+parallel_handler(Sequence callables, ParallelConfig config, ExecutionState state, Callable run_in_child_context) BatchResult
}
class child_handler {
<<function>>
+child_handler(Callable func, ExecutionState state, OperationIdentifier op_id, ChildConfig config) T
}
class wait_handler {
<<function>>
+wait_handler(int seconds, ExecutionState state, OperationIdentifier op_id)
}
class create_callback_handler {
<<function>>
+create_callback_handler(ExecutionState state, OperationIdentifier op_id, CallbackConfig config) str
}
class wait_for_callback_handler {
<<function>>
+wait_for_callback_handler(DurableContext context, Callable submitter, str name, WaitForCallbackConfig config) Any
}
class wait_for_condition_handler {
<<function>>
+wait_for_condition_handler(Callable check, WaitForConditionConfig config, ExecutionState state, OperationIdentifier op_id, Logger logger) T
}
DurableContext --> step_handler : calls
DurableContext --> invoke_handler : calls
DurableContext --> map_handler : calls
DurableContext --> parallel_handler : calls
DurableContext --> child_handler : calls
DurableContext --> wait_handler : calls
DurableContext --> create_callback_handler : calls
DurableContext --> wait_for_callback_handler : calls
DurableContext --> wait_for_condition_handler : calls
classDiagram
class StepConfig {
+Callable retry_strategy
+StepSemantics step_semantics
+SerDes serdes
}
class InvokeConfig~P,R~ {
+int timeout_seconds
+SerDes~P~ serdes_payload
+SerDes~R~ serdes_result
}
class MapConfig {
+int max_concurrency
+ItemBatcher item_batcher
+CompletionConfig completion_config
+SerDes serdes
}
class ParallelConfig {
+int max_concurrency
+CompletionConfig completion_config
+SerDes serdes
}
class ChildConfig~T~ {
+SerDes serdes
+OperationSubType sub_type
+Callable~T,str~ summary_generator
}
class CallbackConfig {
+int timeout_seconds
+int heartbeat_timeout_seconds
+SerDes serdes
}
class WaitForCallbackConfig {
+Callable retry_strategy
}
class WaitForConditionConfig~T~ {
+Callable wait_strategy
+T initial_state
+SerDes serdes
}
class CompletionConfig {
+int min_successful
+int tolerated_failure_count
+float tolerated_failure_percentage
+first_successful()$ CompletionConfig
+all_completed()$ CompletionConfig
+all_successful()$ CompletionConfig
}
class ItemBatcher~T~ {
+int max_items_per_batch
+float max_item_bytes_per_batch
+T batch_input
}
WaitForCallbackConfig --|> CallbackConfig : extends
MapConfig --> CompletionConfig : contains
MapConfig --> ItemBatcher : contains
ParallelConfig --> CompletionConfig : contains
classDiagram
class DurableContextProtocol {
<<interface>>
+step(Callable func, str name, StepConfig config) T
+run_in_child_context(Callable func, str name, ChildConfig config) T
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
}
class LoggerInterface {
<<interface>>
+debug(object msg, *args, Mapping extra)
+info(object msg, *args, Mapping extra)
+warning(object msg, *args, Mapping extra)
+error(object msg, *args, Mapping extra)
+exception(object msg, *args, Mapping extra)
}
class CallbackProtocol~C_co~ {
<<interface>>
+str callback_id
+result() C_co
}
class BatchResultProtocol~T~ {
<<interface>>
+get_results() list~T~
}
class StepContext {
+LoggerInterface logger
}
class WaitForConditionCheckContext {
+LoggerInterface logger
}
class OperationContext {
+LoggerInterface logger
}
StepContext --|> OperationContext : extends
WaitForConditionCheckContext --|> OperationContext : extends
classDiagram
class SerDes~T~ {
<<abstract>>
+serialize(T value, SerDesContext context) str
+deserialize(str data, SerDesContext context) T
}
class JsonSerDes~T~ {
+serialize(T value, SerDesContext context) str
+deserialize(str data, SerDesContext context) T
}
class SerDesContext {
+str operation_id
+str durable_execution_arn
}
class serialize {
<<function>>
+serialize(SerDes serdes, T value, str operation_id, str durable_execution_arn) str
}
class deserialize {
<<function>>
+deserialize(SerDes serdes, str data, str operation_id, str durable_execution_arn) T
}
JsonSerDes ..|> SerDes : implements
serialize --> SerDes : uses
deserialize --> SerDes : uses
SerDes --> SerDesContext : uses
classDiagram
class ConcurrentExecutor~CallableType,ResultType~ {
<<abstract>>
+list~Executable~ executables
+int max_concurrency
+CompletionConfig completion_config
+ExecutionCounters counters
+list~ExecutableWithState~ executables_with_state
+Event _completion_event
+SuspendExecution _suspend_exception
+execute(ExecutionState state, Callable run_in_child_context) BatchResult~ResultType~
+execute_item(DurableContext child_context, Executable executable)* ResultType
+should_execution_suspend() SuspendResult
-_on_task_complete(ExecutableWithState exe_state, Future future, TimerScheduler scheduler)
-_create_result() BatchResult~ResultType~
}
class MapExecutor~T,R~ {
+Sequence~T~ items
+execute_item(DurableContext child_context, Executable executable) R
+from_items(Sequence items, Callable func, MapConfig config)$ MapExecutor
}
class ParallelExecutor {
+execute_item(DurableContext child_context, Executable executable) R
+from_callables(Sequence callables, ParallelConfig config)$ ParallelExecutor
}
class Executable~CallableType~ {
+int index
+CallableType func
}
class ExecutableWithState~CallableType,ResultType~ {
+Executable~CallableType~ executable
-BranchStatus _status
-Future _future
-float _suspend_until
-ResultType _result
-Exception _error
+run(Future future)
+suspend()
+suspend_with_timeout(float timestamp)
+complete(ResultType result)
+fail(Exception error)
+reset_to_pending()
+can_resume() bool
+is_running() bool
}
class ExecutionCounters {
+int total_tasks
+int min_successful
+int success_count
+int failure_count
-Lock _lock
+complete_task()
+fail_task()
+should_complete() bool
+is_all_completed() bool
+is_min_successful_reached() bool
+is_failure_tolerance_exceeded() bool
}
class TimerScheduler {
+Callable resubmit_callback
-list _pending_resumes
-Lock _lock
-Event _shutdown
-Thread _timer_thread
+schedule_resume(ExecutableWithState exe_state, float resume_time)
+shutdown()
-_timer_loop()
}
class BatchResult~R~ {
+list~BatchItem~R~~ all
+CompletionReason completion_reason
+succeeded() list~BatchItem~R~~
+failed() list~BatchItem~R~~
+get_results() list~R~
+throw_if_error()
}
class BatchItem~R~ {
+int index
+BatchItemStatus status
+R result
+ErrorObject error
}
MapExecutor --|> ConcurrentExecutor : extends
ParallelExecutor --|> ConcurrentExecutor : extends
ConcurrentExecutor --> ExecutableWithState : manages
ConcurrentExecutor --> ExecutionCounters : uses
ConcurrentExecutor --> TimerScheduler : uses
ConcurrentExecutor --> BatchResult : creates
ExecutableWithState --> Executable : contains
BatchResult --> BatchItem : contains
sequenceDiagram
participant DC as DurableContext
participant MH as map_handler
participant ME as MapExecutor
participant CE as ConcurrentExecutor
participant TP as ThreadPoolExecutor
participant TS as TimerScheduler
participant EC as ExecutionCounters
DC->>MH: map(inputs, func, config)
MH->>ME: MapExecutor.from_items()
ME->>CE: execute(state, run_in_child_context)
CE->>TP: ThreadPoolExecutor(max_workers)
CE->>TS: TimerScheduler(resubmitter)
CE->>EC: ExecutionCounters(total, min_successful)
loop For each executable
CE->>TP: submit_task(executable_with_state)
TP->>CE: execute_item_in_child_context()
CE->>DC: run_in_child_context(child_func)
DC->>ME: execute_item(child_context, executable)
end
par Task Completion Handling
TP->>CE: on_task_complete(future)
CE->>EC: complete_task() / fail_task()
CE->>CE: should_execution_suspend()
alt Should Complete
CE->>CE: _completion_event.set()
else Should Suspend
CE->>TS: schedule_resume(exe_state, timestamp)
end
end
CE->>CE: _completion_event.wait()
CE->>CE: _create_result()
CE->>DC: BatchResult
classDiagram
class OrderedLock {
-Lock _lock
-deque~Event~ _waiters
-bool _is_broken
-Exception _exception
+acquire() bool
+release()
+reset()
+is_broken() bool
+__enter__() OrderedLock
+__exit__(exc_type, exc_val, exc_tb)
}
class OrderedCounter {
-OrderedLock _lock
-int _counter
+increment() int
+decrement() int
+get_current() int
}