
Bulk Operations & Communication Observability #608

Merged
kraftp merged 12 commits into main from kraftp/bulk on Mar 9, 2026

Conversation


kraftp commented Mar 5, 2026

  • Add a bulk API for canceling, resuming, and deleting workflows.
  • Add this API to Conductor.
  • Add observability for workflow communications (events, notifications, streams) to Conductor.

kraftp marked this pull request as ready for review March 6, 2026 00:00

dosubot bot commented Mar 6, 2026

Documentation Updates

2 document(s) were updated by changes in this PR:

Async Workflow Execution
@@ -621,6 +621,9 @@
 - [`list_workflows_async`](https://docs.dbos.dev/python/reference/contexts#list_workflows_async)
 - [`write_stream_async`](https://docs.dbos.dev/python/reference/contexts#write_stream_async) / [`read_stream_async`](https://docs.dbos.dev/python/reference/contexts#read_stream_async)
 - [`wait_first_async`](https://docs.dbos.dev/python/reference/contexts#wait_first_async)
+- [`cancel_workflow_async`](https://docs.dbos.dev/python/reference/contexts#cancel_workflow_async) / [`cancel_workflows_async`](https://docs.dbos.dev/python/reference/contexts#cancel_workflows_async)
+- [`resume_workflow_async`](https://docs.dbos.dev/python/reference/contexts#resume_workflow_async) / [`resume_workflows_async`](https://docs.dbos.dev/python/reference/contexts#resume_workflows_async)
+- [`delete_workflow_async`](https://docs.dbos.dev/python/reference/contexts#delete_workflow_async) / [`delete_workflows_async`](https://docs.dbos.dev/python/reference/contexts#delete_workflows_async)
 
 All async context methods return coroutines that must be awaited in async functions.
 
@@ -736,6 +739,196 @@
     return results
 ```
 
+### cancel_workflow / cancel_workflow_async
+
+**Signature (async):**
+```python
+DBOS.cancel_workflow_async(workflow_id: str) -> None
+```
+
+**Signature (sync):**
+```python
+DBOS.cancel_workflow(workflow_id: str) -> None
+```
+
+Cancel a workflow by ID. The workflow's status is set to CANCELLED and it is removed from any queue, but only if the workflow is not already complete.
+
+**Parameters:**
+- **`workflow_id`**: The workflow ID to cancel.
+
+**Example:**
+```python
+# Synchronous cancellation
+DBOS.cancel_workflow(workflow_id)
+
+# Asynchronous cancellation
+await DBOS.cancel_workflow_async(workflow_id)
+```
+
+### cancel_workflows / cancel_workflows_async
+
+**Signature (async):**
+```python
+DBOS.cancel_workflows_async(workflow_ids: List[str]) -> None
+```
+
+**Signature (sync):**
+```python
+DBOS.cancel_workflows(workflow_ids: List[str]) -> None
+```
+
+Cancel multiple workflows at once by providing a list of workflow IDs. This is more efficient than calling `cancel_workflow()` repeatedly for each workflow. All specified workflows will have their status set to CANCELLED and be removed from any queues, but only if they are not already complete.
+
+**Parameters:**
+- **`workflow_ids`**: List of workflow IDs to cancel.
+
+**Example:**
+```python
+# Cancel multiple workflows efficiently
+workflow_ids = [wf1_id, wf2_id, wf3_id]
+DBOS.cancel_workflows(workflow_ids)
+
+# Async version
+await DBOS.cancel_workflows_async(workflow_ids)
+```
+
+### resume_workflow / resume_workflow_async
+
+**Signature (async):**
+```python
+DBOS.resume_workflow_async(workflow_id: str) -> WorkflowHandleAsync[Any]
+```
+
+**Signature (sync):**
+```python
+DBOS.resume_workflow(workflow_id: str) -> WorkflowHandle[Any]
+```
+
+Resume a cancelled workflow by ID. The workflow's status is set to ENQUEUED and its recovery attempts and deadline are cleared, but only if the workflow is not already complete. Returns a handle to the resumed workflow.
+
+**Parameters:**
+- **`workflow_id`**: The workflow ID to resume.
+
+**Returns:**
+A workflow handle for the resumed workflow.
+
+**Example:**
+```python
+# Resume a cancelled workflow
+handle = DBOS.resume_workflow(workflow_id)
+result = handle.get_result()
+
+# Async version
+handle = await DBOS.resume_workflow_async(workflow_id)
+result = await handle.get_result()
+```
+
+### resume_workflows / resume_workflows_async
+
+**Signature (async):**
+```python
+DBOS.resume_workflows_async(workflow_ids: List[str]) -> List[WorkflowHandleAsync[Any]]
+```
+
+**Signature (sync):**
+```python
+DBOS.resume_workflows(workflow_ids: List[str]) -> List[WorkflowHandle[Any]]
+```
+
+Resume multiple cancelled workflows at once by providing a list of workflow IDs. This is more efficient than calling `resume_workflow()` repeatedly for each workflow. All specified workflows will have their status set to ENQUEUED and their recovery attempts and deadline cleared, but only if they are not already complete. Returns a list of handles corresponding to the resumed workflows.
+
+**Parameters:**
+- **`workflow_ids`**: List of workflow IDs to resume.
+
+**Returns:**
+A list of workflow handles for the resumed workflows, in the same order as the input workflow IDs.
+
+**Example:**
+```python
+# Resume multiple workflows efficiently
+workflow_ids = [wf1_id, wf2_id, wf3_id]
+handles = DBOS.resume_workflows(workflow_ids)
+results = [h.get_result() for h in handles]
+
+# Async version
+handles = await DBOS.resume_workflows_async(workflow_ids)
+results = [await h.get_result() for h in handles]
+```
+
+### delete_workflow / delete_workflow_async
+
+**Signature (async):**
+```python
+DBOS.delete_workflow_async(
+    workflow_id: str, 
+    *, 
+    delete_children: bool = False
+) -> None
+```
+
+**Signature (sync):**
+```python
+DBOS.delete_workflow(
+    workflow_id: str, 
+    *, 
+    delete_children: bool = False
+) -> None
+```
+
+Delete a workflow and all its associated data by ID. If `delete_children` is True, also recursively deletes all child workflows.
+
+**Parameters:**
+- **`workflow_id`**: The workflow ID to delete.
+- **`delete_children`**: If True, also delete all child workflows recursively. Defaults to False.
+
+**Example:**
+```python
+# Delete a workflow and all its children
+DBOS.delete_workflow(workflow_id, delete_children=True)
+
+# Async version
+await DBOS.delete_workflow_async(workflow_id, delete_children=True)
+```
+
+### delete_workflows / delete_workflows_async
+
+**Signature (async):**
+```python
+DBOS.delete_workflows_async(
+    workflow_ids: List[str], 
+    *, 
+    delete_children: bool = False
+) -> None
+```
+
+**Signature (sync):**
+```python
+DBOS.delete_workflows(
+    workflow_ids: List[str], 
+    *, 
+    delete_children: bool = False
+) -> None
+```
+
+Delete multiple workflows and all their associated data at once by providing a list of workflow IDs. This is more efficient than calling `delete_workflow()` repeatedly for each workflow. If `delete_children` is True, also recursively deletes all child workflows of each specified workflow.
+
+**Parameters:**
+- **`workflow_ids`**: List of workflow IDs to delete.
+- **`delete_children`**: If True, also delete all child workflows of each specified workflow recursively. Defaults to False.
+
+**Example:**
+```python
+# Delete multiple workflows efficiently
+workflow_ids = [wf1_id, wf2_id, wf3_id]
+DBOS.delete_workflows(workflow_ids)
+
+# Delete workflows and all their children
+DBOS.delete_workflows(workflow_ids, delete_children=True)
+
+# Async version
+await DBOS.delete_workflows_async(workflow_ids, delete_children=True)
+```
+
 ## Relevant Code Files
 
 | File | Description | URL |
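The per-method snippets in the diff above show single bulk calls. For very large ID sets it can help to chunk the list before invoking a bulk API. Below is a minimal, library-free sketch of that batching pattern; the `cancel_workflows` parameter is a stand-in stub for a bulk call such as `DBOS.cancel_workflows` (how you wire it is an assumption, not part of the PR):

```python
from typing import Callable, List

def cancel_in_batches(
    workflow_ids: List[str],
    cancel_workflows: Callable[[List[str]], None],
    batch_size: int = 100,
) -> int:
    """Issue bulk cancels in fixed-size batches.

    `cancel_workflows` stands in for a bulk cancel call; batching keeps
    each request bounded. Returns the number of batches issued.
    """
    batches = 0
    for start in range(0, len(workflow_ids), batch_size):
        cancel_workflows(workflow_ids[start : start + batch_size])
        batches += 1
    return batches

# Usage with a stub that simply records each batch it was asked to cancel
calls: List[List[str]] = []
n = cancel_in_batches([f"wf-{i}" for i in range(250)], calls.append, batch_size=100)
print(n)               # 3 batches: 100 + 100 + 50
print(len(calls[-1]))  # 50
```

The same helper works unchanged for a resume or delete stub, since all three bulk APIs take a list of workflow IDs.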
System Database Schema
@@ -156,6 +156,49 @@
 - `offset` (INTEGER) - Position in stream
 - `function_id` (INTEGER) - [Function that added this value](https://github.com/dbos-inc/dbos-transact-py/blob/12049d833dddbcaaea18d921149e2c33aad37ecf/dbos/_migration.py#L246-L247)
 - `serialization` (TEXT) - Serialization format metadata
+
+## Workflow Communication Observability
+
+The SystemDB class provides methods to query and inspect workflow communications for debugging and monitoring purposes. These methods allow you to retrieve events, notifications, and streams associated with a specific workflow.
+
+### get_all_events()
+
+The `get_all_events(workflow_id: str)` method retrieves all events associated with a workflow from the `workflow_events` table. It returns a dictionary mapping event keys to their deserialized values.
+
+**Returns:** `Dict[str, Any]` - Dictionary of event key-value pairs
+
+**Use Cases:**
+- Inspecting workflow state set via events
+- Debugging event-based workflow logic
+- Auditing workflow event history
+
+### get_all_notifications()
+
+The `get_all_notifications(workflow_id: str)` method retrieves all notifications sent to a workflow from the `notifications` table. It returns a list of notification records ordered by creation timestamp.
+
+**Returns:** `List[NotificationInfo]` where each `NotificationInfo` contains:
+- `topic` (Optional[str]) - Notification topic, or None for default topic
+- `message` (Any) - Deserialized message content
+- `created_at_epoch_ms` (int) - Message creation timestamp in milliseconds
+- `consumed` (bool) - Whether the message has been consumed by `recv()`
+
+**Use Cases:**
+- Monitoring inter-workflow communication
+- Debugging notification delivery and consumption
+- Auditing message flow between workflows
+
+### get_all_stream_entries()
+
+The `get_all_stream_entries(workflow_id: str)` method retrieves all stream entries for a workflow from the `streams` table. It returns a dictionary mapping stream keys to lists of deserialized values ordered by offset.
+
+**Returns:** `Dict[str, List[Any]]` - Dictionary mapping stream keys to ordered lists of values
+
+**Use Cases:**
+- Inspecting stream data produced by workflows
+- Debugging stream-based workflow patterns
+- Analyzing workflow data flow
+
+**Note:** Stream closed sentinel values (`__DBOS_STREAM_CLOSED__`) are automatically filtered out from the results.
 
 ### workflow_schedules
 

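The schema notes above say `get_all_stream_entries` returns per-key lists ordered by offset with the `__DBOS_STREAM_CLOSED__` sentinel filtered out. Here is a library-free sketch of that grouping logic; the `(key, offset, value)` row shape is an assumption for illustration, not the actual `streams` table schema:

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

# Sentinel name taken from the note above
STREAM_CLOSED = "__DBOS_STREAM_CLOSED__"

def group_stream_rows(rows: List[Tuple[str, int, Any]]) -> Dict[str, List[Any]]:
    """Group (key, offset, value) rows into per-key lists ordered by offset,
    dropping the stream-closed sentinel, mirroring the described behavior."""
    grouped: Dict[str, List[Tuple[int, Any]]] = defaultdict(list)
    for key, offset, value in rows:
        if value == STREAM_CLOSED:
            continue  # closed-stream marker is filtered from results
        grouped[key].append((offset, value))
    return {
        k: [v for _, v in sorted(entries, key=lambda e: e[0])]
        for k, entries in grouped.items()
    }

rows = [
    ("results", 1, "b"),
    ("results", 0, "a"),
    ("results", 2, STREAM_CLOSED),
    ("progress", 0, 10),
]
print(group_stream_rows(rows))  # {'results': ['a', 'b'], 'progress': [10]}
```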
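The `get_all_notifications` description above lists the `NotificationInfo` fields and says records come back ordered by creation timestamp. The following sketch mirrors that shape with a plain dataclass (an illustration, not the library's actual class) and shows one way to pick out pending messages:

```python
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class NotificationInfo:
    # Field names follow the get_all_notifications description above
    topic: Optional[str]        # None means the default topic
    message: Any                # deserialized message content
    created_at_epoch_ms: int    # creation timestamp in milliseconds
    consumed: bool              # whether recv() has consumed it

def order_notifications(records: List[NotificationInfo]) -> List[NotificationInfo]:
    """Order records by creation timestamp, as the method is documented to do."""
    return sorted(records, key=lambda r: r.created_at_epoch_ms)

inbox = [
    NotificationInfo("orders", {"id": 2}, 1700000002000, False),
    NotificationInfo(None, "ping", 1700000001000, True),
]
pending = [r for r in order_notifications(inbox) if not r.consumed]
print([r.message for r in pending])  # [{'id': 2}]
```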