Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions dbos/admin-server.go → dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,34 @@ import (
)

const (
HealthCheckPath = "/dbos-healthz"
WorkflowRecoveryPath = "/dbos-workflow-recovery"
WorkflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
healthCheckPath = "/dbos-healthz"
workflowRecoveryPath = "/dbos-workflow-recovery"
workflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
)

type AdminServer struct {
type adminServer struct {
server *http.Server
}

type QueueMetadata struct {
type queueMetadata struct {
Name string `json:"name"`
Concurrency *int `json:"concurrency,omitempty"`
WorkerConcurrency *int `json:"workerConcurrency,omitempty"`
RateLimit *RateLimiter `json:"rateLimit,omitempty"`
}

func NewAdminServer(port int) *AdminServer {
func newAdminServer(port int) *adminServer {
mux := http.NewServeMux()

// Health endpoint
mux.HandleFunc(HealthCheckPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"healthy"}`))
})

// Recovery endpoint
mux.HandleFunc(WorkflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(workflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
Expand Down Expand Up @@ -71,21 +71,21 @@ func NewAdminServer(port int) *AdminServer {
})

// Queue metadata endpoint
mux.HandleFunc(WorkflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(workflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var queueMetadataArray []QueueMetadata
var queueMetadataArray []queueMetadata

// Iterate through all queues in the registry
for _, queue := range workflowQueueRegistry {
queueMetadata := QueueMetadata{
Name: queue.Name,
WorkerConcurrency: queue.WorkerConcurrency,
Concurrency: queue.GlobalConcurrency,
RateLimit: queue.Limiter,
queueMetadata := queueMetadata{
Name: queue.name,
WorkerConcurrency: queue.workerConcurrency,
Concurrency: queue.globalConcurrency,
RateLimit: queue.limiter,
}

queueMetadataArray = append(queueMetadataArray, queueMetadata)
Expand All @@ -103,12 +103,12 @@ func NewAdminServer(port int) *AdminServer {
Handler: mux,
}

return &AdminServer{
return &adminServer{
server: server,
}
}

func (as *AdminServer) Start() error {
func (as *adminServer) Start() error {
getLogger().Info("Starting admin server", "port", 3001)

go func() {
Expand All @@ -120,7 +120,7 @@ func (as *AdminServer) Start() error {
return nil
}

func (as *AdminServer) Shutdown() error {
func (as *adminServer) Shutdown() error {
getLogger().Info("Shutting down admin server")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
66 changes: 37 additions & 29 deletions dbos/admin-server_test.go → dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,41 @@ import (
"encoding/json"
"io"
"net/http"
"os"
"strings"
"testing"
"time"
)

func TestAdminServer(t *testing.T) {
// Skip if database is not available
databaseURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL")
if databaseURL == "" && os.Getenv("PGPASSWORD") == "" {
t.Skip("Database not available (DBOS_SYSTEM_DATABASE_URL and PGPASSWORD not set), skipping DBOS integration tests")
}
databaseURL := getDatabaseURL(t)

t.Run("Admin server is not started without WithAdminServer option", func(t *testing.T) {
t.Run("Admin server is not started by default", func(t *testing.T) {
// Ensure clean state
if dbos != nil {
Shutdown()
}
Shutdown()

// Launch DBOS without admin server option
err := Launch()
err := Initialize(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
})
if err != nil {
t.Skipf("Failed to launch DBOS (database likely not available): %v", err)
t.Skipf("Failed to initialize DBOS: %v", err)
}
err = Launch()
if err != nil {
t.Skipf("Failed to initialize DBOS: %v", err)
}

// Ensure cleanup
defer Shutdown()
defer func() {
Shutdown()
}()

// Give time for any startup processes
time.Sleep(100 * time.Millisecond)

// Verify admin server is not running
client := &http.Client{Timeout: 1 * time.Second}
_, err = client.Get("http://localhost:3001" + HealthCheckPath)
_, err = client.Get("http://localhost:3001" + healthCheckPath)
if err == nil {
t.Error("Expected request to fail when admin server is not started, but it succeeded")
}
Expand All @@ -54,19 +55,26 @@ func TestAdminServer(t *testing.T) {
})

t.Run("Admin server endpoints", func(t *testing.T) {
// Ensure clean state
if dbos != nil {
Shutdown()
}
Shutdown()

// Launch DBOS with admin server once for all endpoint tests
err := Launch(WithAdminServer())
err := Initialize(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
if err != nil {
t.Skipf("Failed to launch DBOS with admin server (database likely not available): %v", err)
t.Skipf("Failed to initialize DBOS with admin server: %v", err)
}
err = Launch()
if err != nil {
t.Skipf("Failed to initialize DBOS with admin server: %v", err)
}

// Ensure cleanup
defer Shutdown()
defer func() {
Shutdown()
}()

// Give the server a moment to start
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -94,13 +102,13 @@ func TestAdminServer(t *testing.T) {
{
name: "Health endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + HealthCheckPath,
endpoint: "http://localhost:3001" + healthCheckPath,
expectedStatus: http.StatusOK,
},
{
name: "Recovery endpoint responds correctly with valid JSON",
method: "POST",
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
endpoint: "http://localhost:3001" + workflowRecoveryPath,
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
contentType: "application/json",
expectedStatus: http.StatusOK,
Expand All @@ -117,24 +125,24 @@ func TestAdminServer(t *testing.T) {
{
name: "Recovery endpoint rejects invalid methods",
method: "GET",
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
endpoint: "http://localhost:3001" + workflowRecoveryPath,
expectedStatus: http.StatusMethodNotAllowed,
},
{
name: "Recovery endpoint rejects invalid JSON",
method: "POST",
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
endpoint: "http://localhost:3001" + workflowRecoveryPath,
body: strings.NewReader(`{"invalid": json}`),
contentType: "application/json",
expectedStatus: http.StatusBadRequest,
},
{
name: "Queue metadata endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
expectedStatus: http.StatusOK,
validateResp: func(t *testing.T, resp *http.Response) {
var queueMetadata []QueueMetadata
var queueMetadata []queueMetadata
if err := json.NewDecoder(resp.Body).Decode(&queueMetadata); err != nil {
t.Errorf("Failed to decode response as QueueMetadata array: %v", err)
}
Expand Down Expand Up @@ -170,7 +178,7 @@ func TestAdminServer(t *testing.T) {
{
name: "Queue metadata endpoint rejects invalid methods",
method: "POST",
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
expectedStatus: http.StatusMethodNotAllowed,
},
}
Expand Down
Loading
Loading