Skip to content

Commit cacb25c

Browse files
committed
allow custom workflow names
1 parent f81fff4 commit cacb25c

File tree

2 files changed

+89
-6
lines changed

2 files changed

+89
-6
lines changed

dbos/workflow.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,11 @@ type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption
205205
type workflowRegistryEntry struct {
206206
wrappedFunction WrappedWorkflowFunc
207207
maxRetries int
208+
name string
208209
}
209210

210211
// Register adds a workflow function to the registry (thread-safe, only once per name)
211-
func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFunc, maxRetries int) {
212+
func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) {
212213
// Skip if we don't have a concrete dbosContext
213214
c, ok := ctx.(*dbosContext)
214215
if !ok {
@@ -222,14 +223,15 @@ func registerWorkflow(ctx DBOSContext, workflowName string, fn WrappedWorkflowFu
222223
c.workflowRegMutex.Lock()
223224
defer c.workflowRegMutex.Unlock()
224225

225-
if _, exists := c.workflowRegistry[workflowName]; exists {
226-
c.logger.Error("workflow function already registered", "fqn", workflowName)
227-
panic(newConflictingRegistrationError(workflowName))
226+
if _, exists := c.workflowRegistry[workflowFQN]; exists {
227+
c.logger.Error("workflow function already registered", "fqn", workflowFQN)
228+
panic(newConflictingRegistrationError(workflowFQN))
228229
}
229230

230-
c.workflowRegistry[workflowName] = workflowRegistryEntry{
231+
c.workflowRegistry[workflowFQN] = workflowRegistryEntry{
231232
wrappedFunction: fn,
232233
maxRetries: maxRetries,
234+
name: customName,
233235
}
234236
}
235237

@@ -275,6 +277,7 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
275277
type workflowRegistrationParams struct {
276278
cronSchedule string
277279
maxRetries int
280+
name string
278281
}
279282

280283
type workflowRegistrationOption func(*workflowRegistrationParams)
@@ -295,6 +298,12 @@ func WithSchedule(schedule string) workflowRegistrationOption {
295298
}
296299
}
297300

301+
func WithWorkflowName(name string) workflowRegistrationOption {
302+
return func(p *workflowRegistrationParams) {
303+
p.name = name
304+
}
305+
}
306+
298307
// RegisterWorkflow registers the provided function as a durable workflow with the provided DBOSContext workflow registry
299308
// If the workflow is a scheduled workflow (determined by the presence of a cron schedule), it will also register a cron job to execute it
300309
// RegisterWorkflow is generically typed, providing compile-time type checking and allowing us to register the workflow input and output types for gob encoding
@@ -348,7 +357,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
348357
}
349358
return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil // this is only used by recovery and queue runner so far -- queue runner dismisses it
350359
})
351-
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries)
360+
registerWorkflow(ctx, fqn, typeErasedWrapper, registrationParams.maxRetries, registrationParams.name)
352361

353362
// If this is a scheduled workflow, register a cron job
354363
if registrationParams.cronSchedule != "" {
@@ -398,6 +407,7 @@ func WithApplicationVersion(version string) WorkflowOption {
398407
}
399408
}
400409

410+
// An internal option we use to map the reflection function name to the registration options.
401411
func withWorkflowName(name string) WorkflowOption {
402412
return func(p *workflowParams) {
403413
p.workflowName = name
@@ -486,6 +496,11 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
486496
params.maxRetries = registeredWorkflow.maxRetries
487497
}
488498

499+
// Use the custom workflow name if it was provided during registration
500+
if registeredWorkflow.name != "" {
501+
params.workflowName = registeredWorkflow.name
502+
}
503+
489504
// Check if we are within a workflow (and thus a child workflow)
490505
parentWorkflowState, ok := c.Value(workflowStateKey).(*workflowState)
491506
isChildWorkflow := ok && parentWorkflowState != nil

dbos/workflows_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,74 @@ func TestWorkflowsRegistration(t *testing.T) {
300300
}
301301
})
302302
}
303+
304+
t.Run("DoubleRegistrationWithoutName", func(t *testing.T) {
305+
// Create a fresh DBOS context for this test
306+
freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB
307+
308+
// First registration should work
309+
RegisterWorkflow(freshCtx, simpleWorkflow)
310+
311+
// Second registration of the same workflow should panic with ConflictingRegistrationError
312+
defer func() {
313+
r := recover()
314+
if r == nil {
315+
t.Fatal("expected panic from double registration but got none")
316+
}
317+
dbosErr, ok := r.(*DBOSError)
318+
if !ok {
319+
t.Fatalf("expected panic to be *DBOSError, got %T", r)
320+
}
321+
if dbosErr.Code != ConflictingRegistrationError {
322+
t.Fatalf("expected ConflictingRegistrationError, got %v", dbosErr.Code)
323+
}
324+
}()
325+
RegisterWorkflow(freshCtx, simpleWorkflow)
326+
})
327+
328+
t.Run("DoubleRegistrationWithCustomName", func(t *testing.T) {
329+
// Create a fresh DBOS context for this test
330+
freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB
331+
332+
// First registration with custom name should work
333+
RegisterWorkflow(freshCtx, simpleWorkflow, WithWorkflowName("custom-workflow"))
334+
335+
// Second registration with same custom name should panic with ConflictingRegistrationError
336+
defer func() {
337+
r := recover()
338+
if r == nil {
339+
t.Fatal("expected panic from double registration with custom name but got none")
340+
}
341+
dbosErr, ok := r.(*DBOSError)
342+
if !ok {
343+
t.Fatalf("expected panic to be *DBOSError, got %T", r)
344+
}
345+
if dbosErr.Code != ConflictingRegistrationError {
346+
t.Fatalf("expected ConflictingRegistrationError, got %v", dbosErr.Code)
347+
}
348+
}()
349+
RegisterWorkflow(freshCtx, simpleWorkflow, WithWorkflowName("custom-workflow"))
350+
})
351+
352+
t.Run("RegisterAfterLaunchPanics", func(t *testing.T) {
353+
// Create a fresh DBOS context for this test
354+
freshCtx := setupDBOS(t, false, false) // Don't check for leaks and don't reset DB
355+
356+
// Launch DBOS context
357+
err := freshCtx.Launch()
358+
if err != nil {
359+
t.Fatalf("failed to launch DBOS context: %v", err)
360+
}
361+
defer freshCtx.Cancel()
362+
363+
// Attempting to register after launch should panic
364+
defer func() {
365+
if r := recover(); r == nil {
366+
t.Fatal("expected panic from registration after launch but got none")
367+
}
368+
}()
369+
RegisterWorkflow(freshCtx, simpleWorkflow)
370+
})
303371
}
304372

305373
func stepWithinAStep(ctx context.Context) (string, error) {

0 commit comments

Comments
 (0)