Skip to content

Commit 74e381d

Browse files
committed
use a sync.Map for workflow registry
1 parent dc036c9 commit 74e381d

File tree

4 files changed

+29
-24
lines changed

4 files changed

+29
-24
lines changed

dbos/dbos.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,8 @@ type dbosContext struct {
120120
// Wait group for workflow goroutines
121121
workflowsWg *sync.WaitGroup
122122

123-
// Workflow registry
124-
workflowRegistry map[string]workflowRegistryEntry
125-
workflowRegMutex *sync.RWMutex
123+
// Workflow registry - read-mostly sync.Map since registration happens only before launch
124+
workflowRegistry sync.Map // map[string]workflowRegistryEntry
126125
workflowCustomNametoFQN sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
127126

128127
// Workflow scheduler
@@ -164,7 +163,6 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
164163
systemDB: dbosCtx.systemDB,
165164
workflowsWg: dbosCtx.workflowsWg,
166165
workflowRegistry: dbosCtx.workflowRegistry,
167-
workflowRegMutex: dbosCtx.workflowRegMutex,
168166
applicationVersion: dbosCtx.applicationVersion,
169167
executorID: dbosCtx.executorID,
170168
applicationID: dbosCtx.applicationID,
@@ -187,7 +185,6 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
187185
systemDB: dbosCtx.systemDB,
188186
workflowsWg: dbosCtx.workflowsWg,
189187
workflowRegistry: dbosCtx.workflowRegistry,
190-
workflowRegMutex: dbosCtx.workflowRegMutex,
191188
applicationVersion: dbosCtx.applicationVersion,
192189
executorID: dbosCtx.executorID,
193190
applicationID: dbosCtx.applicationID,
@@ -211,7 +208,6 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
211208
systemDB: dbosCtx.systemDB,
212209
workflowsWg: dbosCtx.workflowsWg,
213210
workflowRegistry: dbosCtx.workflowRegistry,
214-
workflowRegMutex: dbosCtx.workflowRegMutex,
215211
applicationVersion: dbosCtx.applicationVersion,
216212
executorID: dbosCtx.executorID,
217213
applicationID: dbosCtx.applicationID,
@@ -262,11 +258,9 @@ func (c *dbosContext) GetApplicationID() string {
262258
func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
263259
ctx, cancelFunc := context.WithCancelCause(context.Background())
264260
initExecutor := &dbosContext{
265-
workflowsWg: &sync.WaitGroup{},
266-
ctx: ctx,
267-
ctxCancelFunc: cancelFunc,
268-
workflowRegistry: make(map[string]workflowRegistryEntry),
269-
workflowRegMutex: &sync.RWMutex{},
261+
workflowsWg: &sync.WaitGroup{},
262+
ctx: ctx,
263+
ctxCancelFunc: cancelFunc,
270264
}
271265

272266
// Load and process the configuration

dbos/queue.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,16 @@ func (qr *queueRunner) run(ctx *dbosContext) {
210210
continue
211211
}
212212

213-
registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)]
213+
registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
214214
if !exists {
215215
ctx.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
216216
continue
217217
}
218+
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
219+
if !ok {
220+
ctx.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
221+
continue
222+
}
218223

219224
// Deserialize input
220225
var input any

dbos/recovery.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,16 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
4343
continue
4444
}
4545

46-
registeredWorkflow, exists := ctx.workflowRegistry[wfName.(string)]
46+
registeredWorkflowAny, exists := ctx.workflowRegistry.Load(wfName.(string))
4747
if !exists {
4848
ctx.logger.Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
4949
continue
5050
}
51+
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
52+
if !ok {
53+
ctx.logger.Error("invalid workflow registry entry type", "workflow_id", workflow.ID, "name", workflow.Name)
54+
continue
55+
}
5156

5257
// Convert workflow parameters to options
5358
opts := []WorkflowOption{

dbos/workflow.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -234,20 +234,17 @@ func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFun
234234
panic("Cannot register workflow after DBOS has launched")
235235
}
236236

237-
c.workflowRegMutex.Lock()
238-
defer c.workflowRegMutex.Unlock()
239-
240-
if _, exists := c.workflowRegistry[workflowFQN]; exists {
241-
c.logger.Error("workflow function already registered", "fqn", workflowFQN)
242-
panic(newConflictingRegistrationError(workflowFQN))
243-
}
244-
245-
// We must keep the registry indexed by FQN (because RunAsWorkflow uses reflection to find the function name and uses that to look it up in the registry)
246-
c.workflowRegistry[workflowFQN] = workflowRegistryEntry{
237+
// Check if workflow already exists and store atomically using LoadOrStore
238+
entry := workflowRegistryEntry{
247239
wrappedFunction: fn,
248240
maxRetries: maxRetries,
249241
name: customName,
250242
}
243+
244+
if _, exists := c.workflowRegistry.LoadOrStore(workflowFQN, entry); exists {
245+
c.logger.Error("workflow function already registered", "fqn", workflowFQN)
246+
panic(newConflictingRegistrationError(workflowFQN))
247+
}
251248

252249
// We need to get a mapping from custom name to FQN for registry lookups that might not know the FQN (queue, recovery)
253250
if len(customName) > 0 {
@@ -591,10 +588,14 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
591588
}
592589

593590
// Lookup the registry for registration-time options
594-
registeredWorkflow, exists := c.workflowRegistry[params.workflowName]
591+
registeredWorkflowAny, exists := c.workflowRegistry.Load(params.workflowName)
595592
if !exists {
596593
return nil, newNonExistentWorkflowError(params.workflowName)
597594
}
595+
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
596+
if !ok {
597+
return nil, fmt.Errorf("invalid workflow registry entry type for workflow %s", params.workflowName)
598+
}
598599
if registeredWorkflow.maxRetries > 0 {
599600
params.maxRetries = registeredWorkflow.maxRetries
600601
}

0 commit comments

Comments
 (0)