Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions dbos/admin-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,35 @@ import (
"time"
)

const (
HealthCheckPath = "/dbos-healthz"
WorkflowRecoveryPath = "/dbos-workflow-recovery"
WorkflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
)

type AdminServer struct {
server *http.Server
}

type QueueMetadata struct {
Name string `json:"name"`
Concurrency *int `json:"concurrency,omitempty"`
WorkerConcurrency *int `json:"workerConcurrency,omitempty"`
RateLimit *RateLimiter `json:"rateLimit,omitempty"`
}

func NewAdminServer(port int) *AdminServer {
mux := http.NewServeMux()

// Health endpoint
mux.HandleFunc("/dbos-healthz", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(HealthCheckPath, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"healthy"}`))
})

// Recovery endpoint
mux.HandleFunc("/dbos-workflow-recovery", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(WorkflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
Expand Down Expand Up @@ -57,6 +70,34 @@ func NewAdminServer(port int) *AdminServer {
}
})

// Queue metadata endpoint
mux.HandleFunc(WorkflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var queueMetadataArray []QueueMetadata

// Iterate through all queues in the registry
for _, queue := range workflowQueueRegistry {
queueMetadata := QueueMetadata{
Name: queue.Name,
WorkerConcurrency: queue.WorkerConcurrency,
Concurrency: queue.GlobalConcurrency,
RateLimit: queue.Limiter,
}

queueMetadataArray = append(queueMetadataArray, queueMetadata)
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(queueMetadataArray); err != nil {
getLogger().Error("Error encoding queue metadata response", "error", err)
}
})

server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
Expand Down
55 changes: 50 additions & 5 deletions dbos/admin-server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestAdminServer(t *testing.T) {

// Verify admin server is not running
client := &http.Client{Timeout: 1 * time.Second}
_, err = client.Get("http://localhost:3001/dbos-healthz")
_, err = client.Get("http://localhost:3001" + HealthCheckPath)
if err == nil {
t.Error("Expected request to fail when admin server is not started, but it succeeded")
}
Expand Down Expand Up @@ -94,13 +94,13 @@ func TestAdminServer(t *testing.T) {
{
name: "Health endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001/dbos-healthz",
endpoint: "http://localhost:3001" + HealthCheckPath,
expectedStatus: http.StatusOK,
},
{
name: "Recovery endpoint responds correctly with valid JSON",
method: "POST",
endpoint: "http://localhost:3001/dbos-workflow-recovery",
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
contentType: "application/json",
expectedStatus: http.StatusOK,
Expand All @@ -117,17 +117,62 @@ func TestAdminServer(t *testing.T) {
{
name: "Recovery endpoint rejects invalid methods",
method: "GET",
endpoint: "http://localhost:3001/dbos-workflow-recovery",
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
expectedStatus: http.StatusMethodNotAllowed,
},
{
name: "Recovery endpoint rejects invalid JSON",
method: "POST",
endpoint: "http://localhost:3001/dbos-workflow-recovery",
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
body: strings.NewReader(`{"invalid": json}`),
contentType: "application/json",
expectedStatus: http.StatusBadRequest,
},
{
name: "Queue metadata endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
expectedStatus: http.StatusOK,
validateResp: func(t *testing.T, resp *http.Response) {
var queueMetadata []QueueMetadata
if err := json.NewDecoder(resp.Body).Decode(&queueMetadata); err != nil {
t.Errorf("Failed to decode response as QueueMetadata array: %v", err)
}
if queueMetadata == nil {
t.Error("Expected non-nil queue metadata array")
}
// Should contain at least the internal queue
if len(queueMetadata) == 0 {
t.Error("Expected at least one queue in metadata")
}
// Verify internal queue fields
foundInternalQueue := false
for _, queue := range queueMetadata {
if queue.Name == _DBOS_INTERNAL_QUEUE_NAME { // Internal queue name
foundInternalQueue = true
if queue.Concurrency != nil {
t.Errorf("Expected internal queue to have no concurrency limit, but got %v", *queue.Concurrency)
}
if queue.WorkerConcurrency != nil {
t.Errorf("Expected internal queue to have no worker concurrency limit, but got %v", *queue.WorkerConcurrency)
}
if queue.RateLimit != nil {
t.Error("Expected internal queue to have no rate limit")
}
break
}
}
if !foundInternalQueue {
t.Error("Expected to find internal queue in metadata")
}
},
},
{
name: "Queue metadata endpoint rejects invalid methods",
method: "POST",
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
expectedStatus: http.StatusMethodNotAllowed,
},
}

for _, tt := range tests {
Expand Down
Loading