Skip to content

Commit 7cc6ffb

Browse files
committed
cleanup
1 parent 69869d2 commit 7cc6ffb

File tree

4 files changed

+10
-28
lines changed

4 files changed

+10
-28
lines changed

dbos/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
149149
}
150150

151151
// Serialize input before storing in workflow status
152-
encodedInputStr, err := serialize(dbosCtx.serializer, params.workflowInput)
152+
encodedInputStr, err := dbosCtx.serializer.Encode(params.workflowInput)
153153
if err != nil {
154154
return nil, fmt.Errorf("failed to serialize workflow input: %w", err)
155155
}

dbos/recovery.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,6 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
1414
}
1515

1616
for _, workflow := range pendingWorkflows {
17-
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
18-
var encodedInput *string
19-
if workflow.Input != nil {
20-
inputString, ok := workflow.Input.(*string)
21-
if !ok {
22-
ctx.logger.Warn("Skipping workflow recovery due to invalid input type", "workflow_id", workflow.ID, "name", workflow.Name, "input_type", workflow.Input)
23-
continue
24-
}
25-
encodedInput = inputString
26-
}
27-
2817
if workflow.QueueName != "" {
2918
cleared, err := ctx.systemDB.clearQueueAssignment(ctx, workflow.ID)
3019
if err != nil {
@@ -59,7 +48,8 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
5948
WithWorkflowID(workflow.ID),
6049
}
6150
// Create a workflow context from the executor context
62-
handle, err := registeredWorkflow.wrappedFunction(ctx, encodedInput, opts...)
51+
// Pass encoded input directly - decoding will happen in workflow wrapper when we know the target type
52+
handle, err := registeredWorkflow.wrappedFunction(ctx, workflow.Input, opts...)
6353
if err != nil {
6454
return nil, err
6555
}

dbos/serialization.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,6 @@ func (j *JSONSerializer) Decode(data *string) (any, error) {
5858
return result, nil
5959
}
6060

61-
// serialize serializes data using the provided serializer
62-
func serialize[T any](serializer Serializer, data T) (string, error) {
63-
if serializer == nil {
64-
return "", fmt.Errorf("serializer cannot be nil")
65-
}
66-
return serializer.Encode(data)
67-
}
68-
6961
// deserialize decodes an encoded string directly into a typed variable.
7062
// For JSON serializer, this decodes directly into the target type, preserving type information.
7163
// For other serializers, it decodes into any and then type-asserts.

dbos/workflow.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R]) (R, error
217217
if dbosCtx.serializer == nil {
218218
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("no serializer configured in DBOSContext"))
219219
}
220-
encodedOutputStr, encErr := serialize(dbosCtx.serializer, typedResult)
220+
encodedOutputStr, encErr := dbosCtx.serializer.Encode(typedResult)
221221
if encErr != nil {
222222
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("serializing child workflow result: %w", encErr))
223223
}
@@ -286,7 +286,7 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error)
286286
if dbosCtx.serializer == nil {
287287
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("no serializer configured in DBOSContext"))
288288
}
289-
encodedOutputStr, encErr := serialize(dbosCtx.serializer, typedResult)
289+
encodedOutputStr, encErr := dbosCtx.serializer.Encode(typedResult)
290290
if encErr != nil {
291291
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("serializing child workflow result: %w", encErr))
292292
}
@@ -876,7 +876,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
876876
}
877877

878878
// Serialize input before storing in workflow status
879-
encodedInputStr, serErr := serialize(c.serializer, input)
879+
encodedInputStr, serErr := c.serializer.Encode(input)
880880
if serErr != nil {
881881
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to serialize workflow input: %w", serErr))
882882
}
@@ -1044,7 +1044,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
10441044
}
10451045

10461046
// Serialize the output before recording
1047-
encodedOutput, serErr := serialize(c.serializer, result)
1047+
encodedOutput, serErr := c.serializer.Encode(result)
10481048
if serErr != nil {
10491049
c.logger.Error("Failed to serialize workflow output", "workflow_id", workflowID, "error", serErr)
10501050
outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("failed to serialize output: %w", serErr)}
@@ -1343,7 +1343,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
13431343
}
13441344

13451345
// Serialize step output before recording
1346-
encodedStepOutput, serErr := serialize(c.serializer, stepOutput)
1346+
encodedStepOutput, serErr := c.serializer.Encode(stepOutput)
13471347
if serErr != nil {
13481348
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("failed to serialize step output: %w", serErr))
13491349
}
@@ -1372,7 +1372,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
13721372

13731373
func (c *dbosContext) Send(_ DBOSContext, destinationID string, message any, topic string) error {
13741374
// Serialize the message before sending
1375-
encodedMessageStr, err := serialize(c.serializer, message)
1375+
encodedMessageStr, err := c.serializer.Encode(message)
13761376
if err != nil {
13771377
return fmt.Errorf("failed to serialize message: %w", err)
13781378
}
@@ -1483,7 +1483,7 @@ func Recv[T any](ctx DBOSContext, topic string, timeout time.Duration) (T, error
14831483

14841484
func (c *dbosContext) SetEvent(_ DBOSContext, key string, message any) error {
14851485
// Serialize the event value before storing
1486-
encodedMessageStr, err := serialize(c.serializer, message)
1486+
encodedMessageStr, err := c.serializer.Encode(message)
14871487
if err != nil {
14881488
return fmt.Errorf("failed to serialize event value: %w", err)
14891489
}

0 commit comments

Comments
 (0)