Skip to content

Commit 3511b6a

Browse files
authored
Admin server queue metadata endpoint (#34)
1 parent 8b55202 commit 3511b6a

File tree

2 files changed

+93
-7
lines changed

2 files changed

+93
-7
lines changed

dbos/admin-server.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,35 @@ import (
88
"time"
99
)
1010

11+
const (
12+
HealthCheckPath = "/dbos-healthz"
13+
WorkflowRecoveryPath = "/dbos-workflow-recovery"
14+
WorkflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
15+
)
16+
1117
type AdminServer struct {
1218
server *http.Server
1319
}
1420

21+
type QueueMetadata struct {
22+
Name string `json:"name"`
23+
Concurrency *int `json:"concurrency,omitempty"`
24+
WorkerConcurrency *int `json:"workerConcurrency,omitempty"`
25+
RateLimit *RateLimiter `json:"rateLimit,omitempty"`
26+
}
27+
1528
func NewAdminServer(port int) *AdminServer {
1629
mux := http.NewServeMux()
1730

1831
// Health endpoint
19-
mux.HandleFunc("/dbos-healthz", func(w http.ResponseWriter, r *http.Request) {
32+
mux.HandleFunc(HealthCheckPath, func(w http.ResponseWriter, r *http.Request) {
2033
w.Header().Set("Content-Type", "application/json")
2134
w.WriteHeader(http.StatusOK)
2235
w.Write([]byte(`{"status":"healthy"}`))
2336
})
2437

2538
// Recovery endpoint
26-
mux.HandleFunc("/dbos-workflow-recovery", func(w http.ResponseWriter, r *http.Request) {
39+
mux.HandleFunc(WorkflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
2740
if r.Method != http.MethodPost {
2841
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
2942
return
@@ -57,6 +70,34 @@ func NewAdminServer(port int) *AdminServer {
5770
}
5871
})
5972

73+
// Queue metadata endpoint
74+
mux.HandleFunc(WorkflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
75+
if r.Method != http.MethodGet {
76+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
77+
return
78+
}
79+
80+
var queueMetadataArray []QueueMetadata
81+
82+
// Iterate through all queues in the registry
83+
for _, queue := range workflowQueueRegistry {
84+
queueMetadata := QueueMetadata{
85+
Name: queue.Name,
86+
WorkerConcurrency: queue.WorkerConcurrency,
87+
Concurrency: queue.GlobalConcurrency,
88+
RateLimit: queue.Limiter,
89+
}
90+
91+
queueMetadataArray = append(queueMetadataArray, queueMetadata)
92+
}
93+
94+
w.Header().Set("Content-Type", "application/json")
95+
w.WriteHeader(http.StatusOK)
96+
if err := json.NewEncoder(w).Encode(queueMetadataArray); err != nil {
97+
getLogger().Error("Error encoding queue metadata response", "error", err)
98+
}
99+
})
100+
60101
server := &http.Server{
61102
Addr: fmt.Sprintf(":%d", port),
62103
Handler: mux,

dbos/admin-server_test.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestAdminServer(t *testing.T) {
3838

3939
// Verify admin server is not running
4040
client := &http.Client{Timeout: 1 * time.Second}
41-
_, err = client.Get("http://localhost:3001/dbos-healthz")
41+
_, err = client.Get("http://localhost:3001" + HealthCheckPath)
4242
if err == nil {
4343
t.Error("Expected request to fail when admin server is not started, but it succeeded")
4444
}
@@ -94,13 +94,13 @@ func TestAdminServer(t *testing.T) {
9494
{
9595
name: "Health endpoint responds correctly",
9696
method: "GET",
97-
endpoint: "http://localhost:3001/dbos-healthz",
97+
endpoint: "http://localhost:3001" + HealthCheckPath,
9898
expectedStatus: http.StatusOK,
9999
},
100100
{
101101
name: "Recovery endpoint responds correctly with valid JSON",
102102
method: "POST",
103-
endpoint: "http://localhost:3001/dbos-workflow-recovery",
103+
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
104104
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
105105
contentType: "application/json",
106106
expectedStatus: http.StatusOK,
@@ -117,17 +117,62 @@ func TestAdminServer(t *testing.T) {
117117
{
118118
name: "Recovery endpoint rejects invalid methods",
119119
method: "GET",
120-
endpoint: "http://localhost:3001/dbos-workflow-recovery",
120+
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
121121
expectedStatus: http.StatusMethodNotAllowed,
122122
},
123123
{
124124
name: "Recovery endpoint rejects invalid JSON",
125125
method: "POST",
126-
endpoint: "http://localhost:3001/dbos-workflow-recovery",
126+
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
127127
body: strings.NewReader(`{"invalid": json}`),
128128
contentType: "application/json",
129129
expectedStatus: http.StatusBadRequest,
130130
},
131+
{
132+
name: "Queue metadata endpoint responds correctly",
133+
method: "GET",
134+
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
135+
expectedStatus: http.StatusOK,
136+
validateResp: func(t *testing.T, resp *http.Response) {
137+
var queueMetadata []QueueMetadata
138+
if err := json.NewDecoder(resp.Body).Decode(&queueMetadata); err != nil {
139+
t.Errorf("Failed to decode response as QueueMetadata array: %v", err)
140+
}
141+
if queueMetadata == nil {
142+
t.Error("Expected non-nil queue metadata array")
143+
}
144+
// Should contain at least the internal queue
145+
if len(queueMetadata) == 0 {
146+
t.Error("Expected at least one queue in metadata")
147+
}
148+
// Verify internal queue fields
149+
foundInternalQueue := false
150+
for _, queue := range queueMetadata {
151+
if queue.Name == _DBOS_INTERNAL_QUEUE_NAME { // Internal queue name
152+
foundInternalQueue = true
153+
if queue.Concurrency != nil {
154+
t.Errorf("Expected internal queue to have no concurrency limit, but got %v", *queue.Concurrency)
155+
}
156+
if queue.WorkerConcurrency != nil {
157+
t.Errorf("Expected internal queue to have no worker concurrency limit, but got %v", *queue.WorkerConcurrency)
158+
}
159+
if queue.RateLimit != nil {
160+
t.Error("Expected internal queue to have no rate limit")
161+
}
162+
break
163+
}
164+
}
165+
if !foundInternalQueue {
166+
t.Error("Expected to find internal queue in metadata")
167+
}
168+
},
169+
},
170+
{
171+
name: "Queue metadata endpoint rejects invalid methods",
172+
method: "POST",
173+
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
174+
expectedStatus: http.StatusMethodNotAllowed,
175+
},
131176
}
132177

133178
for _, tt := range tests {

0 commit comments

Comments
 (0)