Skip to content
56 changes: 45 additions & 11 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ type adminServer struct {
isDeactivated atomic.Int32
}

// workflowStatusToUTC converts a WorkflowStatus to a map with all time fields in UTC
// toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC
// not super ergonomic but the DBOS console excepts unix timestamps
func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
result := map[string]any{
"WorkflowUUID": ws.ID,
"Status": ws.Status,
Expand Down Expand Up @@ -152,7 +152,23 @@ func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
result["StartedAt"] = nil
}

return result
if ws.Input != nil && ws.Input != "" {
bytes, err := json.Marshal(ws.Input)
if err != nil {
return nil, fmt.Errorf("failed to marshal input: %w", err)
}
result["Input"] = string(bytes)
}

if ws.Output != nil && ws.Output != "" {
bytes, err := json.Marshal(ws.Output)
if err != nil {
return nil, fmt.Errorf("failed to marshal output: %w", err)
}
result["Output"] = string(bytes)
}

return result, nil
}

func newAdminServer(ctx *dbosContext, port int) *adminServer {
Expand Down Expand Up @@ -295,13 +311,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}

// Transform to UTC before encoding
utcWorkflows := make([]map[string]any, len(workflows))
responseWorkflows := make([]map[string]any, len(workflows))
for i, wf := range workflows {
utcWorkflows[i] = workflowStatusToUTC(wf)
responseWorkflows[i], err = toListWorkflowResponse(wf)
if err != nil {
ctx.logger.Error("Error transforming workflow response", "error", err)
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
return
}
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil {
ctx.logger.Error("Error encoding workflows response", "error", err)
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
}
Expand All @@ -327,7 +348,12 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}

// Return the first (and only) workflow, transformed to UTC
workflow := workflowStatusToUTC(workflows[0])
workflow, err := toListWorkflowResponse(workflows[0])
if err != nil {
ctx.logger.Error("Error transforming workflow response", "error", err)
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(workflow); err != nil {
Expand All @@ -346,7 +372,10 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}
}

workflows, err := ListWorkflows(ctx, req.toListWorkflowsOptions()...)
req.Status = "" // We are not expecting a filter here but clear just in case
filters := req.toListWorkflowsOptions()
filters = append(filters, WithStatus([]WorkflowStatusType{WorkflowStatusEnqueued, WorkflowStatusPending}))
workflows, err := ListWorkflows(ctx, filters...)
if err != nil {
ctx.logger.Error("Failed to list queued workflows", "error", err)
http.Error(w, fmt.Sprintf("Failed to list queued workflows: %v", err), http.StatusInternalServerError)
Expand All @@ -365,13 +394,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
}

// Transform to UNIX timestamps before encoding
utcWorkflows := make([]map[string]any, len(workflows))
responseWorkflows := make([]map[string]any, len(workflows))
for i, wf := range workflows {
utcWorkflows[i] = workflowStatusToUTC(wf)
responseWorkflows[i], err = toListWorkflowResponse(wf)
if err != nil {
ctx.logger.Error("Error transforming workflow response", "error", err)
http.Error(w, fmt.Sprintf("Failed to format workflow response: %v", err), http.StatusInternalServerError)
return
}
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
if err := json.NewEncoder(w).Encode(responseWorkflows); err != nil {
ctx.logger.Error("Error encoding queued workflows response", "error", err)
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
}
Expand Down
147 changes: 147 additions & 0 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,154 @@ func TestAdminServer(t *testing.T) {
}
})

t.Run("List workflows input/output values", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
AdminServer: true,
})
require.NoError(t, err)

// Define a custom struct for testing
type TestStruct struct {
Name string `json:"name"`
Value int `json:"value"`
}

// Test workflow with int input/output
intWorkflow := func(dbosCtx DBOSContext, input int) (int, error) {
return input * 2, nil
}
RegisterWorkflow(ctx, intWorkflow)

// Test workflow with empty string input/output
emptyStringWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
return "", nil
}
RegisterWorkflow(ctx, emptyStringWorkflow)

// Test workflow with struct input/output
structWorkflow := func(dbosCtx DBOSContext, input TestStruct) (TestStruct, error) {
return TestStruct{Name: "output-" + input.Name, Value: input.Value * 2}, nil
}
RegisterWorkflow(ctx, structWorkflow)

err = ctx.Launch()
require.NoError(t, err)

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Cancel()
}
}()

// Give the server a moment to start
time.Sleep(100 * time.Millisecond)

client := &http.Client{Timeout: 5 * time.Second}
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))

// Create workflows with different input/output types
// 1. Integer workflow
intHandle, err := RunAsWorkflow(ctx, intWorkflow, 42)
require.NoError(t, err, "Failed to create int workflow")
intResult, err := intHandle.GetResult()
require.NoError(t, err, "Failed to get int workflow result")
assert.Equal(t, 84, intResult)

