Skip to content

Commit 038174e

Browse files
committed
record runtime type errors in the typed erased wrapped workflow function
1 parent 8f37ec3 commit 038174e

File tree

3 files changed

+16
-11
lines changed

3 files changed

+16
-11
lines changed

dbos/queues_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ func TestQueueRecovery(t *testing.T) {
439439
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after global concurrency test")
440440
}
441441

442-
// TODO: we can update this test to have the same logic than TestWorkerConcurrency
442+
// Note: we could update this test to have the same logic than TestWorkerConcurrency
443443
func TestGlobalConcurrency(t *testing.T) {
444444
dbosCtx := setupDBOS(t, true, true)
445445

dbos/system_database.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1402,7 +1402,8 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
14021402
return
14031403
}
14041404

1405-
// Other errors - log and retry. XXX eventually add exponential backoff + jitter
1405+
// Other errors - log and retry.
1406+
// TODO add exponential backoff + jitter
14061407
s.logger.Error("Error waiting for notification", "error", err)
14071408
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
14081409
continue

dbos/workflow.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -392,24 +392,28 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
392392

393393
// Register a type-erased version of the durable workflow for recovery
394394
typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
395-
// This type check is redundant with the one in the wrapper, but I'd better be safe than sorry
396395
typedInput, ok := input.(P)
397396
if !ok {
398-
// FIXME: we need to record the error in the database here
397+
wfID, err := ctx.GetWorkflowID()
398+
if err != nil {
399+
return nil, fmt.Errorf("failed to get workflow ID: %w", err)
400+
}
401+
err = ctx.(*dbosContext).systemDB.updateWorkflowOutcome(WithoutCancel(ctx), updateWorkflowOutcomeDBInput{
402+
workflowID: wfID,
403+
status: WorkflowStatusError,
404+
err: newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input)),
405+
})
406+
if err != nil {
407+
return nil, fmt.Errorf("failed to record unexpected input type error: %w", err)
408+
}
399409
return nil, newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input))
400410
}
401411
return fn(ctx, typedInput)
402412
})
403413

404414
typeErasedWrapper := WrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
405-
typedInput, ok := input.(P)
406-
if !ok {
407-
// FIXME: we need to record the error in the database here
408-
return nil, newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input))
409-
}
410-
411415
opts = append(opts, withWorkflowName(fqn)) // Append the name so ctx.RunAsWorkflow can look it up from the registry to apply registration-time options
412-
handle, err := ctx.RunAsWorkflow(ctx, typedErasedWorkflow, typedInput, opts...)
416+
handle, err := ctx.RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...)
413417
if err != nil {
414418
return nil, err
415419
}

0 commit comments

Comments
 (0)