Skip to content

Commit ee7eb93

Browse files
authored
Admin server (#31)
1 parent e9b0391 commit ee7eb93

File tree

3 files changed

+307
-4
lines changed

3 files changed

+307
-4
lines changed

dbos/admin-server.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package dbos
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
)
10+
11+
type AdminServer struct {
12+
server *http.Server
13+
}
14+
15+
func NewAdminServer(port int) *AdminServer {
16+
mux := http.NewServeMux()
17+
18+
// Health endpoint
19+
mux.HandleFunc("/dbos-healthz", func(w http.ResponseWriter, r *http.Request) {
20+
w.Header().Set("Content-Type", "application/json")
21+
w.WriteHeader(http.StatusOK)
22+
w.Write([]byte(`{"status":"healthy"}`))
23+
})
24+
25+
// Recovery endpoint
26+
mux.HandleFunc("/dbos-workflow-recovery", func(w http.ResponseWriter, r *http.Request) {
27+
if r.Method != http.MethodPost {
28+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
29+
return
30+
}
31+
32+
var executorIDs []string
33+
if err := json.NewDecoder(r.Body).Decode(&executorIDs); err != nil {
34+
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
35+
return
36+
}
37+
38+
getLogger().Info("Recovering workflows for executors", "executors", executorIDs)
39+
40+
handles, err := recoverPendingWorkflows(r.Context(), executorIDs)
41+
if err != nil {
42+
getLogger().Error("Error recovering workflows", "error", err)
43+
http.Error(w, fmt.Sprintf("Recovery failed: %v", err), http.StatusInternalServerError)
44+
return
45+
}
46+
47+
// Extract workflow IDs from handles
48+
workflowIDs := make([]string, len(handles))
49+
for i, handle := range handles {
50+
workflowIDs[i] = handle.GetWorkflowID()
51+
}
52+
53+
w.Header().Set("Content-Type", "application/json")
54+
w.WriteHeader(http.StatusOK)
55+
if err := json.NewEncoder(w).Encode(workflowIDs); err != nil {
56+
getLogger().Error("Error encoding response", "error", err)
57+
}
58+
})
59+
60+
server := &http.Server{
61+
Addr: fmt.Sprintf(":%d", port),
62+
Handler: mux,
63+
}
64+
65+
return &AdminServer{
66+
server: server,
67+
}
68+
}
69+
70+
func (as *AdminServer) Start() error {
71+
getLogger().Info("Starting admin server", "port", 3001)
72+
73+
go func() {
74+
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
75+
getLogger().Error("Admin server error", "error", err)
76+
}
77+
}()
78+
79+
return nil
80+
}
81+
82+
func (as *AdminServer) Shutdown() error {
83+
getLogger().Info("Shutting down admin server")
84+
85+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
86+
defer cancel()
87+
88+
if err := as.server.Shutdown(ctx); err != nil {
89+
getLogger().Error("Admin server shutdown error", "error", err)
90+
return fmt.Errorf("failed to shutdown admin server: %w", err)
91+
}
92+
93+
getLogger().Info("Admin server shutdown complete")
94+
return nil
95+
}

