56 changes: 45 additions & 11 deletions dbos/admin_server.go
@@ -105,9 +105,9 @@ type adminServer struct {
isDeactivated atomic.Int32
}

// workflowStatusToUTC converts a WorkflowStatus to a map with all time fields in UTC
// toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC
// not super ergonomic, but the DBOS console expects Unix timestamps
func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
result := map[string]any{
"WorkflowUUID": ws.ID,
"Status": ws.Status,
@@ -152,7 +152,23 @@ func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
result["StartedAt"] = nil
}

return result
if ws.Input != nil && ws.Input != "" {
bytes, err := json.Marshal(ws.Input)
if err != nil {
return nil, fmt.Errorf("failed to marshal input: %w", err)
}
result["Input"] = string(bytes)
}

if ws.Output != nil && ws.Output != "" {
bytes, err := json.Marshal(ws.Output)
if err != nil {
return nil, fmt.Errorf("failed to marshal output: %w", err)
}
result["Output"] = string(bytes)
}

return result, nil
}

func newAdminServer(ctx *dbosContext, port int) *adminServer {
Expand Down Expand Up @@ -295,13 +311,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}

// Transform to UTC before encoding
utcWorkflows := make([]map[string]any, len(workflows))
responseWorkflows := make([]map[string]any, len(workflows))
for i, wf := range workflows {
utcWorkflows[i] = workflowStatusToUTC(wf)
responseWorkflows[i], err = toListWorkflowResponse(wf)
if err != nil {
ctx.logger.Error("Error transforming workflow response", "error", err)
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
return
}
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil {
ctx.logger.Error("Error encoding workflows response", "error", err)
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
}
@@ -327,7 +348,12 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}

// Return the first (and only) workflow, transformed to UTC
workflow := workflowStatusToUTC(workflows[0])
workflow, err := toListWorkflowResponse(workflows[0])
if err != nil {
ctx.logger.Error("Error transforming workflow response", "error", err)
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(workflow); err != nil {
@@ -346,7 +372,10 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}
}

workflows, err := ListWorkflows(ctx, req.toListWorkflowsOptions()...)
req.Status = "" // We are not expecting a filter here but clear just in case
filters := req.toListWorkflowsOptions()
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
workflows, err := ListWorkflows(ctx, filters...)
if err != nil {
ctx.logger.Error("Failed to list queued workflows", "error", err)
http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError)
@@ -365,13 +394,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}

// Transform to UNIX timestamps before encoding
utcWorkflows := make([]map[string]any, len(workflows))
responseWorkflows := make([]map[string]any, len(workflows))
for i, wf := range workflows {
utcWorkflows[i] = workflowStatusToUTC(wf)
responseWorkflows[i], err = toListWorkflowResponse(wf)
if err != nil {
ctx.logger.Error("Error transforming workflow response", "error", err)
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
return
}
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil {
ctx.logger.Error("Error encoding queued workflows response", "error", err)
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
}
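For context, a minimal standalone Go sketch (not part of this diff) of how encoding/json renders the Input and Output values that toListWorkflowResponse now embeds as strings; the TestStruct type is hypothetical, mirroring the shape used in the test below:

package main

import (
	"encoding/json"
	"fmt"
)

// TestStruct is a hypothetical payload type mirroring the one used in the test below.
type TestStruct struct {
	Name  string `json:"name"`
	Value int    `json:"value"`
}

func main() {
	// An int held in an `any` marshals to its decimal form, so Input becomes "42".
	var input any = 42
	b, _ := json.Marshal(input)
	fmt.Println(string(b)) // 42

	// A struct marshals to a JSON object string, so Output becomes {"name":"output-test","value":20}.
	var output any = TestStruct{Name: "output-test", Value: 20}
	b, _ = json.Marshal(output)
	fmt.Println(string(b)) // {"name":"output-test","value":20}
}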
149 changes: 147 additions & 2 deletions dbos/admin_server_test.go
@@ -213,7 +213,154 @@ func TestAdminServer(t *testing.T) {
}
})

t.Run("List workflows input/output values", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
require.NoError(t, err)

// Define a custom struct for testing
type TestStruct struct {
Name string `json:"name"`
Value int `json:"value"`
}

// Test workflow with int input/output
intWorkflow := func(dbosCtx DBOSContext, input int) (int, error) {
return input * 2, nil
}
RegisterWorkflow(ctx, intWorkflow)

// Test workflow with empty string input/output
emptyStringWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
return "", nil
}
RegisterWorkflow(ctx, emptyStringWorkflow)

// Test workflow with struct input/output
structWorkflow := func(dbosCtx DBOSContext, input TestStruct) (TestStruct, error) {
return TestStruct{Name: "output-" + input.Name, Value: input.Value * 2}, nil
}
RegisterWorkflow(ctx, structWorkflow)

err = ctx.Launch()
require.NoError(t, err)

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Cancel()
}
}()

// Give the server a moment to start
time.Sleep(100 * time.Millisecond)

client := &http.Client{Timeout: 5 * time.Second}
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))

// Create workflows with different input/output types
// 1. Integer workflow
intHandle, err := RunAsWorkflow(ctx, intWorkflow, 42)
require.NoError(t, err, "Failed to create int workflow")
intResult, err := intHandle.GetResult()
require.NoError(t, err, "Failed to get int workflow result")
assert.Equal(t, 84, intResult)

// 2. Empty string workflow
emptyStringHandle, err := RunAsWorkflow(ctx, emptyStringWorkflow, "")
require.NoError(t, err, "Failed to create empty string workflow")
emptyStringResult, err := emptyStringHandle.GetResult()
require.NoError(t, err, "Failed to get empty string workflow result")
assert.Equal(t, "", emptyStringResult)

// 3. Struct workflow
structInput := TestStruct{Name: "test", Value: 10}
structHandle, err := RunAsWorkflow(ctx, structWorkflow, structInput)
require.NoError(t, err, "Failed to create struct workflow")
structResult, err := structHandle.GetResult()
require.NoError(t, err, "Failed to get struct workflow result")
assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, structResult)

// Query workflows with input/output loading enabled
// Filter by the workflow IDs we just created to avoid interference from other tests
reqBody := map[string]any{
"workflow_uuids": []string{
intHandle.GetWorkflowID(),
emptyStringHandle.GetWorkflowID(),
structHandle.GetWorkflowID(),
},
"load_input": true,
"load_output": true,
"limit": 10,
}
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody)))
require.NoError(t, err, "Failed to create request")
req.Header.Set("Content-Type", "application/json")

resp, err := client.Do(req)
require.NoError(t, err, "Failed to make request")
defer resp.Body.Close()

assert.Equal(t, http.StatusOK, resp.StatusCode)

var workflows []map[string]any
err = json.NewDecoder(resp.Body).Decode(&workflows)
require.NoError(t, err, "Failed to decode workflows response")

// Should have exactly 3 workflows
assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows")

// Verify each workflow's input/output marshaling
for _, wf := range workflows {
wfID := wf["WorkflowUUID"].(string)

// Check input and output fields exist and are strings (JSON marshaled)
if wfID == intHandle.GetWorkflowID() {
// Integer workflow: input and output should be marshaled as JSON strings
inputStr, ok := wf["Input"].(string)
require.True(t, ok, "Int workflow Input should be a string")
assert.Equal(t, "42", inputStr, "Int workflow input should be marshaled as '42'")

outputStr, ok := wf["Output"].(string)
require.True(t, ok, "Int workflow Output should be a string")
assert.Equal(t, "84", outputStr, "Int workflow output should be marshaled as '84'")

} else if wfID == emptyStringHandle.GetWorkflowID() {
// Empty string workflow: both input and output are empty strings
// Verify the response still carries the Input/Output fields as empty values
input, hasInput := wf["Input"]
require.Equal(t, "", input)
require.True(t, hasInput, "Empty string workflow should have Input field")

output, hasOutput := wf["Output"]
require.True(t, hasOutput, "Empty string workflow should have Output field")
require.Equal(t, "", output)

} else if wfID == structHandle.GetWorkflowID() {
// Struct workflow: input and output should be marshaled as JSON strings
inputStr, ok := wf["Input"].(string)
require.True(t, ok, "Struct workflow Input should be a string")
var inputStruct TestStruct
err = json.Unmarshal([]byte(inputStr), &inputStruct)
require.NoError(t, err, "Failed to unmarshal struct workflow input")
assert.Equal(t, structInput, inputStruct, "Struct workflow input should match")

outputStr, ok := wf["Output"].(string)
require.True(t, ok, "Struct workflow Output should be a string")
var outputStruct TestStruct
err = json.Unmarshal([]byte(outputStr), &outputStruct)
require.NoError(t, err, "Failed to unmarshal struct workflow output")
assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, outputStruct, "Struct workflow output should match")
}
}
})

t.Run("List endpoints time filtering", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
@@ -308,7 +455,6 @@ func TestAdminServer(t *testing.T) {
"start_time": timeBetween.Format(time.RFC3339Nano),
"limit": 10,
}
fmt.Println("Request body 2:", reqBody2, "timebetween", timeBetween.UnixMilli())
req2, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody2)))
require.NoError(t, err, "Failed to create request 2")
req2.Header.Set("Content-Type", "application/json")
@@ -322,7 +468,6 @@ func TestAdminServer(t *testing.T) {
var workflows2 []map[string]any
err = json.NewDecoder(resp2.Body).Decode(&workflows2)
require.NoError(t, err, "Failed to decode workflows response 2")
fmt.Println(workflows2)

// Should have exactly 1 workflow (the second one)
assert.Equal(t, 1, len(workflows2), "Expected exactly 1 workflow with start_time after timeBetween")
2 changes: 1 addition & 1 deletion dbos/client_test.go
@@ -235,7 +235,7 @@ func TestEnqueue(t *testing.T) {

// After first workflow completes, we should be able to enqueue with same deduplication ID
handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"},
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")
16 changes: 8 additions & 8 deletions dbos/queue.go
@@ -36,44 +36,44 @@ type WorkflowQueue struct {
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
}

// queueOption is a functional option for configuring a workflow queue
type queueOption func(*WorkflowQueue)
// QueueOption is a functional option for configuring a workflow queue
type QueueOption func(*WorkflowQueue)

// WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue.
// This provides per-executor concurrency control.
func WithWorkerConcurrency(concurrency int) queueOption {
func WithWorkerConcurrency(concurrency int) QueueOption {
return func(q *WorkflowQueue) {
q.WorkerConcurrency = &concurrency
}
}

// WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue
// across all executors. This provides global concurrency control.
func WithGlobalConcurrency(concurrency int) queueOption {
func WithGlobalConcurrency(concurrency int) QueueOption {
return func(q *WorkflowQueue) {
q.GlobalConcurrency = &concurrency
}
}

// WithPriorityEnabled enables priority-based scheduling for the queue.
// When enabled, workflows with lower priority numbers are executed first.
func WithPriorityEnabled(enabled bool) queueOption {
func WithPriorityEnabled(enabled bool) QueueOption {
return func(q *WorkflowQueue) {
q.PriorityEnabled = enabled
}
}

// WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services.
// The rate limiter enforces a maximum number of workflow starts within a time period.
func WithRateLimiter(limiter *RateLimiter) queueOption {
func WithRateLimiter(limiter *RateLimiter) QueueOption {
return func(q *WorkflowQueue) {
q.RateLimit = limiter
}
}

// WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration.
// This controls batch sizes for queue processing.
func WithMaxTasksPerIteration(maxTasks int) queueOption {
func WithMaxTasksPerIteration(maxTasks int) QueueOption {
return func(q *WorkflowQueue) {
q.MaxTasksPerIteration = maxTasks
}
@@ -96,7 +96,7 @@ func WithMaxTasksPerIteration(maxTasks int) queueOption {
//
// // Enqueue workflows to this queue:
// handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) WorkflowQueue {
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) WorkflowQueue {
ctx, ok := dbosCtx.(*dbosContext)
if !ok {
return WorkflowQueue{} // Do nothing if the concrete type is not dbosContext
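Because queueOption is now exported as QueueOption, option slices can be assembled outside the dbos package. A hedged sketch under that assumption — buildQueueOptions is a hypothetical helper, and ctx stands for an already-initialized dbos.DBOSContext:

// buildQueueOptions assembles queue options conditionally, which the exported
// QueueOption type now allows from outside the dbos package.
func buildQueueOptions(workerLimit int) []dbos.QueueOption {
	opts := []dbos.QueueOption{
		dbos.WithGlobalConcurrency(50),
		dbos.WithMaxTasksPerIteration(10),
	}
	// A non-positive workerLimit means "no per-executor cap".
	if workerLimit > 0 {
		opts = append(opts, dbos.WithWorkerConcurrency(workerLimit))
	}
	return opts
}

// Usage (ctx is an already-initialized dbos.DBOSContext):
//   queue := dbos.NewWorkflowQueue(ctx, "email-queue", buildQueueOptions(5)...)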
4 changes: 2 additions & 2 deletions dbos/queues_test.go
@@ -233,13 +233,13 @@ func TestWorkflowQueues(t *testing.T) {
for {
dlqStatus, err := dlqHandle[0].GetStatus()
require.NoError(t, err, "failed to get status of DLQ workflow handle")
if dlqStatus.Status != WorkflowStatusRetriesExceeded && retries < 10 {
if dlqStatus.Status != WorkflowStatusMaxRecoveryAttemptsExceeded && retries < 10 {
time.Sleep(1 * time.Second) // Wait a bit before checking again
retries++
continue
}
require.NoError(t, err, "failed to get status of DLQ workflow handle")
assert.Equal(t, WorkflowStatusRetriesExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
assert.Equal(t, WorkflowStatusMaxRecoveryAttemptsExceeded, dlqStatus.Status, "expected workflow to be in DLQ after max retries exceeded")
handles = append(handles, dlqHandle[0])
break
}
4 changes: 2 additions & 2 deletions dbos/serialization_test.go
@@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
assert.Equal(t, "user-defined-event-set", result)

// Retrieve the event to verify it was properly serialized and can be deserialized
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second)
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3*time.Second)
require.NoError(t, err)

// Verify the retrieved data matches what we set
@@ -273,7 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,

func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
// Receive the user-defined type message
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second)
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3*time.Second)
return result, err
}
