Skip to content

Commit 649f830

Browse files
authored
Merge branch 'main' into feat/register-workflowhandle
2 parents e5ecdff + c54b9e7 commit 649f830

File tree

6 files changed

+329
-14
lines changed

6 files changed

+329
-14
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ func main() {
8282
dbos.RegisterWorkflow(ctx, workflow)
8383

8484
// Launch DBOS
85-
err = ctx.Launch()
85+
err = dbos.Launch(ctx)
8686
if err != nil {
8787
panic(err)
8888
}
89-
defer ctx.Shutdown(2 * time.Second)
89+
defer dbos.Shutdown(ctx, 2 * time.Second)
9090

9191
// Run a durable workflow and get its result
9292
handle, err := dbos.RunWorkflow(ctx, workflow, "")
@@ -157,11 +157,11 @@ func main() {
157157
queue := dbos.NewWorkflowQueue(ctx, "queue")
158158

159159
// Launch DBOS
160-
err = ctx.Launch()
160+
err = dbos.Launch(ctx)
161161
if err != nil {
162162
panic(err)
163163
}
164-
defer ctx.Shutdown(2 * time.Second)
164+
defer dbos.Shutdown(ctx, 2 * time.Second)
165165

166166
// Enqueue tasks and gather results
167167
fmt.Println("Enqueuing workflows")

dbos/admin_server.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
117117
"AssumedRole": ws.AssumedRole,
118118
"AuthenticatedRoles": ws.AuthenticatedRoles,
119119
"Output": ws.Output,
120-
"Error": ws.Error,
121120
"ExecutorID": ws.ExecutorID,
122121
"ApplicationVersion": ws.ApplicationVersion,
123122
"ApplicationID": ws.ApplicationID,
@@ -169,6 +168,18 @@ func toListWorkflowResponse(ws WorkflowStatus) (map[string]any, error) {
169168
result["Output"] = string(bytes)
170169
}
171170

171+
if ws.Error != nil {
172+
// Convert error to string first, then marshal as JSON
173+
errStr := ws.Error.Error()
174+
bytes, err := json.Marshal(errStr)
175+
if err != nil {
176+
return nil, fmt.Errorf("failed to marshal error: %w", err)
177+
}
178+
result["Error"] = string(bytes)
179+
} else {
180+
result["Error"] = ""
181+
}
182+
172183
return result, nil
173184
}
174185

@@ -422,13 +433,37 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
422433
// Transform to snake_case format with function_id and function_name
423434
formattedSteps := make([]map[string]any, len(steps))
424435
for i, step := range steps {
425-
formattedSteps[i] = map[string]any{
436+
formattedStep := map[string]any{
426437
"function_id": step.StepID,
427438
"function_name": step.StepName,
428-
"output": step.Output,
429-
"error": step.Error,
430439
"child_workflow_id": step.ChildWorkflowID,
431440
}
441+
442+
// Marshal Output as JSON string if present
443+
if step.Output != nil && step.Output != "" {
444+
bytes, err := json.Marshal(step.Output)
445+
if err != nil {
446+
ctx.logger.Error("Failed to marshal step output", "error", err)
447+
http.Error(w, fmt.Sprintf("Failed to format step output: %v", err), http.StatusInternalServerError)
448+
return
449+
}
450+
formattedStep["output"] = string(bytes)
451+
}
452+
453+
// Marshal Error as JSON string if present
454+
if step.Error != nil {
455+
// Convert error to string first, then marshal as JSON
456+
errStr := step.Error.Error()
457+
bytes, err := json.Marshal(errStr)
458+
if err != nil {
459+
ctx.logger.Error("Failed to marshal step error", "error", err)
460+
http.Error(w, fmt.Sprintf("Failed to format step error: %v", err), http.StatusInternalServerError)
461+
return
462+
}
463+
formattedStep["error"] = string(bytes)
464+
}
465+
466+
formattedSteps[i] = formattedStep
432467
}
433468

434469
w.Header().Set("Content-Type", "application/json")

dbos/admin_server_test.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ import (
1717
"go.uber.org/goleak"
1818
)
1919

20+
// TestStepResult is a custom struct for testing step outputs
21+
type TestStepResult struct {
22+
Message string `json:"message"`
23+
Count int `json:"count"`
24+
Success bool `json:"success"`
25+
}
26+
2027
func TestAdminServer(t *testing.T) {
2128
defer goleak.VerifyNone(t,
2229
goleak.IgnoreAnyFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).backgroundHealthCheck"),
@@ -726,6 +733,168 @@ func TestAdminServer(t *testing.T) {
726733
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
727734
})
728735

736+
t.Run("WorkflowSteps", func(t *testing.T) {
737+
resetTestDatabase(t, databaseURL)
738+
ctx, err := NewDBOSContext(context.Background(), Config{
739+
DatabaseURL: databaseURL,
740+
AppName: "test-app",
741+
AdminServer: true,
742+
})
743+
require.NoError(t, err)
744+
745+
// Test workflow with multiple steps - simpler version that won't fail on serialization
746+
testWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
747+
// Step 1: Return a string
748+
stepResult1, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
749+
return "step1-output", nil
750+
}, WithStepName("stringStep"))
751+
if err != nil {
752+
return "", err
753+
}
754+
755+
// Step 2: Return a user-defined struct
756+
stepResult2, err := RunAsStep(dbosCtx, func(ctx context.Context) (TestStepResult, error) {
757+
return TestStepResult{
758+
Message: "structured data",
759+
Count: 100,
760+
Success: true,
761+
}, nil
762+
}, WithStepName("structStep"))
763+
if err != nil {
764+
return "", err
765+
}
766+
767+
// Step 3: Return an error - but we don't abort on error to test error marshaling
768+
_, _ = RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
769+
return "", fmt.Errorf("deliberate error for testing")
770+
}, WithStepName("errorStep"))
771+
772+
// Step 4: Return empty string (to test empty value handling)
773+
stepResult4, err := RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
774+
return "", nil
775+
}, WithStepName("emptyStep"))
776+
if err != nil {
777+
return "", err
778+
}
779+
780+
// Combine results
781+
return fmt.Sprintf("workflow complete: %s, struct(%s,%d,%v), %s", stepResult1, stepResult2.Message, stepResult2.Count, stepResult2.Success, stepResult4), nil
782+
}
783+
784+
RegisterWorkflow(ctx, testWorkflow)
785+
786+
err = Launch(ctx)
787+
require.NoError(t, err)
788+
789+
// Ensure cleanup
790+
defer func() {
791+
if ctx != nil {
792+
Shutdown(ctx, 1*time.Minute)
793+
}
794+
}()
795+
796+
// Give the server a moment to start
797+
time.Sleep(100 * time.Millisecond)
798+
799+
client := &http.Client{Timeout: 5 * time.Second}
800+
801+
// Create and run the workflow
802+
handle, err := RunWorkflow(ctx, testWorkflow, "test-input")
803+
require.NoError(t, err, "Failed to create workflow")
804+
805+
// Wait for workflow to complete
806+
result, err := handle.GetResult()
807+
require.NoError(t, err, "Workflow should complete successfully")
808+
t.Logf("Workflow result: %s", result)
809+
810+
// Call the workflow steps endpoint
811+
workflowID := handle.GetWorkflowID()
812+
endpoint := fmt.Sprintf("http://localhost:%d/workflows/%s/steps", _DEFAULT_ADMIN_SERVER_PORT, workflowID)
813+
req, err := http.NewRequest("GET", endpoint, nil)
814+
require.NoError(t, err, "Failed to create request")
815+
816+
resp, err := client.Do(req)
817+
require.NoError(t, err, "Failed to make request")
818+
defer resp.Body.Close()
819+
820+
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from steps endpoint")
821+
822+
// Decode the response
823+
var steps []map[string]any
824+
err = json.NewDecoder(resp.Body).Decode(&steps)
825+
require.NoError(t, err, "Failed to decode steps response")
826+
827+
// Should have 4 steps
828+
assert.Equal(t, 4, len(steps), "Expected exactly 4 steps")
829+
830+
// Verify each step's output/error is properly marshaled
831+
for i, step := range steps {
832+
functionName, ok := step["function_name"].(string)
833+
require.True(t, ok, "function_name should be a string for step %d", i)
834+
835+
t.Logf("Step %d (%s): output=%v, error=%v", i, functionName, step["output"], step["error"])
836+
837+
switch functionName {
838+
case "stringStep":
839+
// String output should be marshaled as JSON string
840+
outputStr, ok := step["output"].(string)
841+
require.True(t, ok, "String step output should be a JSON string")
842+
843+
var unmarshaledOutput string
844+
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
845+
require.NoError(t, err, "Failed to unmarshal string step output")
846+
assert.Equal(t, "step1-output", unmarshaledOutput, "String step output should match")
847+
848+
assert.Nil(t, step["error"], "String step should have no error")
849+
850+
case "structStep":
851+
// Struct output should be marshaled as JSON string
852+
outputStr, ok := step["output"].(string)
853+
require.True(t, ok, "Struct step output should be a JSON string")
854+
855+
var unmarshaledOutput TestStepResult
856+
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
857+
require.NoError(t, err, "Failed to unmarshal struct step output")
858+
assert.Equal(t, TestStepResult{
859+
Message: "structured data",
860+
Count: 100,
861+
Success: true,
862+
}, unmarshaledOutput, "Struct step output should match")
863+
864+
assert.Nil(t, step["error"], "Struct step should have no error")
865+
866+
case "errorStep":
867+
// Error step should have error marshaled as JSON string
868+
errorStr, ok := step["error"].(string)
869+
require.True(t, ok, "Error step error should be a JSON string")
870+
871+
var unmarshaledError string
872+
err = json.Unmarshal([]byte(errorStr), &unmarshaledError)
873+
require.NoError(t, err, "Failed to unmarshal error step error")
874+
assert.Contains(t, unmarshaledError, "deliberate error for testing", "Error message should be preserved")
875+
876+
case "emptyStep":
877+
// Empty string might be returned as nil or as an empty JSON string
878+
output := step["output"]
879+
if output == nil {
880+
// Empty string was not included in response (which is fine)
881+
t.Logf("Empty step output was nil (not included)")
882+
} else {
883+
// If it was included, it should be marshaled as JSON string `""`
884+
outputStr, ok := output.(string)
885+
require.True(t, ok, "If present, empty step output should be a JSON string")
886+
887+
var unmarshaledOutput string
888+
err = json.Unmarshal([]byte(outputStr), &unmarshaledOutput)
889+
require.NoError(t, err, "Failed to unmarshal empty step output")
890+
assert.Equal(t, "", unmarshaledOutput, "Empty step output should be empty string")
891+
}
892+
893+
assert.Nil(t, step["error"], "Empty step should have no error")
894+
}
895+
}
896+
})
897+
729898
t.Run("TestDeactivate", func(t *testing.T) {
730899
resetTestDatabase(t, databaseURL)
731900
ctx, err := NewDBOSContext(context.Background(), Config{

dbos/doc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
// AppName: "my-app",
1313
// DatabaseURL: os.Getenv("DBOS_SYSTEM_DATABASE_URL"),
1414
// })
15-
// defer dbosContext.Shutdown(5 * time.Second)
15+
// defer dbos.Shutdown(dbosContext, 5 * time.Second)
1616
//
1717
// // Register workflows before launching
1818
// dbos.RegisterWorkflow(dbosContext, myWorkflow)
1919
//
2020
// // Launch the context to start processing
21-
// err = dbosContext.Launch()
21+
// err = dbos.Launch(dbosContext)
2222
//
2323
// # Workflows
2424
//

dbos/workflow.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,10 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error
10411041
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
10421042
}
10431043

1044+
// Register the output type for gob encoding
1045+
var r R
1046+
gob.Register(r)
1047+
10441048
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
10451049
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
10461050
opts = append(opts, WithStepName(stepName))
@@ -1490,9 +1494,9 @@ func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error {
14901494
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
14911495
isWithinWorkflow := ok && workflowState != nil
14921496
if isWithinWorkflow {
1493-
_, err := RunAsStep(c, func(ctx context.Context) (any, error) {
1497+
_, err := RunAsStep(c, func(ctx context.Context) (string, error) {
14941498
err := c.systemDB.cancelWorkflow(ctx, workflowID)
1495-
return nil, err
1499+
return "", err
14961500
}, WithStepName("DBOS.cancelWorkflow"))
14971501
return err
14981502
} else {
@@ -1527,9 +1531,9 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow
15271531
isWithinWorkflow := ok && workflowState != nil
15281532
var err error
15291533
if isWithinWorkflow {
1530-
_, err = RunAsStep(c, func(ctx context.Context) (any, error) {
1534+
_, err = RunAsStep(c, func(ctx context.Context) (string, error) {
15311535
err := c.systemDB.resumeWorkflow(ctx, workflowID)
1532-
return nil, err
1536+
return "", err
15331537
}, WithStepName("DBOS.resumeWorkflow"))
15341538
} else {
15351539
err = c.systemDB.resumeWorkflow(c, workflowID)

0 commit comments

Comments
 (0)