// 2. Empty string workflow
emptyStringHandle, err := RunAsWorkflow(ctx, emptyStringWorkflow, "")
require.NoError(t, err, "Failed to create empty string workflow")
emptyStringResult, err := emptyStringHandle.GetResult()
require.NoError(t, err, "Failed to get empty string workflow result")
assert.Equal(t, "", emptyStringResult)

// 3. Struct workflow
structInput := TestStruct{Name: "test", Value: 10}
structHandle, err := RunAsWorkflow(ctx, structWorkflow, structInput)
require.NoError(t, err, "Failed to create struct workflow")
structResult, err := structHandle.GetResult()
require.NoError(t, err, "Failed to get struct workflow result")
assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, structResult)

// Query workflows with input/output loading enabled
// Filter by the workflow IDs we just created to avoid interference from other tests
reqBody := map[string]any{
"workflow_uuids": []string{
intHandle.GetWorkflowID(),
emptyStringHandle.GetWorkflowID(),
structHandle.GetWorkflowID(),
},
"load_input": true,
"load_output": true,
"limit": 10,
}
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(mustMarshal(reqBody)))
require.NoError(t, err, "Failed to create request")
req.Header.Set("Content-Type", "application/json")

resp, err := client.Do(req)
require.NoError(t, err, "Failed to make request")
defer resp.Body.Close()

assert.Equal(t, http.StatusOK, resp.StatusCode)

var workflows []map[string]any
err = json.NewDecoder(resp.Body).Decode(&workflows)
require.NoError(t, err, "Failed to decode workflows response")

// Should have exactly 3 workflows
assert.Equal(t, 3, len(workflows), "Expected exactly 3 workflows")

// Verify each workflow's input/output marshaling
for _, wf := range workflows {
wfID := wf["WorkflowUUID"].(string)

// Check input and output fields exist and are strings (JSON marshaled)
if wfID == intHandle.GetWorkflowID() {
// Integer workflow: input and output should be marshaled as JSON strings
inputStr, ok := wf["Input"].(string)
require.True(t, ok, "Int workflow Input should be a string")
assert.Equal(t, "42", inputStr, "Int workflow input should be marshaled as '42'")

outputStr, ok := wf["Output"].(string)
require.True(t, ok, "Int workflow Output should be a string")
assert.Equal(t, "84", outputStr, "Int workflow output should be marshaled as '84'")

} else if wfID == emptyStringHandle.GetWorkflowID() {
// Empty string workflow: both input and output are empty strings
// According to the logic, empty strings should not have Input/Output fields
input, hasInput := wf["Input"]
require.Equal(t, "", input)
require.True(t, hasInput, "Empty string workflow should have Input field")

output, hasOutput := wf["Output"]
require.True(t, hasOutput, "Empty string workflow should have Output field")
require.Equal(t, "", output)

} else if wfID == structHandle.GetWorkflowID() {
// Struct workflow: input and output should be marshaled as JSON strings
inputStr, ok := wf["Input"].(string)
require.True(t, ok, "Struct workflow Input should be a string")
var inputStruct TestStruct
err = json.Unmarshal([]byte(inputStr), &inputStruct)
require.NoError(t, err, "Failed to unmarshal struct workflow input")
assert.Equal(t, structInput, inputStruct, "Struct workflow input should match")

outputStr, ok := wf["Output"].(string)
require.True(t, ok, "Struct workflow Output should be a string")
var outputStruct TestStruct
err = json.Unmarshal([]byte(outputStr), &outputStruct)
require.NoError(t, err, "Failed to unmarshal struct workflow output")
assert.Equal(t, TestStruct{Name: "output-test", Value: 20}, outputStruct, "Struct workflow output should match")
}
}
})

