Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 82 additions & 8 deletions durabletask/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import logging
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional, Sequence, Union
from enum import Enum
from typing import Any, Optional, Sequence, TypeVar, Union

import grpc
from google.protobuf import wrappers_pb2
Expand All @@ -16,13 +18,85 @@
from durabletask import task
from durabletask.aio.internal.grpc_interceptor import DefaultClientInterceptorImpl
from durabletask.aio.internal.shared import ClientInterceptor, get_grpc_aio_channel
from durabletask.client import (
OrchestrationState,
OrchestrationStatus,
TInput,
TOutput,
new_orchestration_state,
)

TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")


class OrchestrationStatus(Enum):
"""The status of an orchestration instance."""

RUNNING = pb.ORCHESTRATION_STATUS_RUNNING
COMPLETED = pb.ORCHESTRATION_STATUS_COMPLETED
FAILED = pb.ORCHESTRATION_STATUS_FAILED
TERMINATED = pb.ORCHESTRATION_STATUS_TERMINATED
CONTINUED_AS_NEW = pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
PENDING = pb.ORCHESTRATION_STATUS_PENDING
SUSPENDED = pb.ORCHESTRATION_STATUS_SUSPENDED

def __str__(self):
return helpers.get_orchestration_status_str(self.value)


@dataclass
class OrchestrationState:
instance_id: str
name: str
runtime_status: OrchestrationStatus
created_at: datetime
last_updated_at: datetime
serialized_input: Optional[str]
serialized_output: Optional[str]
serialized_custom_status: Optional[str]
failure_details: Optional[task.FailureDetails]

def raise_if_failed(self):
if self.failure_details is not None:
raise OrchestrationFailedError(
f"Orchestration '{self.instance_id}' failed: {self.failure_details.message}",
self.failure_details,
)


class OrchestrationFailedError(Exception):
def __init__(self, message: str, failure_details: task.FailureDetails):
super().__init__(message)
self._failure_details = failure_details

@property
def failure_details(self):
return self._failure_details


def new_orchestration_state(
instance_id: str, res: pb.GetInstanceResponse
) -> Optional[OrchestrationState]:
if not res.exists:
return None

state = res.orchestrationState

failure_details = None
if state.failureDetails.errorMessage != "" or state.failureDetails.errorType != "":
failure_details = task.FailureDetails(
state.failureDetails.errorMessage,
state.failureDetails.errorType,
state.failureDetails.stackTrace.value
if not helpers.is_empty(state.failureDetails.stackTrace)
else None,
)

return OrchestrationState(
instance_id,
state.name,
OrchestrationStatus(state.orchestrationStatus),
state.createdTimestamp.ToDatetime(),
state.lastUpdatedTimestamp.ToDatetime(),
state.input.value if not helpers.is_empty(state.input) else None,
state.output.value if not helpers.is_empty(state.output) else None,
state.customStatus.value if not helpers.is_empty(state.customStatus) else None,
failure_details,
)


class AsyncTaskHubGrpcClient:
Expand Down
Loading
Loading