Skip to content

Commit 5443e4f

Browse files
committed
nits
1 parent de6488a commit 5443e4f

File tree

3 files changed

+10
-23
lines changed

3 files changed

+10
-23
lines changed

dbos/serialization.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ func newGobSerializer[T any]() serializer[T] {
5555
}
5656

5757
func (g *gobSerializer[T]) Encode(data T) (string, error) {
58-
// Check if data is nil (for pointer types, slice, map, interface, chan, func)
5958
if isNilValue(data) {
6059
// For nil values, encode an empty byte slice directly to base64
6160
return base64.StdEncoding.EncodeToString([]byte{}), nil
@@ -100,8 +99,7 @@ func (g *gobSerializer[T]) Decode(data *string) (T, error) {
10099

101100
// Register type T before decoding
102101
// This is required on the recovery path, where the process might not have been doing the encode/registering.
103-
// Note wee do not support interface types in workflows/steps
104-
// This will panic if T is an non-registered interface type
102+
// This will panic if T is an non-registered interface type (which is not supported)
105103
if tType != nil && tType.Kind() != reflect.Interface {
106104
safeGobRegister(zero)
107105
}

dbos/system_database.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type systemDatabase interface {
5757
send(ctx context.Context, input WorkflowSendInput) error
5858
recv(ctx context.Context, input recvInput) (*string, error)
5959
setEvent(ctx context.Context, input WorkflowSetEventInput) error
60-
getEvent(ctx context.Context, input getEventInput) (any, error)
60+
getEvent(ctx context.Context, input getEventInput) (*string, error)
6161

6262
// Timers (special steps)
6363
sleep(ctx context.Context, input sleepInput) (time.Duration, error)
@@ -2016,7 +2016,7 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
20162016
return nil
20172017
}
20182018

2019-
func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) {
2019+
func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (*string, error) {
20202020
functionName := "DBOS.getEvent"
20212021

20222022
// Get workflow state from context (optional for GetEvent as we can get an event from outside a workflow)

dbos/workflow.go

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func (h *workflowHandle[R]) GetResult(opts ...GetResultOption) (R, error) {
205205

206206
// processOutcome handles the common logic for processing workflow outcomes
207207
func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R]) (R, error) {
208-
typedResult := outcome.result
208+
decodedResult := outcome.result
209209
// If we are calling GetResult inside a workflow, record the result as a step result
210210
workflowState, ok := h.dbosContext.Value(workflowStateKey).(*workflowState)
211211
isWithinWorkflow := ok && workflowState != nil
@@ -214,7 +214,7 @@ func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R]) (R, error
214214
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("invalid DBOSContext: expected *dbosContext"))
215215
}
216216
serializer := newGobSerializer[R]()
217-
encodedOutput, encErr := serializer.Encode(typedResult)
217+
encodedOutput, encErr := serializer.Encode(decodedResult)
218218
if encErr != nil {
219219
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("serializing child workflow result: %w", encErr))
220220
}
@@ -233,7 +233,7 @@ func (h *workflowHandle[R]) processOutcome(outcome workflowOutcome[R]) (R, error
233233
return *new(R), newWorkflowExecutionError(workflowState.workflowID, fmt.Errorf("recording child workflow result: %w", recordResultErr))
234234
}
235235
}
236-
return typedResult, outcome.err
236+
return decodedResult, outcome.err
237237
}
238238

239239
type workflowPollingHandle[R any] struct {
@@ -262,7 +262,7 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error)
262262
var typedResult R
263263
if encodedResult != nil {
264264
encodedStr, ok := encodedResult.(*string)
265-
if !ok {
265+
if !ok { // Should never happen
266266
return *new(R), newWorkflowUnexpectedResultType(h.workflowID, "string (encoded)", fmt.Sprintf("%T", encodedResult))
267267
}
268268
serializer := newGobSerializer[R]()
@@ -517,7 +517,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...
517517
var p P
518518

519519
// Register a type-erased version of the durable workflow for recovery and queue runner
520-
// Input will always be encoded as *string, so we decode it into the target type (captured by this wrapped closure)
520+
// Input will always come from the database and encoded as *string, so we decode it into the target type (captured by this wrapped closure)
521521
typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
522522
workflowID, err := GetWorkflowID(ctx)
523523
if err != nil {
@@ -681,10 +681,6 @@ func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts
681681
opts = append(opts, withWorkflowName(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()))
682682

683683
typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
684-
if input == nil {
685-
var zero P
686-
return fn(ctx, zero)
687-
}
688684
return fn(ctx, input.(P))
689685
})
690686

@@ -1006,13 +1002,6 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
10061002
encodedResult, err = retryWithResult(c, func() (any, error) {
10071003
return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
10081004
}, withRetrierLogger(c.logger))
1009-
encodedResult, ok := encodedResult.(*string)
1010-
if !ok {
1011-
c.logger.Error("Unexpected result type when awaiting workflow result after ID conflict", "workflow_id", workflowID, "type", fmt.Sprintf("%T", encodedResult))
1012-
outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("unexpected result type when awaiting workflow result after ID conflict: expected *string, got %T", encodedResult)}
1013-
close(outcomeChan)
1014-
return
1015-
}
10161005
// Keep the encoded result - decoding will happen in RunWorkflow[P,R] when we know the target type
10171006
outcomeChan <- workflowOutcome[any]{result: encodedResult, err: err, needsDecoding: true}
10181007
close(outcomeChan)
@@ -1052,12 +1041,12 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
10521041
}, withRetrierLogger(c.logger))
10531042
if recordErr != nil {
10541043
c.logger.Error("Error recording workflow outcome", "workflow_id", workflowID, "error", recordErr)
1055-
outcomeChan <- workflowOutcome[any]{result: nil, err: recordErr, needsDecoding: false}
1044+
outcomeChan <- workflowOutcome[any]{result: nil, err: recordErr}
10561045
close(outcomeChan)
10571046
return
10581047
}
10591048
}
1060-
outcomeChan <- workflowOutcome[any]{result: result, err: err, needsDecoding: false}
1049+
outcomeChan <- workflowOutcome[any]{result: result, err: err}
10611050
close(outcomeChan)
10621051
}()
10631052

0 commit comments

Comments
 (0)