Skip to content

Commit 64dde17

Browse files
authored
Queue List (#205)
CLI utility for listing the contents of your queues.
1 parent 4afd4d9 commit 64dde17

File tree

7 files changed

+348
-73
lines changed

7 files changed

+348
-73
lines changed

dbos/_dbos_config.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ def get_dbos_database_url(config_file_path: str = DBOS_CONFIG_PATH) -> str:
123123

124124

125125
def load_config(
126-
config_file_path: str = DBOS_CONFIG_PATH, *, use_db_wizard: bool = True
126+
config_file_path: str = DBOS_CONFIG_PATH,
127+
*,
128+
use_db_wizard: bool = True,
129+
silent: bool = False,
127130
) -> ConfigFile:
128131
"""
129132
Load the DBOS `ConfigFile` from the specified path (typically `dbos-config.yaml`).
@@ -188,18 +191,19 @@ def load_config(
188191
# Load the DB connection file. Use its values for missing fields from dbos-config.yaml. Use defaults otherwise.
189192
data = cast(ConfigFile, data)
190193
db_connection = load_db_connection()
191-
if data["database"].get("hostname"):
192-
print(
193-
"[bold blue]Loading database connection parameters from dbos-config.yaml[/bold blue]"
194-
)
195-
elif db_connection.get("hostname"):
196-
print(
197-
"[bold blue]Loading database connection parameters from .dbos/db_connection[/bold blue]"
198-
)
199-
else:
200-
print(
201-
"[bold blue]Using default database connection parameters (localhost)[/bold blue]"
202-
)
194+
if not silent:
195+
if data["database"].get("hostname"):
196+
print(
197+
"[bold blue]Loading database connection parameters from dbos-config.yaml[/bold blue]"
198+
)
199+
elif db_connection.get("hostname"):
200+
print(
201+
"[bold blue]Loading database connection parameters from .dbos/db_connection[/bold blue]"
202+
)
203+
else:
204+
print(
205+
"[bold blue]Using default database connection parameters (localhost)[/bold blue]"
206+
)
203207

204208
data["database"]["hostname"] = (
205209
data["database"].get("hostname") or db_connection.get("hostname") or "localhost"

dbos/_sys_db.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,15 @@ def __init__(self) -> None:
126126
)
127127

128128

129+
class GetQueuedWorkflowsInput(TypedDict):
130+
queue_name: Optional[str]
131+
status: Optional[str]
132+
start_time: Optional[str] # Timestamp in ISO 8601 format
133+
end_time: Optional[str] # Timestamp in ISO 8601 format
134+
limit: Optional[int] # Return up to this many workflows IDs.
135+
name: Optional[str] # The name of the workflow function
136+
137+
129138
class GetWorkflowsOutput:
130139
def __init__(self, workflow_uuids: List[str]):
131140
self.workflow_uuids = workflow_uuids
@@ -658,7 +667,6 @@ def get_workflows(self, input: GetWorkflowsInput) -> GetWorkflowsOutput:
658667
query = sa.select(SystemSchema.workflow_status.c.workflow_uuid).order_by(
659668
SystemSchema.workflow_status.c.created_at.desc()
660669
)
661-
662670
if input.name:
663671
query = query.where(SystemSchema.workflow_status.c.name == input.name)
664672
if input.authenticated_user:
@@ -692,6 +700,52 @@ def get_workflows(self, input: GetWorkflowsInput) -> GetWorkflowsOutput:
692700

693701
return GetWorkflowsOutput(workflow_uuids)
694702

703+
def get_queued_workflows(
704+
self, input: GetQueuedWorkflowsInput
705+
) -> GetWorkflowsOutput:
706+
707+
query = (
708+
sa.select(SystemSchema.workflow_queue.c.workflow_uuid)
709+
.join(
710+
SystemSchema.workflow_status,
711+
SystemSchema.workflow_queue.c.workflow_uuid
712+
== SystemSchema.workflow_status.c.workflow_uuid,
713+
)
714+
.order_by(SystemSchema.workflow_status.c.created_at.desc())
715+
)
716+
717+
if input.get("name"):
718+
query = query.where(SystemSchema.workflow_status.c.name == input["name"])
719+
720+
if input.get("queue_name"):
721+
query = query.where(
722+
SystemSchema.workflow_queue.c.queue_name == input["queue_name"]
723+
)
724+
725+
if input.get("status"):
726+
query = query.where(
727+
SystemSchema.workflow_status.c.status == input["status"]
728+
)
729+
if "start_time" in input and input["start_time"] is not None:
730+
query = query.where(
731+
SystemSchema.workflow_status.c.created_at
732+
>= datetime.datetime.fromisoformat(input["start_time"]).timestamp()
733+
* 1000
734+
)
735+
if "end_time" in input and input["end_time"] is not None:
736+
query = query.where(
737+
SystemSchema.workflow_status.c.created_at
738+
<= datetime.datetime.fromisoformat(input["end_time"]).timestamp() * 1000
739+
)
740+
if input.get("limit"):
741+
query = query.limit(input["limit"])
742+
743+
with self.engine.begin() as c:
744+
rows = c.execute(query)
745+
workflow_uuids = [row[0] for row in rows]
746+
747+
return GetWorkflowsOutput(workflow_uuids)
748+
695749
def get_pending_workflows(self, executor_id: str) -> list[str]:
696750
with self.engine.begin() as c:
697751
rows = c.execute(

dbos/_workflow_commands.py

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from . import _serialization
66
from ._dbos_config import ConfigFile
77
from ._sys_db import (
8+
GetQueuedWorkflowsInput,
89
GetWorkflowsInput,
910
GetWorkflowsOutput,
1011
SystemDatabase,
@@ -19,8 +20,8 @@ class WorkflowInformation:
1920
workflowClassName: Optional[str]
2021
workflowConfigName: Optional[str]
2122
input: Optional[_serialization.WorkflowInputs] # JSON (jsonpickle)
22-
output: Optional[str] # JSON (jsonpickle)
23-
error: Optional[str] # JSON (jsonpickle)
23+
output: Optional[str] = None # JSON (jsonpickle)
24+
error: Optional[str] = None # JSON (jsonpickle)
2425
executor_id: Optional[str]
2526
app_version: Optional[str]
2627
app_id: Optional[str]
@@ -34,17 +35,15 @@ class WorkflowInformation:
3435

3536
def list_workflows(
3637
config: ConfigFile,
37-
li: int,
38+
limit: int,
3839
user: Optional[str],
3940
starttime: Optional[str],
4041
endtime: Optional[str],
4142
status: Optional[str],
4243
request: bool,
4344
appversion: Optional[str],
45+
name: Optional[str],
4446
) -> List[WorkflowInformation]:
45-
46-
sys_db = None
47-
4847
try:
4948
sys_db = SystemDatabase(config)
5049

@@ -55,24 +54,55 @@ def list_workflows(
5554
if status is not None:
5655
input.status = cast(WorkflowStatuses, status)
5756
input.application_version = appversion
58-
input.limit = li
57+
input.limit = limit
58+
input.name = name
5959

6060
output: GetWorkflowsOutput = sys_db.get_workflows(input)
61-
6261
infos: List[WorkflowInformation] = []
62+
for workflow_id in output.workflow_uuids:
63+
info = _get_workflow_info(
64+
sys_db, workflow_id, request
65+
) # Call the method for each ID
66+
if info is not None:
67+
infos.append(info)
68+
69+
return infos
70+
except Exception as e:
71+
typer.echo(f"Error listing workflows: {e}")
72+
return []
73+
finally:
74+
if sys_db:
75+
sys_db.destroy()
6376

64-
if output.workflow_uuids is None:
65-
typer.echo("No workflows found")
66-
return {}
6777

78+
def list_queued_workflows(
79+
config: ConfigFile,
80+
limit: Optional[int] = None,
81+
start_time: Optional[str] = None,
82+
end_time: Optional[str] = None,
83+
queue_name: Optional[str] = None,
84+
status: Optional[str] = None,
85+
name: Optional[str] = None,
86+
request: bool = False,
87+
) -> List[WorkflowInformation]:
88+
try:
89+
sys_db = SystemDatabase(config)
90+
input: GetQueuedWorkflowsInput = {
91+
"queue_name": queue_name,
92+
"start_time": start_time,
93+
"end_time": end_time,
94+
"status": status,
95+
"limit": limit,
96+
"name": name,
97+
}
98+
output: GetWorkflowsOutput = sys_db.get_queued_workflows(input)
99+
infos: List[WorkflowInformation] = []
68100
for workflow_id in output.workflow_uuids:
69101
info = _get_workflow_info(
70102
sys_db, workflow_id, request
71103
) # Call the method for each ID
72-
73104
if info is not None:
74105
infos.append(info)
75-
76106
return infos
77107
except Exception as e:
78108
typer.echo(f"Error listing workflows: {e}")

dbos/cli/cli.py

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,21 @@
1919
from .._app_db import ApplicationDatabase
2020
from .._dbos_config import _is_valid_app_name
2121
from .._sys_db import SystemDatabase, reset_system_database
22-
from .._workflow_commands import cancel_workflow, get_workflow, list_workflows
22+
from .._workflow_commands import (
23+
cancel_workflow,
24+
get_workflow,
25+
list_queued_workflows,
26+
list_workflows,
27+
)
2328
from ..cli._github_init import create_template_from_github
2429
from ._template_init import copy_template, get_project_name, get_templates_directory
2530

2631
app = typer.Typer()
2732
workflow = typer.Typer()
33+
queue = typer.Typer()
2834

2935
app.add_typer(workflow, name="workflow", help="Manage DBOS workflows")
36+
workflow.add_typer(queue, name="queue", help="Manage enqueued workflows")
3037

3138

3239
def _on_windows() -> bool:
@@ -272,35 +279,35 @@ def list(
272279
help="Retrieve workflows with this application version",
273280
),
274281
] = None,
282+
name: Annotated[
283+
typing.Optional[str],
284+
typer.Option(
285+
"--name",
286+
"-n",
287+
help="Retrieve workflows with this name",
288+
),
289+
] = None,
275290
request: Annotated[
276291
bool,
277292
typer.Option("--request", help="Retrieve workflow request information"),
278293
] = True,
279-
appdir: Annotated[
280-
typing.Optional[str],
281-
typer.Option("--app-dir", "-d", help="Specify the application root directory"),
282-
] = None,
283294
) -> None:
284-
config = load_config()
295+
config = load_config(silent=True)
285296
workflows = list_workflows(
286-
config, limit, user, starttime, endtime, status, request, appversion
297+
config, limit, user, starttime, endtime, status, request, appversion, name
287298
)
288299
print(jsonpickle.encode(workflows, unpicklable=False))
289300

290301

291302
@workflow.command(help="Retrieve the status of a workflow")
292303
def get(
293304
uuid: Annotated[str, typer.Argument()],
294-
appdir: Annotated[
295-
typing.Optional[str],
296-
typer.Option("--app-dir", "-d", help="Specify the application root directory"),
297-
] = None,
298305
request: Annotated[
299306
bool,
300307
typer.Option("--request", help="Retrieve workflow request information"),
301308
] = True,
302309
) -> None:
303-
config = load_config()
310+
config = load_config(silent=True)
304311
print(jsonpickle.encode(get_workflow(config, uuid, request), unpicklable=False))
305312

306313

@@ -309,10 +316,6 @@ def get(
309316
)
310317
def cancel(
311318
uuid: Annotated[str, typer.Argument()],
312-
appdir: Annotated[
313-
typing.Optional[str],
314-
typer.Option("--app-dir", "-d", help="Specify the application root directory"),
315-
] = None,
316319
) -> None:
317320
config = load_config()
318321
cancel_workflow(config, uuid)
@@ -363,5 +366,70 @@ def restart(
363366
print(f"Failed to resume workflow {uuid}. Status code: {response.status_code}")
364367

365368

369+
@queue.command(name="list", help="List enqueued functions for your application")
370+
def list_queue(
371+
limit: Annotated[
372+
typing.Optional[int],
373+
typer.Option("--limit", "-l", help="Limit the results returned"),
374+
] = None,
375+
start_time: Annotated[
376+
typing.Optional[str],
377+
typer.Option(
378+
"--start-time",
379+
"-s",
380+
help="Retrieve functions starting after this timestamp (ISO 8601 format)",
381+
),
382+
] = None,
383+
end_time: Annotated[
384+
typing.Optional[str],
385+
typer.Option(
386+
"--end-time",
387+
"-e",
388+
help="Retrieve functions starting before this timestamp (ISO 8601 format)",
389+
),
390+
] = None,
391+
status: Annotated[
392+
typing.Optional[str],
393+
typer.Option(
394+
"--status",
395+
"-S",
396+
help="Retrieve functions with this status (PENDING, SUCCESS, ERROR, RETRIES_EXCEEDED, ENQUEUED, or CANCELLED)",
397+
),
398+
] = None,
399+
queue_name: Annotated[
400+
typing.Optional[str],
401+
typer.Option(
402+
"--queue-name",
403+
"-q",
404+
help="Retrieve functions on this queue",
405+
),
406+
] = None,
407+
name: Annotated[
408+
typing.Optional[str],
409+
typer.Option(
410+
"--name",
411+
"-n",
412+
help="Retrieve functions on this queue",
413+
),
414+
] = None,
415+
request: Annotated[
416+
bool,
417+
typer.Option("--request", help="Retrieve workflow request information"),
418+
] = True,
419+
) -> None:
420+
config = load_config(silent=True)
421+
workflows = list_queued_workflows(
422+
config=config,
423+
limit=limit,
424+
start_time=start_time,
425+
end_time=end_time,
426+
queue_name=queue_name,
427+
status=status,
428+
request=request,
429+
name=name,
430+
)
431+
print(jsonpickle.encode(workflows, unpicklable=False))
432+
433+
366434
if __name__ == "__main__":
367435
app()

0 commit comments

Comments
 (0)