Skip to content

Commit 7564a92

Browse files
authored
Conductor Improvements (#617)
- Support for a "private mode" where workflow data (as opposed to metadata) is never sent to Conductor
- Add the ability to "tag" executors with metadata for display in the dashboard
- Add a new workflow aggregates API
1 parent c75d07d commit 7564a92

File tree

8 files changed

+364
-23
lines changed

8 files changed

+364
-23
lines changed

dbos/_conductor/conductor.py

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def run(self) -> None:
123123
hostname=socket.gethostname(),
124124
language="python",
125125
dbos_version=GlobalParams.dbos_version,
126+
executor_metadata=self.dbos._conductor_executor_metadata,
126127
)
127128
websocket.send(info_response.to_json())
128129
self.dbos.logger.info("Connected to DBOS conductor")
@@ -285,8 +286,8 @@ def run(self) -> None:
285286
workflow_id_prefix=body.get(
286287
"workflow_id_prefix", None
287288
),
288-
load_input=body.get("load_input", False),
289-
load_output=body.get("load_output", False),
289+
load_input=body.get("load_input", True),
290+
load_output=body.get("load_output", True),
290291
executor_id=body.get("executor_id", None),
291292
queues_only=body.get("queues_only", False),
292293
)
@@ -330,8 +331,8 @@ def run(self) -> None:
330331
workflow_id_prefix=q_body.get(
331332
"workflow_id_prefix", None
332333
),
333-
load_input=q_body.get("load_input", False),
334-
load_output=q_body.get("load_output", False),
334+
load_input=q_body.get("load_input", True),
335+
load_output=q_body.get("load_output", True),
335336
executor_id=q_body.get("executor_id", None),
336337
queues_only=True,
337338
)
@@ -358,7 +359,10 @@ def run(self) -> None:
358359
info = None
359360
try:
360361
info = get_workflow(
361-
self.dbos._sys_db, get_workflow_message.workflow_id
362+
self.dbos._sys_db,
363+
get_workflow_message.workflow_id,
364+
load_input=get_workflow_message.load_input,
365+
load_output=get_workflow_message.load_output,
362366
)
363367
except Exception as e:
364368
error_message = f"Exception encountered when getting workflow {get_workflow_message.workflow_id}: {traceback.format_exc()}"
@@ -478,7 +482,8 @@ def run(self) -> None:
478482
step_info = None
479483
try:
480484
step_info = self.dbos._sys_db.list_workflow_steps(
481-
list_steps_message.workflow_id
485+
list_steps_message.workflow_id,
486+
load_output=list_steps_message.load_output,
482487
)
483488
except Exception as e:
484489
error_message = f"Exception encountered when getting workflow {list_steps_message.workflow_id}: {traceback.format_exc()}"
@@ -645,9 +650,12 @@ def run(self) -> None:
645650
sched_body = list_sched_msg.body
646651
schedules: list[p.ScheduleOutput] = []
647652
try:
653+
load_context = sched_body.get("load_context", True)
648654
schedules = [
649655
p.ScheduleOutput.from_schedule(
650-
s, self.dbos._sys_db.serializer
656+
s,
657+
self.dbos._sys_db.serializer,
658+
load_context=load_context,
651659
)
652660
for s in self.dbos._sys_db.list_schedules(
653661
status=sched_body.get("status", None),
@@ -679,7 +687,9 @@ def run(self) -> None:
679687
)
680688
if sched is not None:
681689
output = p.ScheduleOutput.from_schedule(
682-
sched, self.dbos._sys_db.serializer
690+
sched,
691+
self.dbos._sys_db.serializer,
692+
load_context=get_sched_msg.load_context,
683693
)
684694
except Exception:
685695
error_message = f"Exception encountered when getting schedule '{get_sched_msg.schedule_name}': {traceback.format_exc()}"
@@ -814,6 +824,55 @@ def run(self) -> None:
814824
error_message=error_message,
815825
).to_json()
816826
)
827+
elif msg_type == p.MessageType.GET_WORKFLOW_AGGREGATES:
828+
agg_message = p.GetWorkflowAggregatesRequest.from_json(
829+
message
830+
)
831+
agg_body = agg_message.body
832+
agg_output: list[p.WorkflowAggregateOutput] = []
833+
try:
834+
agg_rows = self.dbos._sys_db.get_workflow_aggregates(
835+
group_by_status=agg_body.get(
836+
"group_by_status", False
837+
),
838+
group_by_name=agg_body.get("group_by_name", False),
839+
group_by_queue_name=agg_body.get(
840+
"group_by_queue_name", False
841+
),
842+
group_by_executor_id=agg_body.get(
843+
"group_by_executor_id", False
844+
),
845+
group_by_application_version=agg_body.get(
846+
"group_by_application_version", False
847+
),
848+
status=agg_body.get("status", None),
849+
start_time=agg_body.get("start_time", None),
850+
end_time=agg_body.get("end_time", None),
851+
name=agg_body.get("name", None),
852+
app_version=agg_body.get("app_version", None),
853+
executor_id=agg_body.get("executor_id", None),
854+
queue_name=agg_body.get("queue_name", None),
855+
workflow_id_prefix=agg_body.get(
856+
"workflow_id_prefix", None
857+
),
858+
)
859+
agg_output = [
860+
p.WorkflowAggregateOutput(
861+
group=r["group"], count=r["count"]
862+
)
863+
for r in agg_rows
864+
]
865+
except Exception:
866+
error_message = f"Exception encountered when getting workflow aggregates: {traceback.format_exc()}"
867+
self.dbos.logger.error(error_message)
868+
websocket.send(
869+
p.GetWorkflowAggregatesResponse(
870+
type=p.MessageType.GET_WORKFLOW_AGGREGATES,
871+
request_id=base_message.request_id,
872+
output=agg_output,
873+
error_message=error_message,
874+
).to_json()
875+
)
817876
else:
818877
self.dbos.logger.warning(
819878
f"Unexpected message type: {msg_type}"

dbos/_conductor/protocol.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class MessageType(str, Enum):
4242
GET_WORKFLOW_EVENTS = "get_workflow_events"
4343
GET_WORKFLOW_NOTIFICATIONS = "get_workflow_notifications"
4444
GET_WORKFLOW_STREAMS = "get_workflow_streams"
45+
GET_WORKFLOW_AGGREGATES = "get_workflow_aggregates"
4546

4647

4748
T = TypeVar("T", bound="BaseMessage")
@@ -88,6 +89,7 @@ class ExecutorInfoResponse(BaseMessage):
8889
hostname: Optional[str]
8990
language: Optional[str]
9091
dbos_version: Optional[str]
92+
executor_metadata: Optional[Dict[str, object]] = None
9193
error_message: Optional[str] = None
9294

9395

@@ -335,6 +337,8 @@ class ListQueuedWorkflowsResponse(BaseMessage):
335337
@dataclass
336338
class GetWorkflowRequest(BaseMessage):
337339
workflow_id: str
340+
load_input: bool = True
341+
load_output: bool = True
338342

339343

340344
@dataclass
@@ -358,6 +362,7 @@ class ExistPendingWorkflowsResponse(BaseMessage):
358362
@dataclass
359363
class ListStepsRequest(BaseMessage):
360364
workflow_id: str
365+
load_output: bool = True
361366

362367

363368
@dataclass
@@ -470,16 +475,22 @@ class ScheduleOutput:
470475
workflow_class_name: Optional[str]
471476
schedule: str
472477
status: str
473-
context: str
478+
context: Optional[str]
474479
last_fired_at: Optional[str]
475480
automatic_backfill: bool
476481
cron_timezone: Optional[str]
477482

478483
@classmethod
479484
def from_schedule(
480-
cls, s: WorkflowSchedule, serializer: Serializer
485+
cls,
486+
s: WorkflowSchedule,
487+
serializer: Serializer,
488+
*,
489+
load_context: bool = True,
481490
) -> "ScheduleOutput":
482-
context_str = str(serializer.deserialize(s["context"]))
491+
context_str = (
492+
str(serializer.deserialize(s["context"])) if load_context else None
493+
)
483494
return cls(
484495
schedule_id=s["schedule_id"],
485496
schedule_name=s["schedule_name"],
@@ -498,6 +509,7 @@ class ListSchedulesBody(TypedDict, total=False):
498509
status: Optional[Union[str, List[str]]]
499510
workflow_name: Optional[Union[str, List[str]]]
500511
schedule_name_prefix: Optional[Union[str, List[str]]]
512+
load_context: bool
501513

502514

503515
@dataclass
@@ -514,6 +526,7 @@ class ListSchedulesResponse(BaseMessage):
514526
@dataclass
515527
class GetScheduleRequest(BaseMessage):
516528
schedule_name: str
529+
load_context: bool = True
517530

518531

519532
@dataclass
@@ -678,3 +691,36 @@ class GetWorkflowStreamsRequest(BaseMessage):
678691
class GetWorkflowStreamsResponse(BaseMessage):
679692
streams: Optional[List[StreamEntryOutput]]
680693
error_message: Optional[str] = None
694+
695+
696+
class GetWorkflowAggregatesBody(TypedDict, total=False):
697+
group_by_status: bool
698+
group_by_name: bool
699+
group_by_queue_name: bool
700+
group_by_executor_id: bool
701+
group_by_application_version: bool
702+
status: Optional[List[str]]
703+
start_time: Optional[str]
704+
end_time: Optional[str]
705+
name: Optional[List[str]]
706+
app_version: Optional[List[str]]
707+
executor_id: Optional[List[str]]
708+
queue_name: Optional[List[str]]
709+
workflow_id_prefix: Optional[List[str]]
710+
711+
712+
@dataclass
713+
class GetWorkflowAggregatesRequest(BaseMessage):
714+
body: GetWorkflowAggregatesBody
715+
716+
717+
@dataclass
718+
class WorkflowAggregateOutput:
719+
group: Dict[str, Optional[str]]
720+
count: int
721+
722+
723+
@dataclass
724+
class GetWorkflowAggregatesResponse(BaseMessage):
725+
output: List[WorkflowAggregateOutput]
726+
error_message: Optional[str] = None

dbos/_dbos.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import copy
55
import hashlib
66
import inspect
7+
import json
78
import os
89
import sys
910
import threading
@@ -408,6 +409,16 @@ def __init__(
408409
self._alert_handler: Optional[Callable[[str, str, Dict[str, str]], None]] = None
409410
serializer = config.get("serializer")
410411
self._serializer: Serializer = serializer if serializer else DefaultSerializer()
412+
self._conductor_executor_metadata: Optional[Dict[str, Any]] = config.get(
413+
"conductor_executor_metadata"
414+
)
415+
if self._conductor_executor_metadata is not None:
416+
try:
417+
json.dumps(self._conductor_executor_metadata)
418+
except Exception as e:
419+
raise DBOSException(
420+
f"conductor_executor_metadata must be JSON-serializable: {e}"
421+
)
411422

412423
# Globally set the application version and executor ID.
413424
# In DBOS Cloud, instead use the values supplied through environment variables.

dbos/_dbos_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class DBOSConfig(TypedDict, total=False):
4242
system_database_engine (sa.Engine): A custom system database engine. If provided, DBOS will not create an engine but use this instead.
4343
conductor_key (str): An API key for DBOS Conductor. Pass this in to connect your process to Conductor.
4444
conductor_url (str): The websockets URL for your DBOS Conductor service. Only set if you're self-hosting Conductor.
45+
conductor_executor_metadata (Dict[str, Any]): Metadata associated with this executor that may be used to identify an executor on the Conductor dashboard. Must be JSON-serializable.
4546
serializer (Serializer): A custom serializer and deserializer DBOS uses when storing program data in the system database
4647
use_listen_notify (bool): Whether to use LISTEN/NOTIFY or polling to listen for notifications and events. Defaults to True. As this affects migrations, may not be changed after the system database is first created.
4748
notification_listener_polling_interval_sec (float): Polling interval in seconds for the notification listener background process. Defaults to 1.0. Minimum value is 0.001. Lower values can speed up test execution.
@@ -69,6 +70,7 @@ class DBOSConfig(TypedDict, total=False):
6970
system_database_engine: Optional[sa.Engine]
7071
conductor_key: Optional[str]
7172
conductor_url: Optional[str]
73+
conductor_executor_metadata: Optional[Dict[str, Any]]
7274
serializer: Optional[Serializer]
7375
enable_patching: Optional[bool]
7476
use_listen_notify: Optional[bool]

0 commit comments

Comments (0)