Skip to content

Commit 98f25de

Browse files
committed
add recovery endpoint
1 parent d4684c6 commit 98f25de

File tree

2 files changed

+154
-33
lines changed

2 files changed

+154
-33
lines changed

dbos/admin-server.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"net/http"
78
"time"
@@ -21,6 +22,41 @@ func NewAdminServer(port int) *AdminServer {
2122
w.Write([]byte(`{"status":"healthy"}`))
2223
})
2324

25+
// Recovery endpoint
26+
mux.HandleFunc("/dbos-workflow-recovery", func(w http.ResponseWriter, r *http.Request) {
27+
if r.Method != http.MethodPost {
28+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
29+
return
30+
}
31+
32+
var executorIDs []string
33+
if err := json.NewDecoder(r.Body).Decode(&executorIDs); err != nil {
34+
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
35+
return
36+
}
37+
38+
getLogger().Info("Recovering workflows for executors", "executors", executorIDs)
39+
40+
handles, err := recoverPendingWorkflows(r.Context(), executorIDs)
41+
if err != nil {
42+
getLogger().Error("Error recovering workflows", "error", err)
43+
http.Error(w, fmt.Sprintf("Recovery failed: %v", err), http.StatusInternalServerError)
44+
return
45+
}
46+
47+
// Extract workflow IDs from handles
48+
workflowIDs := make([]string, len(handles))
49+
for i, handle := range handles {
50+
workflowIDs[i] = handle.GetWorkflowID()
51+
}
52+
53+
w.Header().Set("Content-Type", "application/json")
54+
w.WriteHeader(http.StatusOK)
55+
if err := json.NewEncoder(w).Encode(workflowIDs); err != nil {
56+
getLogger().Error("Error encoding response", "error", err)
57+
}
58+
})
59+
2460
server := &http.Server{
2561
Addr: fmt.Sprintf(":%d", port),
2662
Handler: mux,

dbos/admin-server_test.go

Lines changed: 118 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package dbos
22

33
import (
4+
"bytes"
5+
"encoding/json"
6+
"io"
47
"net/http"
58
"os"
9+
"strings"
610
"testing"
711
"time"
812
)
@@ -14,78 +18,159 @@ func TestAdminServer(t *testing.T) {
1418
t.Skip("Database not available (DBOS_DATABASE_URL and PGPASSWORD not set), skipping DBOS integration tests")
1519
}
1620

17-
t.Run("Admin server starts through DBOS Launch with WithAdminServer", func(t *testing.T) {
21+
t.Run("Admin server is not started without WithAdminServer option", func(t *testing.T) {
1822
// Ensure clean state
1923
if dbos != nil {
2024
Shutdown()
2125
}
2226

23-
// Launch DBOS with admin server
24-
err := Launch(WithAdminServer())
27+
// Launch DBOS without admin server option
28+
err := Launch()
2529
if err != nil {
26-
t.Skipf("Failed to launch DBOS with admin server (database likely not available): %v", err)
30+
t.Skipf("Failed to launch DBOS (database likely not available): %v", err)
2731
}
2832

2933
// Ensure cleanup
3034
defer Shutdown()
3135

32-
// Give the server a moment to start
36+
// Give time for any startup processes
3337
time.Sleep(100 * time.Millisecond)
3438

35-
// Verify admin server is running by making a request
36-
client := &http.Client{Timeout: 5 * time.Second}
37-
resp, err := client.Get("http://localhost:3001/dbos-healthz")
38-
if err != nil {
39-
t.Fatalf("Failed to make request to admin server: %v", err)
40-
}
41-
defer resp.Body.Close()
42-
43-
if resp.StatusCode != http.StatusOK {
44-
t.Errorf("Expected status code 200, got %d", resp.StatusCode)
39+
// Verify admin server is not running
40+
client := &http.Client{Timeout: 1 * time.Second}
41+
_, err = client.Get("http://localhost:3001/dbos-healthz")
42+
if err == nil {
43+
t.Error("Expected request to fail when admin server is not started, but it succeeded")
4544
}
4645

47-
// Verify the DBOS executor has an admin server instance
46+
// Verify the DBOS executor doesn't have an admin server instance
4847
if dbos == nil {
4948
t.Fatal("Expected DBOS instance to be created")
5049
}
5150

52-
if dbos.adminServer == nil {
53-
t.Fatal("Expected admin server to be created in DBOS instance")
51+
if dbos.adminServer != nil {
52+
t.Error("Expected admin server to be nil when not configured")
5453
}
5554
})
5655

57-
t.Run("Admin server is not started without WithAdminServer option", func(t *testing.T) {
56+
t.Run("Admin server endpoints", func(t *testing.T) {
5857
// Ensure clean state
5958
if dbos != nil {
6059
Shutdown()
6160
}
6261

63-
// Launch DBOS without admin server option
64-
err := Launch()
62+
// Launch DBOS with admin server once for all endpoint tests
63+
err := Launch(WithAdminServer())
6564
if err != nil {
66-
t.Skipf("Failed to launch DBOS (database likely not available): %v", err)
65+
t.Skipf("Failed to launch DBOS with admin server (database likely not available): %v", err)
6766
}
6867

6968
// Ensure cleanup
7069
defer Shutdown()
7170

72-
// Give time for any startup processes
71+
// Give the server a moment to start
7372
time.Sleep(100 * time.Millisecond)
7473

75-
// Verify admin server is not running
76-
client := &http.Client{Timeout: 1 * time.Second}
77-
_, err = client.Get("http://localhost:3001/dbos-healthz")
78-
if err == nil {
79-
t.Error("Expected request to fail when admin server is not started, but it succeeded")
80-
}
81-
82-
// Verify the DBOS executor doesn't have an admin server instance
74+
// Verify the DBOS executor has an admin server instance
8375
if dbos == nil {
8476
t.Fatal("Expected DBOS instance to be created")
8577
}
8678

87-
if dbos.adminServer != nil {
88-
t.Error("Expected admin server to be nil when not configured")
79+
if dbos.adminServer == nil {
80+
t.Fatal("Expected admin server to be created in DBOS instance")
81+
}
82+
83+
client := &http.Client{Timeout: 5 * time.Second}
84+
85+
tests := []struct {
86+
name string
87+
method string
88+
endpoint string
89+
body io.Reader
90+
contentType string
91+
expectedStatus int
92+
validateResp func(t *testing.T, resp *http.Response)
93+
}{
94+
{
95+
name: "Health endpoint responds correctly",
96+
method: "GET",
97+
endpoint: "http://localhost:3001/dbos-healthz",
98+
expectedStatus: http.StatusOK,
99+
},
100+
{
101+
name: "Recovery endpoint responds correctly with valid JSON",
102+
method: "POST",
103+
endpoint: "http://localhost:3001/dbos-workflow-recovery",
104+
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
105+
contentType: "application/json",
106+
expectedStatus: http.StatusOK,
107+
validateResp: func(t *testing.T, resp *http.Response) {
108+
var workflowIDs []string
109+
if err := json.NewDecoder(resp.Body).Decode(&workflowIDs); err != nil {
110+
t.Errorf("Failed to decode response as JSON array: %v", err)
111+
}
112+
if workflowIDs == nil {
113+
t.Error("Expected non-nil workflow IDs array")
114+
}
115+
},
116+
},
117+
{
118+
name: "Recovery endpoint rejects invalid methods",
119+
method: "GET",
120+
endpoint: "http://localhost:3001/dbos-workflow-recovery",
121+
expectedStatus: http.StatusMethodNotAllowed,
122+
},
123+
{
124+
name: "Recovery endpoint rejects invalid JSON",
125+
method: "POST",
126+
endpoint: "http://localhost:3001/dbos-workflow-recovery",
127+
body: strings.NewReader(`{"invalid": json}`),
128+
contentType: "application/json",
129+
expectedStatus: http.StatusBadRequest,
130+
},
131+
}
132+
133+
for _, tt := range tests {
134+
t.Run(tt.name, func(t *testing.T) {
135+
var req *http.Request
136+
var err error
137+
138+
if tt.body != nil {
139+
req, err = http.NewRequest(tt.method, tt.endpoint, tt.body)
140+
} else {
141+
req, err = http.NewRequest(tt.method, tt.endpoint, nil)
142+
}
143+
if err != nil {
144+
t.Fatalf("Failed to create request: %v", err)
145+
}
146+
147+
if tt.contentType != "" {
148+
req.Header.Set("Content-Type", tt.contentType)
149+
}
150+
151+
resp, err := client.Do(req)
152+
if err != nil {
153+
t.Fatalf("Failed to make request: %v", err)
154+
}
155+
defer resp.Body.Close()
156+
157+
if resp.StatusCode != tt.expectedStatus {
158+
body, _ := io.ReadAll(resp.Body)
159+
t.Errorf("Expected status code %d, got %d. Response: %s", tt.expectedStatus, resp.StatusCode, string(body))
160+
}
161+
162+
if tt.validateResp != nil {
163+
tt.validateResp(t, resp)
164+
}
165+
})
89166
}
90167
})
91168
}
169+
170+
func mustMarshal(v any) []byte {
171+
data, err := json.Marshal(v)
172+
if err != nil {
173+
panic(err)
174+
}
175+
return data
176+
}

0 commit comments

Comments
 (0)