Skip to content

Commit c4f0c60

Browse files
committed
remove register functions from interface -- simply does nothing if we do not have the right concrete type
1 parent e1eb063 commit c4f0c60

File tree

2 files changed

+46
-33
lines changed

2 files changed

+46
-33
lines changed

dbos/dbos.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,6 @@ type DBOSContext interface {
6767
Launch() error
6868
Shutdown()
6969

70-
// Workflow registration
71-
RegisterWorkflow(fqn string, fn WrappedWorkflowFunc, maxRetries int)
72-
RegisterScheduledWorkflow(fqn string, fn WrappedWorkflowFunc, cronSchedule string, maxRetries int)
73-
7470
// Workflow operations
7571
RunAsStep(_ DBOSContext, fn StepFunc, input ...any) (any, error)
7672
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error)
@@ -93,6 +89,8 @@ type DBOSContext interface {
9389
type dbosContext struct {
9490
ctx context.Context
9591

92+
launched bool
93+
9694
systemDB SystemDatabase
9795
adminServer *adminServer
9896
config *Config
@@ -224,6 +222,10 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
224222
}
225223

226224
func (c *dbosContext) Launch() error {
225+
if c.launched {
226+
return newInitializationError("DBOS is already launched")
227+
}
228+
227229
// Start the system database
228230
c.systemDB.Launch(context.Background())
229231

@@ -269,6 +271,7 @@ func (c *dbosContext) Launch() error {
269271
}
270272

271273
logger.Info("DBOS initialized", "app_version", c.applicationVersion, "executor_id", c.executorID)
274+
c.launched = true
272275
return nil
273276
}
274277

dbos/workflow.go

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -208,27 +208,41 @@ type workflowRegistryEntry struct {
208208
}
209209

210210
// Register adds a workflow function to the registry (thread-safe, only once per name)
211-
func (c *dbosContext) RegisterWorkflow(fqn string, fn WrappedWorkflowFunc, maxRetries int) {
211+
func registerWorkflow(dbosCtx DBOSContext, workflowName string, fn WrappedWorkflowFunc, maxRetries int) {
212+
// Skip if we don't have a concrete dbosContext
213+
c, ok := dbosCtx.(*dbosContext)
214+
if !ok {
215+
return
216+
}
217+
212218
c.workflowRegMutex.Lock()
213219
defer c.workflowRegMutex.Unlock()
214220

215-
if _, exists := c.workflowRegistry[fqn]; exists {
216-
getLogger().Error("workflow function already registered", "fqn", fqn)
217-
panic(newConflictingRegistrationError(fqn))
221+
if _, exists := c.workflowRegistry[workflowName]; exists {
222+
getLogger().Error("workflow function already registered", "fqn", workflowName)
223+
panic(newConflictingRegistrationError(workflowName))
218224
}
219225

220-
c.workflowRegistry[fqn] = workflowRegistryEntry{
226+
fmt.Println("registering workflow", "fqn", workflowName, "max_retries", maxRetries)
227+
228+
c.workflowRegistry[workflowName] = workflowRegistryEntry{
221229
wrappedFunction: fn,
222230
maxRetries: maxRetries,
223231
}
224232
}
225233

226-
func (c *dbosContext) RegisterScheduledWorkflow(fqn string, fn WrappedWorkflowFunc, cronSchedule string, maxRetries int) {
234+
func registerScheduledWorkflow(dbosCtx DBOSContext, workflowName string, fn WrappedWorkflowFunc, cronSchedule string, maxRetries int) {
235+
// Skip if we don't have a concrete dbosContext
236+
c, ok := dbosCtx.(*dbosContext)
237+
if !ok {
238+
return
239+
}
240+
227241
c.getWorkflowScheduler().Start()
228242
var entryID cron.EntryID
229243
entryID, err := c.getWorkflowScheduler().AddFunc(cronSchedule, func() {
230244
// Execute the workflow on the cron schedule once DBOS is launched
231-
if c == nil {
245+
if !c.launched {
232246
return
233247
}
234248
// Get the scheduled time from the cron entry
@@ -238,19 +252,19 @@ func (c *dbosContext) RegisterScheduledWorkflow(fqn string, fn WrappedWorkflowFu
238252
// Use Next if Prev is not set, which will only happen for the first run
239253
scheduledTime = entry.Next
240254
}
241-
wfID := fmt.Sprintf("sched-%s-%s", fqn, scheduledTime) // XXX we can rethink the format
255+
wfID := fmt.Sprintf("sched-%s-%s", workflowName, scheduledTime) // XXX we can rethink the format
242256
fn(c, scheduledTime, WithWorkflowID(wfID), WithQueue(_DBOS_INTERNAL_QUEUE_NAME))
243257
})
244258
if err != nil {
245259
panic(fmt.Sprintf("failed to register scheduled workflow: %v", err))
246260
}
247-
getLogger().Info("Registered scheduled workflow", "fqn", fqn, "cron_schedule", cronSchedule)
261+
getLogger().Info("Registered scheduled workflow", "fqn", workflowName, "cron_schedule", cronSchedule)
248262
}
249263

250264
type workflowRegistrationParams struct {
251265
cronSchedule string
252266
maxRetries int
253-
// Likely we will allow a name here
267+
workflowName string
254268
}
255269

256270
type workflowRegistrationOption func(*workflowRegistrationParams)
@@ -271,27 +285,34 @@ func WithSchedule(schedule string) workflowRegistrationOption {
271285
}
272286
}
273287

288+
func WithWorkflowName(name string) workflowRegistrationOption {
289+
return func(p *workflowRegistrationParams) {
290+
p.workflowName = name
291+
}
292+
}
293+
274294
// RegisterWorkflow registers the provided function as a durable workflow with the provided DBOSContext workflow registry
275295
// If the workflow is a scheduled workflow (determined by the presence of a cron schedule), it will also register a cron job to execute it
276296
// RegisterWorkflow is generically typed, allowing us to register the workflow input and output types for gob encoding
277297
// The registered workflow is wrapped in a typed-erased wrapper which performs runtime type checks and conversions
278298
// To execute the workflow, use DBOSContext.RunAsWorkflow
279-
func RegisterWorkflow[P any, R any](dbosCtx DBOSContext, fn GenericWorkflowFunc[P, R], opts ...workflowRegistrationOption) {
299+
func RegisterWorkflow[P any, R any](dbosCtx DBOSContext, workflowName string, fn GenericWorkflowFunc[P, R], opts ...workflowRegistrationOption) {
280300
if dbosCtx == nil {
281301
panic("dbosCtx cannot be nil")
282302
}
283303

284304
registrationParams := workflowRegistrationParams{
285-
maxRetries: _DEFAULT_MAX_RECOVERY_ATTEMPTS,
305+
maxRetries: _DEFAULT_MAX_RECOVERY_ATTEMPTS,
306+
workflowName: runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name(),
286307
}
308+
287309
for _, opt := range opts {
288310
opt(&registrationParams)
289311
}
290312

291313
if fn == nil {
292314
panic("workflow function cannot be nil")
293315
}
294-
fqn := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
295316

296317
// Registry the input/output types for gob encoding
297318
var p P
@@ -300,10 +321,10 @@ func RegisterWorkflow[P any, R any](dbosCtx DBOSContext, fn GenericWorkflowFunc[
300321
gob.Register(r)
301322

302323
// Register a type-erased version of the durable workflow for recovery
303-
typeErasedWrapper := func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
324+
typeErasedWrapper := WrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
304325
typedInput, ok := input.(P)
305326
if !ok {
306-
return nil, newWorkflowUnexpectedInputType(fqn, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input))
327+
return nil, newWorkflowUnexpectedInputType(workflowName, fmt.Sprintf("%T", typedInput), fmt.Sprintf("%T", input))
307328
}
308329

309330
opts = append(opts, WithWorkflowMaxRetries(registrationParams.maxRetries))
@@ -312,15 +333,15 @@ func RegisterWorkflow[P any, R any](dbosCtx DBOSContext, fn GenericWorkflowFunc[
312333
return nil, err
313334
}
314335
return &workflowPollingHandle[any]{workflowID: handle.GetWorkflowID(), dbosContext: ctx}, nil
315-
}
316-
dbosCtx.RegisterWorkflow(fqn, typeErasedWrapper, registrationParams.maxRetries)
336+
})
337+
registerWorkflow(dbosCtx, registrationParams.workflowName, typeErasedWrapper, registrationParams.maxRetries)
317338

318339
// If this is a scheduled workflow, register a cron job
319340
if registrationParams.cronSchedule != "" {
320341
if reflect.TypeOf(p) != reflect.TypeOf(time.Time{}) {
321342
panic(fmt.Sprintf("scheduled workflow function must accept a time.Time as input, got %T", p))
322343
}
323-
dbosCtx.RegisterScheduledWorkflow(fqn, typeErasedWrapper, registrationParams.cronSchedule, registrationParams.maxRetries)
344+
registerScheduledWorkflow(dbosCtx, registrationParams.workflowName, typeErasedWrapper, registrationParams.cronSchedule, registrationParams.maxRetries)
324345
}
325346
}
326347

@@ -383,18 +404,7 @@ func WithWorkflowMaxRetries(maxRetries int) WorkflowOption {
383404
}
384405
}
385406

386-
func WithWorkflowName(name string) WorkflowOption {
387-
return func(p *workflowParams) {
388-
if len(p.workflowName) == 0 {
389-
p.workflowName = name
390-
}
391-
}
392-
}
393-
394407
func RunAsWorkflow[P any, R any](dbosCtx DBOSContext, fn GenericWorkflowFunc[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) {
395-
// Set the workflow name in the options -- will not be applied if the user provided a name
396-
opts = append(opts, WithWorkflowName(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()))
397-
398408
typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
399409
return fn(ctx, input.(P))
400410
})

0 commit comments

Comments
 (0)