Skip to content

Commit bf35e78

Browse files
committed
lift encoding outside of the system db
1 parent 0259dd6 commit bf35e78

File tree

2 files changed

+220
-71
lines changed

2 files changed

+220
-71
lines changed

dbos/system_database.go

Lines changed: 68 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -440,9 +440,10 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
440440
timeoutMs = &millis
441441
}
442442

443-
inputString, err := serialize(input.status.Input)
444-
if err != nil {
445-
return nil, fmt.Errorf("failed to serialize input: %w", err)
443+
// Input should already be serialized by caller
444+
inputString, ok := input.status.Input.(string)
445+
if !ok && input.status.Input != nil {
446+
return nil, fmt.Errorf("workflow input must be a pre-encoded string, got %T", input.status.Input)
446447
}
447448

448449
// Our DB works with NULL values
@@ -791,18 +792,15 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
791792
wf.Error = errors.New(*errorStr)
792793
}
793794

794-
wf.Output, err = deserialize(outputString)
795-
if err != nil {
796-
return nil, fmt.Errorf("failed to deserialize output: %w", err)
795+
// Return output as encoded string (stored as any)
796+
if outputString != nil {
797+
wf.Output = *outputString
797798
}
798799
}
799800

800-
// Handle input only if loadInput is true
801-
if input.loadInput {
802-
wf.Input, err = deserialize(inputString)
803-
if err != nil {
804-
return nil, fmt.Errorf("failed to deserialize input: %w", err)
805-
}
801+
// Return input as encoded string (stored as any)
802+
if input.loadInput && inputString != nil {
803+
wf.Input = *inputString
806804
}
807805

808806
workflows = append(workflows, wf)
@@ -830,16 +828,18 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO
830828
SET status = $1, output = $2, error = $3, updated_at = $4, deduplication_id = NULL
831829
WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 in ($7, $8))`, pgx.Identifier{s.schema}.Sanitize())
832830

833-
outputString, err := serialize(input.output)
834-
if err != nil {
835-
return fmt.Errorf("failed to serialize output: %w", err)
831+
// Output should already be serialized by caller
832+
outputString, ok := input.output.(string)
833+
if !ok && input.output != nil {
834+
return fmt.Errorf("workflow output must be a pre-encoded string, got %T", input.output)
836835
}
837836

838837
var errorStr string
839838
if input.err != nil {
840839
errorStr = input.err.Error()
841840
}
842841

842+
var err error
843843
if input.tx != nil {
844844
_, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError)
845845
} else {
@@ -1105,9 +1105,10 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st
11051105
recovery_attempts
11061106
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, pgx.Identifier{s.schema}.Sanitize())
11071107

1108-
inputString, err := serialize(originalWorkflow.Input)
1109-
if err != nil {
1110-
return "", fmt.Errorf("failed to serialize input: %w", err)
1108+
// Input should already be serialized in originalWorkflow.Input
1109+
inputString, ok := originalWorkflow.Input.(string)
1110+
if !ok && originalWorkflow.Input != nil {
1111+
return "", fmt.Errorf("workflow input must be a pre-encoded string, got %T", originalWorkflow.Input)
11111112
}
11121113

11131114
// Marshal authenticated roles (slice of strings) to JSON for TEXT column
@@ -1179,10 +1180,10 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any
11791180
return nil, fmt.Errorf("failed to query workflow status: %w", err)
11801181
}
11811182

1182-
// Deserialize output from TEXT to bytes then from bytes to R using gob
1183-
output, err := deserialize(outputString)
1184-
if err != nil {
1185-
return nil, fmt.Errorf("failed to deserialize output: %w", err)
1183+
// Return output as encoded string
1184+
var output any
1185+
if outputString != nil {
1186+
output = *outputString
11861187
}
11871188

11881189
switch status {
@@ -1219,11 +1220,13 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation
12191220
errorString = &e
12201221
}
12211222

1222-
outputString, err := serialize(input.output)
1223-
if err != nil {
1224-
return fmt.Errorf("failed to serialize output: %w", err)
1223+
// Output should already be serialized by caller
1224+
outputString, ok := input.output.(string)
1225+
if !ok && input.output != nil {
1226+
return fmt.Errorf("step output must be a pre-encoded string, got %T", input.output)
12251227
}
12261228

1229+
var err error
12271230
if input.tx != nil {
12281231
_, err = input.tx.Exec(ctx, query,
12291232
input.workflowID,
@@ -1431,9 +1434,10 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio
14311434
return nil, newUnexpectedStepError(input.workflowID, input.stepID, input.stepName, recordedFunctionName)
14321435
}
14331436

1434-
output, err := deserialize(outputString)
1435-
if err != nil {
1436-
return nil, fmt.Errorf("failed to deserialize output: %w", err)
1437+
// Return output as encoded string
1438+
var output any
1439+
if outputString != nil {
1440+
output = *outputString
14371441
}
14381442

14391443
var recordedError error
@@ -1485,13 +1489,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInpu
14851489
return nil, fmt.Errorf("failed to scan step row: %w", err)
14861490
}
14871491

1488-
// Deserialize output if present and loadOutput is true
1492+
// Return output as encoded string if loadOutput is true
14891493
if input.loadOutput && outputString != nil {
1490-
output, err := deserialize(outputString)
1491-
if err != nil {
1492-
return nil, fmt.Errorf("failed to deserialize output: %w", err)
1493-
}
1494-
step.Output = output
1494+
step.Output = *outputString
14951495
}
14961496

14971497
// Convert error string to error if present
@@ -1564,10 +1564,20 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
15641564
return 0, fmt.Errorf("no recorded end time for recorded sleep operation")
15651565
}
15661566

1567+
// Deserialize the recorded end time
1568+
encodedStr, ok := recordedResult.output.(string)
1569+
if !ok {
1570+
return 0, fmt.Errorf("recorded output must be encoded string, got %T", recordedResult.output)
1571+
}
1572+
decodedOutput, err := deserialize(&encodedStr)
1573+
if err != nil {
1574+
return 0, fmt.Errorf("failed to deserialize sleep end time: %w", err)
1575+
}
1576+
15671577
// The output should be a time.Time representing the end time
1568-
endTimeInterface, ok := recordedResult.output.(time.Time)
1578+
endTimeInterface, ok := decodedOutput.(time.Time)
15691579
if !ok {
1570-
return 0, fmt.Errorf("recorded output is not a time.Time: %T", recordedResult.output)
1580+
return 0, fmt.Errorf("decoded output is not a time.Time: %T", decodedOutput)
15711581
}
15721582
endTime = endTimeInterface
15731583

@@ -1578,12 +1588,18 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
15781588
// First execution: calculate and record the end time
15791589
endTime = time.Now().Add(input.duration)
15801590

1591+
// Serialize the end time before recording
1592+
encodedEndTime, serErr := serialize(endTime)
1593+
if serErr != nil {
1594+
return 0, fmt.Errorf("failed to serialize sleep end time: %w", serErr)
1595+
}
1596+
15811597
// Record the operation result with the calculated end time
15821598
recordInput := recordOperationResultDBInput{
15831599
workflowID: wfState.workflowID,
15841600
stepID: stepID,
15851601
stepName: functionName,
1586-
output: endTime,
1602+
output: encodedEndTime,
15871603
err: nil,
15881604
}
15891605

@@ -1783,10 +1799,10 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
17831799
topic = input.Topic
17841800
}
17851801

1786-
// Serialize the message. It must have been registered with encoding/gob by the user if not a basic type.
1787-
messageString, err := serialize(input.Message)
1788-
if err != nil {
1789-
return fmt.Errorf("failed to serialize message: %w", err)
1802+
// Message should already be serialized by caller
1803+
messageString, ok := input.Message.(string)
1804+
if !ok && input.Message != nil {
1805+
return fmt.Errorf("message must be a pre-encoded string, got %T", input.Message)
17901806
}
17911807

17921808
insertQuery := fmt.Sprintf(`INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3)`, pgx.Identifier{s.schema}.Sanitize())
@@ -1947,16 +1963,13 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
19471963
}
19481964
}
19491965

1950-
// Deserialize the message
1966+
// Return message as encoded string
19511967
var message any
1952-
if messageString != nil { // nil message can happen on the timeout path only
1953-
message, err = deserialize(messageString)
1954-
if err != nil {
1955-
return nil, fmt.Errorf("failed to deserialize message: %w", err)
1956-
}
1968+
if messageString != nil {
1969+
message = *messageString
19571970
}
19581971

1959-
// Record the operation result
1972+
// Record the operation result (with encoded message string)
19601973
recordInput := recordOperationResultDBInput{
19611974
workflowID: destinationID,
19621975
stepID: stepID,
@@ -2018,10 +2031,10 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
20182031
return nil
20192032
}
20202033

2021-
// Serialize the message. It must have been registered with encoding/gob by the user if not a basic type.
2022-
messageString, err := serialize(input.Message)
2023-
if err != nil {
2024-
return fmt.Errorf("failed to serialize message: %w", err)
2034+
// Message should already be serialized by caller
2035+
messageString, ok := input.Message.(string)
2036+
if !ok && input.Message != nil {
2037+
return fmt.Errorf("event value must be a pre-encoded string, got %T", input.Message)
20252038
}
20262039

20272040
// Insert or update the event using UPSERT
@@ -2160,16 +2173,13 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
21602173
}
21612174
}
21622175

2163-
// Deserialize the value if it exists
2176+
// Return value as encoded string
21642177
var value any
21652178
if valueString != nil {
2166-
value, err = deserialize(valueString)
2167-
if err != nil {
2168-
return nil, fmt.Errorf("failed to deserialize event value: %w", err)
2169-
}
2179+
value = *valueString
21702180
}
21712181

2172-
// Record the operation result if this is called within a workflow
2182+
// Record the operation result if this is called within a workflow (with encoded value string)
21732183
if isInWorkflow {
21742184
recordInput := recordOperationResultDBInput{
21752185
workflowID: wfState.workflowID,

0 commit comments

Comments
 (0)