dbos/admin-server_test.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package dbos
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"os"
9+
"strings"
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestAdminServer(t *testing.T) {
15+
// Skip if database is not available
16+
databaseURL := os.Getenv("DBOS_DATABASE_URL")
17+
if databaseURL == "" && os.Getenv("PGPASSWORD") == "" {
18+
t.Skip("Database not available (DBOS_DATABASE_URL and PGPASSWORD not set), skipping DBOS integration tests")
19+
}
20+
21+
t.Run("Admin server is not started without WithAdminServer option", func(t *testing.T) {
22+
// Ensure clean state
23+
if dbos != nil {
24+
Shutdown()
25+
}
26+
27+
// Launch DBOS without admin server option
28+
err := Launch()
29+
if err != nil {
30+
t.Skipf("Failed to launch DBOS (database likely not available): %v", err)
31+
}
32+
33+
// Ensure cleanup
34+
defer Shutdown()
35+
36+
// Give time for any startup processes
37+
time.Sleep(100 * time.Millisecond)
38+
39+
// Verify admin server is not running
40+
client := &http.Client{Timeout: 1 * time.Second}
41+
_, err = client.Get("http://localhost:3001/dbos-healthz")
42+
if err == nil {
43+
t.Error("Expected request to fail when admin server is not started, but it succeeded")
44+
}
45+
46+
// Verify the DBOS executor doesn't have an admin server instance
47+
if dbos == nil {
48+
t.Fatal("Expected DBOS instance to be created")
49+
}
50+
51+
if dbos.adminServer != nil {
52+
t.Error("Expected admin server to be nil when not configured")
53+
}
54+
})
55+
56+
t.Run("Admin server endpoints", func(t *testing.T) {
57+
// Ensure clean state
58+
if dbos != nil {
59+
Shutdown()
60+
}
61+
62+
// Launch DBOS with admin server once for all endpoint tests
63+
err := Launch(WithAdminServer())
64+
if err != nil {
65+
t.Skipf("Failed to launch DBOS with admin server (database likely not available): %v", err)
66+
}
67+
68+
// Ensure cleanup
69+
defer Shutdown()
70+
71+
// Give the server a moment to start
72+
time.Sleep(100 * time.Millisecond)
73+
74+
// Verify the DBOS executor has an admin server instance
75+
if dbos == nil {
76+
t.Fatal("Expected DBOS instance to be created")
77+
}
78+
79+
if dbos.adminServer == nil {
80+
t.Fatal("Expected admin server to be created in DBOS instance")
81+
}
82+
83+
client := &http.Client{Timeout: 5 * time.Second}
84+
85+
tests := []struct {
86+
name string
87+
method string
88+
endpoint string
89+
body io.Reader
90+
contentType string
91+
expectedStatus int
92+
validateResp func(t *testing.T, resp *http.Response)
93+
}{
94+
{
95+
name: "Health endpoint responds correctly",
96+
method: "GET",
97+
endpoint: "http://localhost:3001/dbos-healthz",
98+
expectedStatus: http.StatusOK,
99+
},
100+
{
101+
name: "Recovery endpoint responds correctly with valid JSON",
102+
method: "POST",
103+
endpoint: "http://localhost:3001/dbos-workflow-recovery",
104+
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
105+
contentType: "application/json",
106+
expectedStatus: http.StatusOK,
107+
validateResp: func(t *testing.T, resp *http.Response) {
108+
var workflowIDs []string
109+
if err := json.NewDecoder(resp.Body).Decode(&workflowIDs); err != nil {
110+
t.Errorf("Failed to decode response as JSON array: %v", err)
111+
}
112+
if workflowIDs == nil {
113+
t.Error("Expected non-nil workflow IDs array")
114+
}
115+
},
116+
},
117+
{
118+
name: "Recovery endpoint rejects invalid methods",
119+
method: "GET",
120+
endpoint: "http://localhost:3001/dbos-workflow-recovery",
121+
expectedStatus: http.StatusMethodNotAllowed,
122+
},
123+
{
124+
name: "Recovery endpoint rejects invalid JSON",
125+
method: "POST",
126+
endpoint: "http://localhost:3001/dbos-workflow-recovery",
127+
body: strings.NewReader(`{"invalid": json}`),
128+
contentType: "application/json",
129+
expectedStatus: http.StatusBadRequest,
130+
},
131+
}
132+
133+
for _, tt := range tests {
134+
t.Run(tt.name, func(t *testing.T) {
135+
var req *http.Request
136+
var err error
137+
138+
if tt.body != nil {
139+
req, err = http.NewRequest(tt.method, tt.endpoint, tt.body)
140+
} else {
141+
req, err = http.NewRequest(tt.method, tt.endpoint, nil)
142+
}
143+
if err != nil {
144+
t.Fatalf("Failed to create request: %v", err)
145+
}
146+
147+
if tt.contentType != "" {
148+
req.Header.Set("Content-Type", tt.contentType)
149+
}
150+
151+
resp, err := client.Do(req)
152+
if err != nil {
153+
t.Fatalf("Failed to make request: %v", err)
154+
}
155+
defer resp.Body.Close()
156+
157+
if resp.StatusCode != tt.expectedStatus {
158+
body, _ := io.ReadAll(resp.Body)
159+
t.Errorf("Expected status code %d, got %d. Response: %s", tt.expectedStatus, resp.StatusCode, string(body))
160+
}
161+
162+
if tt.validateResp != nil {
163+
tt.validateResp(t, resp)
164+
}
165+
})
166+
}
167+
})
168+
}
169+
170+
func mustMarshal(v any) []byte {
171+
data, err := json.Marshal(v)
172+
if err != nil {
173+
panic(err)
174+
}
175+
return data
176+
}

