Skip to content

Commit b6bef08

Browse files
committed
return input/outputs as a raw json string from list workflows and get workflow steps
1 parent cdf7642 commit b6bef08

File tree

5 files changed

+54
-49
lines changed

5 files changed

+54
-49
lines changed

dbos/client_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"strings"
78
"sync"
@@ -373,7 +374,8 @@ func TestCancelResume(t *testing.T) {
373374
require.NoError(t, err, "failed to get result from resumed workflow")
374375

375376
// Decode the result from any (which may be float64 after JSON decode) to int
376-
result, err := decodeAnyToType[int](resultAny)
377+
var result int
378+
err = json.Unmarshal([]byte(resultAny.(string)), &result)
377379
require.NoError(t, err, "failed to decode result to int")
378380

379381
// Verify the result
@@ -399,7 +401,8 @@ func TestCancelResume(t *testing.T) {
399401
require.NoError(t, err, "failed to get result from second resume")
400402

401403
// Decode the result from any (which may be float64 after JSON decode) to int
402-
resultAgain, err := decodeAnyToType[int](resultAgainAny)
404+
var resultAgain int
405+
err = json.Unmarshal([]byte(resultAgainAny.(string)), &resultAgain)
403406
require.NoError(t, err, "failed to decode second result to int")
404407

405408
assert.Equal(t, input, resultAgain, "expected second resume result to match input")

dbos/queues_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"os"
@@ -541,7 +542,8 @@ func TestQueueRecovery(t *testing.T) {
541542
resultAny, err := h.GetResult()
542543
require.NoError(t, err, "failed to get result from recovered root workflow handle")
543544
// Decode the result from any (which may be []interface{} after JSON decode) to []int
544-
castedResult, err := decodeAnyToType[[]int](resultAny)
545+
var castedResult []int
546+
err = json.Unmarshal([]byte(resultAny.(string)), &castedResult)
545547
require.NoError(t, err, "failed to decode result to []int")
546548
expectedResult := []int{0, 1, 2, 3, 4}
547549
assert.Equal(t, expectedResult, castedResult, "expected result %v, got %v", expectedResult, castedResult)

dbos/serialization_test.go

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,6 @@ import (
1111
"github.com/stretchr/testify/require"
1212
)
1313

14-
// decodeAnyToType converts an any value (which is map[string]interface{} after JSON decode)
15-
// back to the expected type T by re-encoding to JSON and decoding into T.
16-
func decodeAnyToType[T any](value any) (T, error) {
17-
var result T
18-
if value == nil {
19-
return result, nil
20-
}
21-
// Re-encode the any value to JSON
22-
jsonBytes, err := json.Marshal(value)
23-
if err != nil {
24-
return result, fmt.Errorf("failed to marshal any value: %w", err)
25-
}
26-
// Decode JSON into the expected type T
27-
if err := json.Unmarshal(jsonBytes, &result); err != nil {
28-
return result, fmt.Errorf("failed to unmarshal into type T: %w", err)
29-
}
30-
return result, nil
31-
}
32-
3314
// testAllSerializationPaths tests workflow recovery and verifies all read paths.
3415
// This is the unified test function that exercises:
3516
// 1. Workflow recovery: starts a workflow, blocks it, recovers it, then verifies completion
@@ -116,13 +97,17 @@ func testAllSerializationPaths[T any](
11697
if len(steps) > 0 {
11798
lastStep := steps[len(steps)-1]
11899
if isNilExpected {
119-
assert.Nil(t, lastStep.Output, "Step output should be nil")
100+
// Should be an empty string
101+
assert.Equal(t, "", lastStep.Output, "Step output should be an empty string")
120102
} else {
121103
require.NotNil(t, lastStep.Output)
122-
// GetWorkflowSteps returns any (map[string]interface{} after JSON decode)
123-
// We need to re-encode to JSON and decode into type T
124-
decodedOutput, err := decodeAnyToType[T](lastStep.Output)
125-
require.NoError(t, err, "Failed to decode step output to type T")
104+
// GetWorkflowSteps returns a string (base64-decoded JSON)
105+
// Unmarshal the JSON string into type T
106+
strValue, ok := lastStep.Output.(string)
107+
require.True(t, ok, "Step output should be a string")
108+
var decodedOutput T
109+
err := json.Unmarshal([]byte(strValue), &decodedOutput)
110+
require.NoError(t, err, "Failed to unmarshal step output to type T")
126111
assert.Equal(t, expectedOutput, decodedOutput, "Step output should match expected output")
127112
}
128113
assert.Nil(t, lastStep.Error)
@@ -138,18 +123,28 @@ func testAllSerializationPaths[T any](
138123
require.Len(t, wfs, 1)
139124
wf := wfs[0]
140125
if isNilExpected {
141-
assert.Nil(t, wf.Input, "Workflow input should be nil")
142-
assert.Nil(t, wf.Output, "Workflow output should be nil")
126+
// Should be an empty string
127+
assert.Equal(t, "", wf.Input, "Workflow input should be an empty string")
128+
assert.Equal(t, "", wf.Output, "Workflow output should be an empty string")
143129
} else {
144130
require.NotNil(t, wf.Input)
145131
require.NotNil(t, wf.Output)
146132

147-
// ListWorkflows returns any (map[string]interface{} after JSON decode)
148-
// We need to re-encode to JSON and decode into type T
149-
decodedInput, err := decodeAnyToType[T](wf.Input)
150-
require.NoError(t, err, "Failed to decode workflow input to type T")
151-
decodedOutput, err := decodeAnyToType[T](wf.Output)
152-
require.NoError(t, err, "Failed to decode workflow output to type T")
133+
// ListWorkflows returns strings (base64-decoded JSON)
134+
// Unmarshal the JSON strings into type T
135+
inputStr, ok := wf.Input.(string)
136+
require.True(t, ok, "Workflow input should be a string")
137+
outputStr, ok := wf.Output.(string)
138+
require.True(t, ok, "Workflow output should be a string")
139+
140+
var decodedInput T
141+
err := json.Unmarshal([]byte(inputStr), &decodedInput)
142+
require.NoError(t, err, "Failed to unmarshal workflow input to type T")
143+
144+
var decodedOutput T
145+
err = json.Unmarshal([]byte(outputStr), &decodedOutput)
146+
require.NoError(t, err, "Failed to unmarshal workflow output to type T")
147+
153148
assert.Equal(t, input, decodedInput, "Workflow input should match input")
154149
assert.Equal(t, expectedOutput, decodedOutput, "Workflow output should match expected output")
155150
}

dbos/workflow.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"context"
5+
"encoding/base64"
56
"errors"
67
"fmt"
78
"math"
@@ -2063,29 +2064,28 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
20632064

20642065
// Deserialize Input and Output fields if they were loaded
20652066
if params.loadInput || params.loadOutput {
2066-
serializer := newJSONSerializer[any]()
20672067
for i := range workflows {
20682068
if params.loadInput && workflows[i].Input != nil {
20692069
encodedInput, ok := workflows[i].Input.(*string)
20702070
if !ok {
20712071
return nil, fmt.Errorf("workflow input must be encoded string, got %T", workflows[i].Input)
20722072
}
2073-
decodedInput, err := serializer.Decode(encodedInput)
2073+
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedInput)
20742074
if err != nil {
2075-
return nil, fmt.Errorf("failed to deserialize workflow input for %s: %w", workflows[i].ID, err)
2075+
return nil, fmt.Errorf("failed to decode base64 workflow input for %s: %w", workflows[i].ID, err)
20762076
}
2077-
workflows[i].Input = decodedInput
2077+
workflows[i].Input = string(decodedBytes)
20782078
}
20792079
if params.loadOutput && workflows[i].Output != nil {
20802080
encodedOutput, ok := workflows[i].Output.(*string)
20812081
if !ok {
20822082
return nil, fmt.Errorf("workflow output must be encoded string, got %T", workflows[i].Output)
20832083
}
2084-
decodedOutput, err := serializer.Decode(encodedOutput)
2084+
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedOutput)
20852085
if err != nil {
2086-
return nil, fmt.Errorf("failed to deserialize workflow output for %s: %w", workflows[i].ID, err)
2086+
return nil, fmt.Errorf("failed to decode base64 workflow output for %s: %w", workflows[i].ID, err)
20872087
}
2088-
workflows[i].Output = decodedOutput
2088+
workflows[i].Output = string(decodedBytes)
20892089
}
20902090
}
20912091
}
@@ -2191,14 +2191,16 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step
21912191

21922192
// Deserialize outputs if asked to
21932193
if loadOutput {
2194-
serializer := newJSONSerializer[any]()
21952194
for i := range steps {
21962195
encodedOutput := steps[i].Output
2197-
decodedOutput, err := serializer.Decode(encodedOutput)
2196+
if encodedOutput == nil {
2197+
continue
2198+
}
2199+
decodedBytes, err := base64.StdEncoding.DecodeString(*encodedOutput)
21982200
if err != nil {
2199-
return nil, fmt.Errorf("failed to deserialize step output for step %d: %w", steps[i].StepID, err)
2201+
return nil, fmt.Errorf("failed to decode base64 step output for step %d: %w", steps[i].StepID, err)
22002202
}
2201-
stepInfos[i].Output = decodedOutput
2203+
stepInfos[i].Output = string(decodedBytes)
22022204
}
22032205
}
22042206

dbos/workflows_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"reflect"
@@ -674,8 +675,9 @@ func TestSteps(t *testing.T) {
674675
assert.Nil(t, step.Error)
675676

676677
// Deserialize the output from the database to verify proper encoding
677-
// Use decodeAnyToType to handle JSON encode/decode round-trip
678-
storedOutput, err := decodeAnyToType[StepOutput](step.Output)
678+
// Use json.Unmarshal to handle JSON encode/decode round-trip
679+
var storedOutput StepOutput
680+
err = json.Unmarshal([]byte(step.Output.(string)), &storedOutput)
679681
require.NoError(t, err, "failed to decode step output to StepOutput")
680682

681683
// Verify all fields were correctly serialized and deserialized
@@ -1416,7 +1418,8 @@ func TestWorkflowDeadLetterQueue(t *testing.T) {
14161418
resultAny, err := h.GetResult()
14171419
require.NoError(t, err, "failed to get result from handle %d", i)
14181420
// Decode the result from any (which may be float64 after JSON decode) to int
1419-
result, err := decodeAnyToType[int](resultAny)
1421+
var result int
1422+
err = json.Unmarshal([]byte(resultAny.(string)), &result)
14201423
require.NoError(t, err, "failed to decode result to int")
14211424
require.Equal(t, 0, result)
14221425
}

0 commit comments

Comments
 (0)