Skip to content

Commit 16f6aea

Browse files
committed
system db expects pointers to string, correctly set in serialize()
1 parent ecad764 commit 16f6aea

File tree

3 files changed

+85
-142
lines changed

3 files changed

+85
-142
lines changed

dbos/serialization.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,33 @@ import (
99
"strings"
1010
)
1111

12-
func serialize(data any) (string, error) {
13-
var inputBytes []byte
14-
if data != nil {
15-
var buf bytes.Buffer
16-
enc := gob.NewEncoder(&buf)
17-
if err := enc.Encode(&data); err != nil {
18-
return "", fmt.Errorf("failed to encode data: %w", err)
19-
}
20-
inputBytes = buf.Bytes()
12+
func serialize(data any) (*string, error) {
13+
// Handle nil values specially - return nil pointer which will be stored as NULL in DB
14+
if data == nil {
15+
return nil, nil
16+
}
17+
18+
// Handle empty string specially - return pointer to empty string which will be stored as "" in DB
19+
if str, ok := data.(string); ok && str == "" {
20+
return &str, nil
2121
}
22-
return base64.StdEncoding.EncodeToString(inputBytes), nil
22+
23+
// Register the type with gob to avoid interface{} issues
24+
safeGobRegister(data, nil)
25+
26+
var buf bytes.Buffer
27+
enc := gob.NewEncoder(&buf)
28+
if err := enc.Encode(data); err != nil {
29+
return nil, fmt.Errorf("failed to encode data: %w", err)
30+
}
31+
inputBytes := buf.Bytes()
32+
33+
encoded := base64.StdEncoding.EncodeToString(inputBytes)
34+
return &encoded, nil
2335
}
2436

2537
func deserialize(data *string) (any, error) {
26-
if data == nil || *data == "" {
38+
if data == nil {
2739
return nil, nil
2840
}
2941

dbos/system_database.go

Lines changed: 34 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type systemDatabase interface {
5555

5656
// Communication (special steps)
5757
send(ctx context.Context, input WorkflowSendInput) error
58-
recv(ctx context.Context, input recvInput) (any, error)
58+
recv(ctx context.Context, input recvInput) (*string, error)
5959
setEvent(ctx context.Context, input WorkflowSetEventInput) error
6060
getEvent(ctx context.Context, input getEventInput) (any, error)
6161

@@ -440,12 +440,6 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
440440
timeoutMs = &millis
441441
}
442442

443-
// Input should already be serialized by caller
444-
inputString, ok := input.status.Input.(string)
445-
if !ok && input.status.Input != nil {
446-
return nil, fmt.Errorf("workflow input must be a pre-encoded string, got %T", input.status.Input)
447-
}
448-
449443
// Our DB works with NULL values
450444
var applicationVersion *string
451445
if len(input.status.ApplicationVersion) > 0 {
@@ -517,7 +511,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
517511
updatedAt.UnixMilli(),
518512
timeoutMs,
519513
deadline,
520-
inputString,
514+
input.status.Input, // encoded input
521515
deduplicationID,
522516
input.status.Priority,
523517
WorkflowStatusEnqueued,
@@ -792,15 +786,13 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
792786
wf.Error = errors.New(*errorStr)
793787
}
794788

795-
// Return output as encoded string (stored as any)
796-
if outputString != nil {
797-
wf.Output = *outputString
798-
}
789+
// Return output as encoded *string
790+
wf.Output = outputString
799791
}
800792

801-
// Return input as encoded string (stored as any)
802-
if input.loadInput && inputString != nil {
803-
wf.Input = *inputString
793+
// Return input as encoded *string
794+
if input.loadInput {
795+
wf.Input = inputString
804796
}
805797

806798
workflows = append(workflows, wf)
@@ -816,7 +808,7 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
816808
type updateWorkflowOutcomeDBInput struct {
817809
workflowID string
818810
status WorkflowStatusType
819-
output any
811+
output *string
820812
err error
821813
tx pgx.Tx
822814
}
@@ -828,22 +820,16 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO
828820
SET status = $1, output = $2, error = $3, updated_at = $4, deduplication_id = NULL
829821
WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 in ($7, $8))`, pgx.Identifier{s.schema}.Sanitize())
830822

831-
// Output should already be serialized by caller
832-
outputString, ok := input.output.(string)
833-
if !ok && input.output != nil {
834-
return fmt.Errorf("workflow output must be a pre-encoded string, got %T", input.output)
835-
}
836-
837823
var errorStr string
838824
if input.err != nil {
839825
errorStr = input.err.Error()
840826
}
841827

842828
var err error
843829
if input.tx != nil {
844-
_, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
830+
_, err = input.tx.Exec(ctx, query, input.status, input.output, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
845831
} else {
846-
_, err = s.pool.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
832+
_, err = s.pool.Exec(ctx, query, input.status, input.output, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
847833
}
848834

849835
if err != nil {
@@ -1105,12 +1091,6 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st
11051091
recovery_attempts
11061092
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, pgx.Identifier{s.schema}.Sanitize())
11071093

1108-
// Input should already be serialized in originalWorkflow.Input
1109-
inputString, ok := originalWorkflow.Input.(string)
1110-
if !ok && originalWorkflow.Input != nil {
1111-
return "", fmt.Errorf("workflow input must be a pre-encoded string, got %T", originalWorkflow.Input)
1112-
}
1113-
11141094
// Marshal authenticated roles (slice of strings) to JSON for TEXT column
11151095
authenticatedRoles, err := json.Marshal(originalWorkflow.AuthenticatedRoles)
11161096

@@ -1128,7 +1108,7 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st
11281108
&appVersion,
11291109
originalWorkflow.ApplicationID,
11301110
_DBOS_INTERNAL_QUEUE_NAME,
1131-
inputString,
1111+
originalWorkflow.Input, // encoded input
11321112
time.Now().UnixMilli(),
11331113
time.Now().UnixMilli(),
11341114
0)
@@ -1204,7 +1184,7 @@ type recordOperationResultDBInput struct {
12041184
workflowID string
12051185
stepID int
12061186
stepName string
1207-
output any
1187+
output *string
12081188
err error
12091189
tx pgx.Tx
12101190
}
@@ -1220,26 +1200,20 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation
12201200
errorString = &e
12211201
}
12221202

1223-
// Output should already be serialized by caller
1224-
outputString, ok := input.output.(string)
1225-
if !ok && input.output != nil {
1226-
return fmt.Errorf("step output must be a pre-encoded string, got %T", input.output)
1227-
}
1228-
12291203
var err error
12301204
if input.tx != nil {
12311205
_, err = input.tx.Exec(ctx, query,
12321206
input.workflowID,
12331207
input.stepID,
1234-
outputString,
1208+
input.output,
12351209
errorString,
12361210
input.stepName,
12371211
)
12381212
} else {
12391213
_, err = s.pool.Exec(ctx, query,
12401214
input.workflowID,
12411215
input.stepID,
1242-
outputString,
1216+
input.output,
12431217
errorString,
12441218
input.stepName,
12451219
)
@@ -1329,7 +1303,7 @@ type recordChildGetResultDBInput struct {
13291303
parentWorkflowID string
13301304
childWorkflowID string
13311305
stepID int
1332-
output string
1306+
output *string
13331307
err error
13341308
}
13351309

@@ -1364,7 +1338,7 @@ func (s *sysDB) recordChildGetResult(ctx context.Context, input recordChildGetRe
13641338
/*******************************/
13651339

13661340
type recordedResult struct {
1367-
output any
1341+
output *string
13681342
err error
13691343
}
13701344

@@ -1434,18 +1408,12 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio
14341408
return nil, newUnexpectedStepError(input.workflowID, input.stepID, input.stepName, recordedFunctionName)
14351409
}
14361410

1437-
// Return output as encoded string
1438-
var output any
1439-
if outputString != nil {
1440-
output = *outputString
1441-
}
1442-
14431411
var recordedError error
14441412
if errorStr != nil && *errorStr != "" {
14451413
recordedError = errors.New(*errorStr)
14461414
}
14471415
result := &recordedResult{
1448-
output: output,
1416+
output: outputString,
14491417
err: recordedError,
14501418
}
14511419
return result, nil
@@ -1490,8 +1458,8 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInpu
14901458
}
14911459

14921460
// Return output as encoded string if loadOutput is true
1493-
if input.loadOutput && outputString != nil {
1494-
step.Output = *outputString
1461+
if input.loadOutput {
1462+
step.Output = outputString
14951463
}
14961464

14971465
// Convert error string to error if present
@@ -1565,11 +1533,7 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
15651533
}
15661534

15671535
// Deserialize the recorded end time
1568-
encodedStr, ok := recordedResult.output.(string)
1569-
if !ok {
1570-
return 0, fmt.Errorf("recorded output must be encoded string, got %T", recordedResult.output)
1571-
}
1572-
decodedOutput, err := deserialize(&encodedStr)
1536+
decodedOutput, err := deserialize(recordedResult.output)
15731537
if err != nil {
15741538
return 0, fmt.Errorf("failed to deserialize sleep end time: %w", err)
15751539
}
@@ -1746,7 +1710,7 @@ const _DBOS_NULL_TOPIC = "__null__topic__"
17461710

17471711
type WorkflowSendInput struct {
17481712
DestinationID string
1749-
Message any
1713+
Message *string
17501714
Topic string
17511715
}
17521716

@@ -1799,14 +1763,8 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
17991763
topic = input.Topic
18001764
}
18011765

1802-
// Message should already be serialized by caller
1803-
messageString, ok := input.Message.(string)
1804-
if !ok && input.Message != nil {
1805-
return fmt.Errorf("message must be a pre-encoded string, got %T", input.Message)
1806-
}
1807-
18081766
insertQuery := fmt.Sprintf(`INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3)`, pgx.Identifier{s.schema}.Sanitize())
1809-
_, err = tx.Exec(ctx, insertQuery, input.DestinationID, topic, messageString)
1767+
_, err = tx.Exec(ctx, insertQuery, input.DestinationID, topic, input.Message)
18101768
if err != nil {
18111769
// Check for foreign key violation (destination workflow doesn't exist)
18121770
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_FOREIGN_KEY_VIOLATION {
@@ -1841,7 +1799,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
18411799
}
18421800

18431801
// Recv is a special type of step that receives a message destined for a given workflow
1844-
func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
1802+
func (s *sysDB) recv(ctx context.Context, input recvInput) (*string, error) {
18451803
functionName := "DBOS.recv"
18461804

18471805
// Get workflow state from context
@@ -1901,7 +1859,7 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
19011859
err = s.pool.QueryRow(ctx, query, destinationID, topic).Scan(&exists)
19021860
if err != nil {
19031861
cond.L.Unlock()
1904-
return false, fmt.Errorf("failed to check message: %w", err)
1862+
return nil, fmt.Errorf("failed to check message: %w", err)
19051863
}
19061864
if !exists {
19071865
done := make(chan struct{})
@@ -1955,26 +1913,17 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
19551913
var messageString *string
19561914
err = tx.QueryRow(ctx, query, destinationID, topic).Scan(&messageString)
19571915
if err != nil {
1958-
if err == pgx.ErrNoRows {
1959-
// No message found, record nil result
1960-
messageString = nil
1961-
} else {
1916+
if err != pgx.ErrNoRows {
19621917
return nil, fmt.Errorf("failed to consume message: %w", err)
19631918
}
19641919
}
19651920

1966-
// Return message as encoded string
1967-
var message any
1968-
if messageString != nil {
1969-
message = *messageString
1970-
}
1971-
19721921
// Record the operation result (with encoded message string)
19731922
recordInput := recordOperationResultDBInput{
19741923
workflowID: destinationID,
19751924
stepID: stepID,
19761925
stepName: functionName,
1977-
output: message,
1926+
output: messageString,
19781927
tx: tx,
19791928
}
19801929
err = s.recordOperationResult(ctx, recordInput)
@@ -1986,12 +1935,13 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
19861935
return nil, fmt.Errorf("failed to commit transaction: %w", err)
19871936
}
19881937

1989-
return message, nil
1938+
// Return the message string pointer
1939+
return messageString, nil
19901940
}
19911941

19921942
type WorkflowSetEventInput struct {
19931943
Key string
1994-
Message any
1944+
Message *string
19951945
}
19961946

19971947
func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error {
@@ -2031,19 +1981,13 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
20311981
return nil
20321982
}
20331983

2034-
// Message should already be serialized by caller
2035-
messageString, ok := input.Message.(string)
2036-
if !ok && input.Message != nil {
2037-
return fmt.Errorf("event value must be a pre-encoded string, got %T", input.Message)
2038-
}
2039-
20401984
// Insert or update the event using UPSERT
20411985
insertQuery := fmt.Sprintf(`INSERT INTO %s.workflow_events (workflow_uuid, key, value)
20421986
VALUES ($1, $2, $3)
20431987
ON CONFLICT (workflow_uuid, key)
20441988
DO UPDATE SET value = EXCLUDED.value`, pgx.Identifier{s.schema}.Sanitize())
20451989

2046-
_, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, messageString)
1990+
_, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, input.Message)
20471991
if err != nil {
20481992
return fmt.Errorf("failed to insert/update workflow event: %w", err)
20491993
}
@@ -2173,19 +2117,13 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
21732117
}
21742118
}
21752119

2176-
// Return value as encoded string
2177-
var value any
2178-
if valueString != nil {
2179-
value = *valueString
2180-
}
2181-
2182-
// Record the operation result if this is called within a workflow (with encoded value string)
2120+
// Record the operation result if this is called within a workflow
21832121
if isInWorkflow {
21842122
recordInput := recordOperationResultDBInput{
21852123
workflowID: wfState.workflowID,
21862124
stepID: stepID,
21872125
stepName: functionName,
2188-
output: value,
2126+
output: valueString,
21892127
err: nil,
21902128
}
21912129

@@ -2195,7 +2133,8 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
21952133
}
21962134
}
21972135

2198-
return value, nil
2136+
// Return the value string pointer
2137+
return valueString, nil
21992138
}
22002139

22012140
/*******************************/

0 commit comments

Comments
 (0)