t.Run("List endpoints time filtering", func(t *testing.T) {
resetTestDatabase(t, databaseURL)
ctx, err := NewDBOSContext(Config{
DatabaseURL: databaseURL,
AppName: "test-app",
Expand Down
2 changes: 1 addition & 1 deletion dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestEnqueue(t *testing.T) {

// After first workflow completes, we should be able to enqueue with same deduplication ID
handle5, err := Enqueue[wfInput, string](clientCtx, queue.Name, "ServerWorkflow", wfInput{Input: "test-input"},
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
WithEnqueueWorkflowID(wfid2), // Reuse the workflow ID that failed before
WithEnqueueDeduplicationID(dedupID), // Same deduplication ID as first workflow
WithEnqueueApplicationVersion(serverCtx.GetApplicationVersion()))
require.NoError(t, err, "failed to enqueue workflow with same dedup ID after completion")
Expand Down
16 changes: 8 additions & 8 deletions dbos/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,44 @@ type WorkflowQueue struct {
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
}

// queueOption is a functional option for configuring a workflow queue
type queueOption func(*WorkflowQueue)
// QueueOption is a functional option for configuring a workflow queue
type QueueOption func(*WorkflowQueue)

// WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue.
// This provides per-executor concurrency control.
func WithWorkerConcurrency(concurrency int) queueOption {
func WithWorkerConcurrency(concurrency int) QueueOption {
return func(q *WorkflowQueue) {
q.WorkerConcurrency = &concurrency
}
}

// WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue
// across all executors. This provides global concurrency control.
func WithGlobalConcurrency(concurrency int) queueOption {
func WithGlobalConcurrency(concurrency int) QueueOption {
return func(q *WorkflowQueue) {
q.GlobalConcurrency = &concurrency
}
}

// WithPriorityEnabled enables priority-based scheduling for the queue.
// When enabled, workflows with lower priority numbers are executed first.
func WithPriorityEnabled(enabled bool) queueOption {
func WithPriorityEnabled(enabled bool) QueueOption {
return func(q *WorkflowQueue) {
q.PriorityEnabled = enabled
}
}

// WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services.
// The rate limiter enforces a maximum number of workflow starts within a time period.
func WithRateLimiter(limiter *RateLimiter) queueOption {
func WithRateLimiter(limiter *RateLimiter) QueueOption {
return func(q *WorkflowQueue) {
q.RateLimit = limiter
}
}

// WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration.
// This controls batch sizes for queue processing.
func WithMaxTasksPerIteration(maxTasks int) queueOption {
func WithMaxTasksPerIteration(maxTasks int) QueueOption {
return func(q *WorkflowQueue) {
q.MaxTasksPerIteration = maxTasks
}
Expand All @@ -96,7 +96,7 @@ func WithMaxTasksPerIteration(maxTasks int) queueOption {
//
// // Enqueue workflows to this queue:
// handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) WorkflowQueue {
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) WorkflowQueue {
ctx, ok := dbosCtx.(*dbosContext)
if !ok {
return WorkflowQueue{} // Do nothing if the concrete type is not dbosContext
Expand Down
4 changes: 2 additions & 2 deletions dbos/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestSetEventSerialize(t *testing.T) {
assert.Equal(t, "user-defined-event-set", result)

// Retrieve the event to verify it was properly serialized and can be deserialized
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3 * time.Second)
retrievedEvent, err := GetEvent[UserDefinedEventData](executor, setHandle.GetWorkflowID(), "user-defined-key", 3*time.Second)
require.NoError(t, err)

// Verify the retrieved data matches what we set
Expand Down Expand Up @@ -273,7 +273,7 @@ func sendUserDefinedTypeWorkflow(ctx DBOSContext, destinationID string) (string,

func recvUserDefinedTypeWorkflow(ctx DBOSContext, input string) (UserDefinedEventData, error) {
// Receive the user-defined type message
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3 * time.Second)
result, err := Recv[UserDefinedEventData](ctx, "user-defined-topic", 3*time.Second)
return result, err
}

Expand Down
8 changes: 4 additions & 4 deletions dbos/system_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,11 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
}

// Every time we start executing a workflow (and thus attempt to insert its status), we increment `recovery_attempts` by 1.
// When this number becomes equal to `maxRetries + 1`, we mark the workflow as `RETRIES_EXCEEDED`.
// When this number becomes equal to `maxRetries + 1`, we mark the workflow as `MAX_RECOVERY_ATTEMPTS_EXCEEDED`.
if result.status != WorkflowStatusSuccess && result.status != WorkflowStatusError &&
input.maxRetries > 0 && result.attempts > input.maxRetries+1 {

// Update workflow status to RETRIES_EXCEEDED and clear queue-related fields
// Update workflow status to MAX_RECOVERY_ATTEMPTS_EXCEEDED and clear queue-related fields
dlqQuery := `UPDATE dbos.workflow_status
SET status = $1, deduplication_id = NULL, started_at_epoch_ms = NULL, queue_name = NULL
WHERE workflow_uuid = $2 AND status = $3`
Expand All @@ -436,12 +436,12 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
WorkflowStatusPending)

if err != nil {
return nil, fmt.Errorf("failed to update workflow to RETRIES_EXCEEDED: %w", err)
return nil, fmt.Errorf("failed to update workflow to %s: %w", WorkflowStatusRetriesExceeded, err)
}

// Commit the transaction before throwing the error
if err := input.tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit transaction after marking workflow as RETRIES_EXCEEDED: %w", err)
return nil, fmt.Errorf("failed to commit transaction after marking workflow as %s: %w", WorkflowStatusRetriesExceeded, err)
}

return nil, newDeadLetterQueueError(input.status.ID, input.maxRetries)
Expand Down
Loading
Loading