Skip to content

Commit 31f29c0

Browse files
authored
More tests and admin server fixes (#69)
- `QueueOptions` made public - More comprehensive recovery test (recovers more workflows and check steps status) - Check that steps return a cancelled error after the workflow was cancelled - Admin server: - Always "stringify" non nil workflow output/input - Filter out queue tasks by status (pending/enqueued) on the list queues endpoint - Have `RetrieveWorkflow` return a handle interface
1 parent e96da88 commit 31f29c0

File tree

10 files changed

+449
-93
lines changed

10 files changed

+449
-93
lines changed

dbos/admin_server.go

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ type adminServer struct {
105105
isDeactivated atomic.Int32
106106
}
107107

108-
// workflowStatusToUTC converts a WorkflowStatus to a map with all time fields in UTC
108+
// toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC
109109
// not super ergonomic but the DBOS console excepts unix timestamps
110-
func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
110+
func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
111111
result := map[string]any{
112112
"WorkflowUUID": ws.ID,
113113
"Status": ws.Status,
@@ -152,7 +152,23 @@ func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
152152
result["StartedAt"] = nil
153153
}
154154

155-
return result
155+
if ws.Input != nil && ws.Input != "" {
156+
bytes, err := json.Marshal(ws.Input)
157+
if err != nil {
158+
return nil, fmt.Errorf("failed to marshal input: %w", err)
159+
}
160+
result["Input"] = string(bytes)
161+
}
162+
163+
if ws.Output != nil && ws.Output != "" {
164+
bytes, err := json.Marshal(ws.Output)
165+
if err != nil {
166+
return nil, fmt.Errorf("failed to marshal output: %w", err)
167+
}
168+
result["Output"] = string(bytes)
169+
}
170+
171+
return result, nil
156172
}
157173

158174
func newAdminServer(ctx *dbosContext, port int) *adminServer {
@@ -295,13 +311,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
295311
}
296312

297313
// Transform to UTC before encoding
298-
utcWorkflows := make([]map[string]any, len(workflows))
314+
responseWorkflows := make([]map[string]any, len(workflows))
299315
for i, wf := range workflows {
300-
utcWorkflows[i] = workflowStatusToUTC(wf)
316+
responseWorkflows[i], err = toListWorkflowResponse(wf)
317+
if err != nil {
318+
ctx.logger.Error("Error transforming workflow response", "error", err)
319+
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
320+
return
321+
}
301322
}
302323

303324
w.Header().Set("Content-Type", "application/json")
304-
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
325+
if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil {
305326
ctx.logger.Error("Error encoding workflows response", "error", err)
306327
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
307328
}
@@ -327,7 +348,12 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
327348
}
328349

329350
// Return the first (and only) workflow, transformed to UTC
330-
workflow := workflowStatusToUTC(workflows[0])
351+
workflow, err := toListWorkflowResponse(workflows[0])
352+
if err != nil {
353+
ctx.logger.Error("Error transforming workflow response", "error", err)
354+
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
355+
return
356+
}
331357

332358
w.Header().Set("Content-Type", "application/json")
333359
if err := json.NewEncoder(w).Encode(workflow); err != nil {
@@ -346,7 +372,10 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
346372
}
347373
}
348374

349-
workflows, err := ListWorkflows(ctx, req.toListWorkflowsOptions()...)
375+
req.Status = "" // We are not expecting a filter here but clear just in case
376+
filters := req.toListWorkflowsOptions()
377+
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
378+
workflows, err := ListWorkflows(ctx, filters...)
350379
if err != nil {
351380
ctx.logger.Error("Failed to list queued workflows", "error", err)
352381
http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError)
@@ -365,13 +394,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
365394
}
366395

367396
// Transform to UNIX timestamps before encoding
368-
utcWorkflows := make([]map[string]any, len(workflows))
397+
responseWorkflows := make([]map[string]any, len(workflows))
369398
for i, wf := range workflows {
370-
utcWorkflows[i] = workflowStatusToUTC(wf)
399+
responseWorkflows[i], err = toListWorkflowResponse(wf)
400+
if err != nil {
401+
ctx.logger.Error("Error transforming workflow response", "error", err)
402+
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
403+
return
404+
}
371405
}
372406

