diff --git a/dbos/_admin_server.py b/dbos/_admin_server.py index 5af128fe2..a2d5afc6c 100644 --- a/dbos/_admin_server.py +++ b/dbos/_admin_server.py @@ -16,6 +16,7 @@ _health_check_path = "/dbos-healthz" _workflow_recovery_path = "/dbos-workflow-recovery" _deactivate_path = "/deactivate" +_workflow_queues_metadata_path = "/dbos-workflow-queues-metadata" # /workflows/:workflow_id/cancel # /workflows/:workflow_id/resume # /workflows/:workflow_id/restart @@ -64,6 +65,26 @@ def do_GET(self) -> None: self.send_response(200) self._end_headers() self.wfile.write("deactivated".encode("utf-8")) + elif self.path == _workflow_queues_metadata_path: + queue_metadata_array = [] + from ._dbos import _get_or_create_dbos_registry + + registry = _get_or_create_dbos_registry() + for queue in registry.queue_info_map.values(): + queue_metadata = { + "name": queue.name, + "concurrency": queue.concurrency, + "workerConcurrency": queue.worker_concurrency, + "rateLimit": queue.limiter, + } + # Remove keys with None values + queue_metadata = { + k: v for k, v in queue_metadata.items() if v is not None + } + queue_metadata_array.append(queue_metadata) + self.send_response(200) + self._end_headers() + self.wfile.write(json.dumps(queue_metadata_array).encode("utf-8")) else: self.send_response(404) self._end_headers() diff --git a/tests/test_admin_server.py b/tests/test_admin_server.py index 7fd4e5fc7..8e541e4d8 100644 --- a/tests/test_admin_server.py +++ b/tests/test_admin_server.py @@ -5,7 +5,7 @@ import requests # Public API -from dbos import DBOS, ConfigFile, SetWorkflowID, _workflow_commands +from dbos import DBOS, ConfigFile, Queue, SetWorkflowID, _workflow_commands def test_admin_endpoints(dbos: DBOS) -> None: @@ -23,6 +23,27 @@ def test_admin_endpoints(dbos: DBOS) -> None: assert response.status_code == 200 assert response.json() == [] + # Test GET /dbos-workflow-queues-metadata + Queue("q1") + Queue("q2", concurrency=1) + Queue("q3", concurrency=1, worker_concurrency=1) + Queue("q4", concurrency=1, worker_concurrency=1, limiter={"limit": 0, "period": 0}) + response = requests.get( + "http://localhost:3001/dbos-workflow-queues-metadata", timeout=5 + ) + assert response.status_code == 200 + assert response.json() == [ + {"name": "q1"}, + {"name": "q2", "concurrency": 1}, + {"name": "q3", "concurrency": 1, "workerConcurrency": 1}, + { + "name": "q4", + "concurrency": 1, + "workerConcurrency": 1, + "rateLimit": {"limit": 0, "period": 0}, + }, + ] + # Test GET not found response = requests.get("http://localhost:3001/stuff", timeout=5) assert response.status_code == 404