dbos/dbos.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import (
1616
)
1717

1818
var (
19-
APP_VERSION string
20-
EXECUTOR_ID string
21-
APP_ID string
19+
APP_VERSION string
20+
EXECUTOR_ID string
21+
APP_ID string
22+
DEFAULT_ADMIN_SERVER_PORT = 3001
2223
)
2324

2425
func computeApplicationVersion() string {
@@ -62,6 +63,7 @@ type executor struct {
6263
queueRunnerCtx context.Context
6364
queueRunnerCancelFunc context.CancelFunc
6465
queueRunnerDone chan struct{}
66+
adminServer *AdminServer
6567
}
6668

6769
var dbos *executor
@@ -88,7 +90,8 @@ func getLogger() *slog.Logger {
8890
}
8991

9092
type config struct {
91-
logger *slog.Logger
93+
logger *slog.Logger
94+
adminServer bool
9295
}
9396

9497
type LaunchOption func(*config)
@@ -99,6 +102,12 @@ func WithLogger(logger *slog.Logger) LaunchOption {
99102
}
100103
}
101104

105+
func WithAdminServer() LaunchOption {
106+
return func(config *config) {
107+
config.adminServer = true
108+
}
109+
}
110+
102111
func Launch(options ...LaunchOption) error {
103112
if dbos != nil {
104113
fmt.Println("warning: DBOS instance already initialized, skipping re-initialization")
@@ -138,6 +147,18 @@ func Launch(options ...LaunchOption) error {
138147

139148
systemDB.Launch(context.Background())
140149

150+
// Start the admin server if configured
151+
var adminServer *AdminServer
152+
if config.adminServer {
153+
adminServer = NewAdminServer(DEFAULT_ADMIN_SERVER_PORT)
154+
err := adminServer.Start()
155+
if err != nil {
156+
logger.Error("Failed to start admin server", "error", err)
157+
return NewInitializationError(fmt.Sprintf("failed to start admin server: %v", err))
158+
}
159+
logger.Info("Admin server started", "port", DEFAULT_ADMIN_SERVER_PORT)
160+
}
161+
141162
// Create context with cancel function for queue runner
142163
ctx, cancel := context.WithCancel(context.Background())
143164

@@ -146,6 +167,7 @@ func Launch(options ...LaunchOption) error {
146167
queueRunnerCtx: ctx,
147168
queueRunnerCancelFunc: cancel,
148169
queueRunnerDone: make(chan struct{}),
170+
adminServer: adminServer,
149171
}
150172

151173
// Start the queue runner in a goroutine
@@ -207,6 +229,16 @@ func Shutdown() {
207229
dbos.systemDB = nil
208230
}
209231

232+
if dbos.adminServer != nil {
233+
err := dbos.adminServer.Shutdown()
234+
if err != nil {
235+
getLogger().Error("Failed to shutdown admin server", "error", err)
236+
} else {
237+
getLogger().Info("Admin server shutdown complete")
238+
}
239+
dbos.adminServer = nil
240+
}
241+
210242
if logger != nil {
211243
logger = nil
212244
}

0 commit comments

Comments
 (0)