373407
w.Header().Set("Content-Type", "application/json")
374-
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
408+
if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil {
375409
ctx.logger.Error("Error encoding queued workflows response", "error", err)
376410
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
377411
}

dbos/admin_server_test.go

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,154 @@ func TestAdminServer(t *testing.T) {
213213
}
214214
})
215215

216+
t.Run("List workflows input/output values", func(t *testing.T) {
217+
resetTestDatabase(t, databaseURL)
218+
ctx, err := NewDBOSContext(Config{
219+
DatabaseURL: databaseURL,
220+
AppName: "test-app",
221+
AdminServer: true,
222+
})
223+
require.NoError(t, err)
224+
225+
// Define a custom struct for testing
226+
type TestStruct struct {
227+
Name string `json:"name"`
228+
Value int `json:"value"`
229+
}
230+
231+
// Test workflow with int input/output
232+
intWorkflow := func(dbosCtx DBOSContext, input int) (int, error) {
233+
return input * 2, nil
234+
}
235+
RegisterWorkflow(ctx, intWorkflow)
236+
237+
// Test workflow with empty string input/output
238+
emptyStringWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
239+
return "", nil
240+
}
241+
RegisterWorkflow(ctx, emptyStringWorkflow)
242+
243+
// Test workflow with struct input/output
244+
structWorkflow := func(dbosCtx DBOSContext, input TestStruct) (TestStruct, error) {
245+
return TestStruct{Name: "output-" + input.Name, Value: input.Value * 2}, nil
246+
}
247+
RegisterWorkflow(ctx, structWorkflow)
248+
249+
err = ctx.Launch()
250+
require.NoError(t, err)
251+
252+
// Ensure cleanup
253+
defer func() {
254+
if ctx != nil {
255+
ctx.Cancel()
256+
}
257+
}()
258+
259+
// Give the server a moment to start
260+
time.Sleep(100 * time.Millisecond)
261+
262+
client := &http.Client{Timeout: 5 * time.Second}
263+
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
264+
265+
// Create workflows with different input/output types
266+
// 1. Integer workflow
267+
intHandle, err := RunAsWorkflow(ctx, intWorkflow, 42)
268+
require.NoError(t, err, "Failed to create int workflow")
269+
intResult, err := intHandle.GetResult()
270+
require.NoError(t, err, "Failed to get int workflow result")
271+
assert.Equal(t, 84, intResult)
272+
273+
// 2. Empty string workflow
274+
emptyStringHandle, err := RunAsWorkflow(ctx, emptyStringWorkflow, "")
275+
require.NoError(t, err, "Failed to create empty string workflow")
276+
emptyStringResult, err := emptyStringHandle.GetResult()
277+
require.NoError(t, err, "Failed to get empty string workflow result")
278+
assert.Equal(t, "", emptyStringResult)
279+
280+
// 3. Struct workflow
281+
structInput := TestStruct{Name: "test", Value: 10}
282+
structHandle, err := RunAsWorkflow(ctx, structWorkflow, structInput)
283+
require.NoError(t, err, "Failed to create struct workflow")
284+
structResult, err := structHandle.GetResult()
285+
require.NoError(t, err, "Failed to get struct workflow result")
286+
assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, structResult)
287+
288+
// Query workflows with input/output loading enabled
289+
// Filter by the workflow IDs we just created to avoid interference from other tests
290+
reqBody := map[string]any{
291+
"workflow_uuids": []string{
292+
intHandle.GetWorkflowID(),
293+
emptyStringHandle.GetWorkflowID(),
294+
structHandle.GetWorkflowID(),
295+
},
296+
"load_input": true,
297+
"load_output": true,
298+
"limit": 10,
299+
}
300+
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody)))
301+
require.NoError(t, err, "Failed to create request")
302+
req.Header.Set("Content-Type", "application/json")
303+
304+
resp, err := client.Do(req)
305+
require.NoError(t, err, "Failed to make request")
306+
defer resp.Body.Close()
307+
308+
assert.Equal(t, http.StatusOK, resp.StatusCode)
309+
310+
var workflows []map[string]any
311+
err = json.NewDecoder(resp.Body).Decode(&workflows)
312+
require.NoError(t, err, "Failed to decode workflows response")
313+
314+
// Should have exactly 3 workflows
315+
assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows")
316+
317+
// Verify each workflow's input/output marshaling
318+
for _, wf := range workflows {
319+
wfID := wf["WorkflowUUID"].(string)
320+
321+
// Check input and output fields exist and are strings (JSON marshaled)
322+
if wfID == intHandle.GetWorkflowID() {
323+
// Integer workflow: input and output should be marshaled as JSON strings
324+
inputStr, ok := wf["Input"].(string)
325+
require.True(t, ok, "Int workflow Input should be a string")
326+
assert.Equal(t, "42", inputStr, "Int workflow input should be marshaled as '42'")
327+
328+
outputStr, ok := wf["Output"].(string)
329+
require.True(t, ok, "Int workflow Output should be a string")
330+
assert.Equal(t, "84", outputStr, "Int workflow output should be marshaled as '84'")
331+
332+
} else if wfID == emptyStringHandle.GetWorkflowID() {
333+
// Empty string workflow: both input and output are empty strings
334+
// According to the logic, empty strings should not have Input/Output fields
335+
input, hasInput := wf["Input"]
336+
require.Equal(t, "", input)
337+
require.True(t, hasInput, "Empty string workflow should have Input field")
338+
339+
output, hasOutput := wf["Output"]
340+
require.True(t, hasOutput, "Empty string workflow should have Output field")
341+
require.Equal(t, "", output)
342+
343+
} else if wfID == structHandle.GetWorkflowID() {
344+
// Struct workflow: input and output should be marshaled as JSON strings
345+
inputStr, ok := wf["Input"].(string)
346+
require.True(t, ok, "Struct workflow Input should be a string")
347+
var inputStruct TestStruct
348+
err = json.Unmarshal([]byte(inputStr), &inputStruct)
349+
require.NoError(t, err, "Failed to unmarshal struct workflow input")
350+
assert.Equal(t, structInput, inputStruct, "Struct workflow input should match")
351+
352+
outputStr, ok := wf["Output"].(string)
353+
require.True(t, ok, "Struct workflow Output should be a string")
354+
var outputStruct TestStruct
355+
err = json.Unmarshal([]byte(outputStr), &outputStruct)
356+
require.NoError(t, err, "Failed to unmarshal struct workflow output")
357+
assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, outputStruct, "Struct workflow output should match")
358+
}
359+
}
360+
})
361+
216362
t.Run("List endpoints time filtering", func(t *testing.T) {
363+
resetTestDatabase(t, databaseURL)
217364
ctx, err := NewDBOSContext(Config{
218365
DatabaseURL: databaseURL,
219366
AppName: "test-app",
@@ -308,7 +455,6 @@ func TestAdminServer(t *testing.T) {
308455
"start_time": timeBetween.Format(time.RFC3339Nano),
309456
"limit": 10,
310457
}
311-
fmt.Println("Request body 2:", reqBody2, "timebetween", timeBetween.UnixMilli())
312458
req2, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody2)))
313459
require.NoError(t, err, "Failed to create request 2")
314460
req2.Header.Set("Content-Type", "application/json")
@@ -322,7 +468,6 @@ func TestAdminServer(t *testing.T) {
322468
var workflows2 []map[string]any
323469
err = json.NewDecoder(resp2.Body).Decode(&workflows2)
324470
require.NoError(t, err, "Failed to decode workflows response 2")
325-
fmt.Println(workflows2)
326471

327472
// Should have exactly 1 workflow (the second one)
328473
assert.Equal(t, 1, len(workflows2), "Expected exactly 1 workflow with start_time after timeBetween")

dbos/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestEnqueue(t *testing.T) {
235235

236236
// After first workflow completes, we should be able to enqueue with same deduplication ID
237237
handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"},
238-
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
238+
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
239239
WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow
240240
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
241241
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")

dbos/queue.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,44 +36,44 @@ type WorkflowQueue struct {
3636
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
3737
}
3838

39-
// queueOption is a functional option for configuring a workflow queue
40-
type queueOption func(*WorkflowQueue)
39+
// QueueOption is a functional option for configuring a workflow queue
40+
type QueueOption func(*WorkflowQueue)
4141

4242
// WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue.
4343
// This provides per-executor concurrency control.
44-
func WithWorkerConcurrency(concurrency int) queueOption {
44+
func WithWorkerConcurrency(concurrency int) QueueOption {
4545
return func(q *WorkflowQueue) {
4646
q.WorkerConcurrency = &concurrency
4747
}
4848
}
4949

5050
// WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue
5151
// across all executors. This provides global concurrency control.
52-
func WithGlobalConcurrency(concurrency int) queueOption {
52+
func WithGlobalConcurrency(concurrency int) QueueOption {
5353
return func(q *WorkflowQueue) {
5454
q.GlobalConcurrency = &concurrency
5555
}
5656
}
5757

5858
// WithPriorityEnabled enables priority-based scheduling for the queue.
5959
// When enabled, workflows with lower priority numbers are executed first.
60-
func WithPriorityEnabled(enabled bool) queueOption {
60+
func WithPriorityEnabled(enabled bool) QueueOption {
6161
return func(q *WorkflowQueue) {
6262
q.PriorityEnabled = enabled
6363
}
6464
}
6565

6666
// WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services.
6767
// The rate limiter enforces a maximum number of workflow starts within a time period.
68-
func WithRateLimiter(limiter *RateLimiter) queueOption {
68+
func WithRateLimiter(limiter *RateLimiter) QueueOption {
6969
return func(q *WorkflowQueue) {
7070
q.RateLimit = limiter
7171
}
7272
}
7373

7474
// WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration.
7575
// This controls batch sizes for queue processing.
76-
func WithMaxTasksPerIteration(maxTasks int) queueOption {
76+
func WithMaxTasksPerIteration(maxTasks int) QueueOption {
7777
return func(q *WorkflowQueue) {
7878
q.MaxTasksPerIteration = maxTasks
7979
}
@@ -96,7 +96,7 @@ func WithMaxTasksPerIteration(maxTasks int) queueOption {
9696
//
9797
// // Enqueue workflows to this queue:
9898
// handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))
99-
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) WorkflowQueue {
99+
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) WorkflowQueue {
100100
ctx, ok := dbosCtx.(*dbosContext)
101101
if !ok {
102102
return WorkflowQueue{} // Do nothing if the concrete type is not dbosContext

dbos/queues_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,13 @@ func TestWorkflowQueues(t *testing.T) {
233233
for {
234234
dlqStatus, err := dlqHandle[0].GetStatus()
235235
require.NoError(t, err, "failed to get status of DLQ workflow handle")
236-
if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 {
236+
if dlqStatus.Status != WorkflowStatusMaxRecoveryAttemptsExceeded && retries < 10 {
237237
time.Sleep(1 * time.Second) // Wait a bit before checking again
238238
retries++
239239
continue
240240
}
241241
require.NoError(t, err, "failed to get status of DLQ workflow handle")
242-
assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
242+
assert.Equal(t, WorkflowStatusMaxRecoveryAttemptsExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
243243
handles = append(handles, dlqHandle[0])
244244
break
245245
}

dbos/serialization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
236236
assert.Equal(t, "user-defined-event-set", result)
237237

238238
// Retrieve the event to verify it was properly serialized and can be deserialized
239-
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second)
239+
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3*time.Second)
240240
require.NoError(t, err)
241241

242242
// Verify the retrieved data matches what we set
@@ -273,7 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,
273273

274274
func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
275275
// Receive the user-defined type message
276-
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second)
276+
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3*time.Second)
277277
return result, err
278278
}
279279

0 commit comments

Comments
 (0)