Skip to content

Commit 2d41d0c

Browse files
committed
list workflow registration
1 parent fe30608 commit 2d41d0c

File tree

3 files changed

+284
-7
lines changed

3 files changed

+284
-7
lines changed

dbos/dbos.go

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ type DBOSContext interface {
127127
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
128128
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
129129
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
130+
ListRegisteredWorkflows(_ DBOSContext) ([]RegisteredWorkflowInfo, error) // List all registered workflows with their registration parameters
131+
ListScheduledWorkflows(_ DBOSContext) ([]ScheduledWorkflowInfo, error) // List all registered scheduled workflows with their registration parameters
130132

131133
// Accessors
132134
GetApplicationVersion() string // Get the application version for this context
@@ -159,8 +161,9 @@ type dbosContext struct {
159161
workflowsWg *sync.WaitGroup
160162

161163
// Workflow registry - read-mostly sync.Map since registration happens only before launch
162-
workflowRegistry *sync.Map // map[string]workflowRegistryEntry
163-
workflowCustomNametoFQN *sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
164+
workflowRegistry *sync.Map // map[string]workflowRegistryEntry
165+
workflowCustomNametoFQN *sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
166+
scheduledWorkflowRegistry *sync.Map // map[string]string - Maps workflow FQN to cron schedule string
164167

165168
// Workflow scheduler
166169
workflowScheduler *cron.Cron
@@ -275,6 +278,83 @@ func (c *dbosContext) GetApplicationID() string {
275278
return c.applicationID
276279
}
277280

281+
// RegisteredWorkflowInfo contains information about a registered workflow
282+
type RegisteredWorkflowInfo struct {
283+
FQN string // Fully qualified name of the workflow function
284+
CustomName string // Custom name if provided during registration
285+
MaxRetries int // Maximum retry attempts for workflow recovery
286+
}
287+
288+
// listRegisteredWorkflows returns information about all registered workflows
289+
func (c *dbosContext) listRegisteredWorkflows() []RegisteredWorkflowInfo {
290+
var workflows []RegisteredWorkflowInfo
291+
292+
c.workflowRegistry.Range(func(key, value interface{}) bool {
293+
fqn := key.(string)
294+
entry := value.(workflowRegistryEntry)
295+
296+
workflows = append(workflows, RegisteredWorkflowInfo{
297+
FQN: fqn,
298+
CustomName: entry.name,
299+
MaxRetries: entry.maxRetries,
300+
})
301+
return true
302+
})
303+
304+
return workflows
305+
}
306+
307+
// ScheduledWorkflowInfo contains information about a registered scheduled workflow
308+
type ScheduledWorkflowInfo struct {
309+
FQN string // Fully qualified name of the workflow function
310+
CustomName string // Custom name if provided during registration
311+
MaxRetries int // Maximum retry attempts for workflow recovery
312+
CronSchedule string // Cron schedule string
313+
}
314+
315+
// listScheduledWorkflows returns information about all registered scheduled workflows
316+
func (c *dbosContext) listScheduledWorkflows() []ScheduledWorkflowInfo {
317+
var scheduledWorkflows []ScheduledWorkflowInfo
318+
319+
// Get all registered workflows first
320+
registeredWorkflows := c.listRegisteredWorkflows()
321+
322+
// Create a map of FQN to workflow info for quick lookup
323+
workflowMap := make(map[string]RegisteredWorkflowInfo)
324+
for _, workflow := range registeredWorkflows {
325+
workflowMap[workflow.FQN] = workflow
326+
}
327+
328+
// Check which ones are scheduled by examining the scheduled workflow registry
329+
c.scheduledWorkflowRegistry.Range(func(key, value interface{}) bool {
330+
workflowFQN := key.(string)
331+
cronSchedule := value.(string)
332+
333+
// Find the corresponding workflow info
334+
if workflow, exists := workflowMap[workflowFQN]; exists {
335+
scheduledWorkflows = append(scheduledWorkflows, ScheduledWorkflowInfo{
336+
FQN: workflow.FQN,
337+
CustomName: workflow.CustomName,
338+
MaxRetries: workflow.MaxRetries,
339+
CronSchedule: cronSchedule,
340+
})
341+
}
342+
return true
343+
})
344+
345+
return scheduledWorkflows
346+
}
347+
348+
// ListRegisteredWorkflows returns information about all registered workflows with their registration parameters.
349+
func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext) ([]RegisteredWorkflowInfo, error) {
350+
return c.listRegisteredWorkflows(), nil
351+
}
352+
353+
// ListScheduledWorkflows returns information about all registered scheduled workflows with their registration parameters.
354+
func (c *dbosContext) ListScheduledWorkflows(_ DBOSContext) ([]ScheduledWorkflowInfo, error) {
355+
return c.listScheduledWorkflows(), nil
356+
}
357+
278358
// NewDBOSContext creates a new DBOS context with the provided configuration.
279359
// The context must be launched with Launch() for workflow execution and should be shut down with Shutdown().
280360
// This function initializes the DBOS system database, sets up the queue sub-system, and prepares the workflow registry.
@@ -297,11 +377,12 @@ func (c *dbosContext) GetApplicationID() string {
297377
func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error) {
298378
dbosBaseCtx, cancelFunc := context.WithCancelCause(ctx)
299379
initExecutor := &dbosContext{
300-
workflowsWg: &sync.WaitGroup{},
301-
ctx: dbosBaseCtx,
302-
ctxCancelFunc: cancelFunc,
303-
workflowRegistry: &sync.Map{},
304-
workflowCustomNametoFQN: &sync.Map{},
380+
workflowsWg: &sync.WaitGroup{},
381+
ctx: dbosBaseCtx,
382+
ctxCancelFunc: cancelFunc,
383+
workflowRegistry: &sync.Map{},
384+
workflowCustomNametoFQN: &sync.Map{},
385+
scheduledWorkflowRegistry: &sync.Map{},
305386
}
306387

307388
// Load and process the configuration

dbos/workflow.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
321321
panic("Cannot register scheduled workflow after DBOS has launched")
322322
}
323323

324+
// Store the schedule information in the scheduled workflow registry
325+
c.scheduledWorkflowRegistry.Store(workflowName, cronSchedule)
326+
324327
c.getWorkflowScheduler().Start()
325328
var entryID cron.EntryID
326329
entryID, err := c.getWorkflowScheduler().AddFunc(cronSchedule, func() {
@@ -1921,3 +1924,43 @@ func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error) {
19211924
}
19221925
return ctx.GetWorkflowSteps(ctx, workflowID)
19231926
}
1927+
1928+
// ListRegisteredWorkflows returns information about all registered workflows with their registration parameters.
1929+
// This includes the fully qualified name, custom name (if provided), and maximum retry attempts.
1930+
//
1931+
// Example:
1932+
//
1933+
// workflows, err := dbos.ListRegisteredWorkflows(ctx)
1934+
// if err != nil {
1935+
// log.Fatal(err)
1936+
// }
1937+
// for _, workflow := range workflows {
1938+
// log.Printf("Workflow: %s (Custom: %s, MaxRetries: %d)",
1939+
// workflow.FQN, workflow.CustomName, workflow.MaxRetries)
1940+
// }
1941+
func ListRegisteredWorkflows(ctx DBOSContext) ([]RegisteredWorkflowInfo, error) {
1942+
if ctx == nil {
1943+
return nil, errors.New("ctx cannot be nil")
1944+
}
1945+
return ctx.ListRegisteredWorkflows(ctx)
1946+
}
1947+
1948+
// ListScheduledWorkflows returns information about all registered scheduled workflows with their registration parameters.
1949+
// This includes the fully qualified name, custom name (if provided), maximum retry attempts, and cron schedule.
1950+
//
1951+
// Example:
1952+
//
1953+
// scheduledWorkflows, err := dbos.ListScheduledWorkflows(ctx)
1954+
// if err != nil {
1955+
// log.Fatal(err)
1956+
// }
1957+
// for _, workflow := range scheduledWorkflows {
1958+
// log.Printf("Scheduled Workflow: %s (Schedule: %s, MaxRetries: %d)",
1959+
// workflow.FQN, workflow.CronSchedule, workflow.MaxRetries)
1960+
// }
1961+
func ListScheduledWorkflows(ctx DBOSContext) ([]ScheduledWorkflowInfo, error) {
1962+
if ctx == nil {
1963+
return nil, errors.New("ctx cannot be nil")
1964+
}
1965+
return ctx.ListScheduledWorkflows(ctx)
1966+
}

dbos/workflows_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4131,3 +4131,156 @@ func TestSpecialSteps(t *testing.T) {
41314131
require.Equal(t, "success", result, "workflow should return success")
41324132
})
41334133
}
4134+
4135+
func TestRegisteredWorkflowListing(t *testing.T) {
4136+
dbosCtx := setupDBOS(t, true, true)
4137+
4138+
// Register some regular workflows
4139+
RegisterWorkflow(dbosCtx, simpleWorkflow)
4140+
RegisterWorkflow(dbosCtx, simpleWorkflowError, WithMaxRetries(5))
4141+
RegisterWorkflow(dbosCtx, simpleWorkflowWithStep, WithWorkflowName("CustomStepWorkflow"))
4142+
4143+
// Register some scheduled workflows
4144+
RegisterWorkflow(dbosCtx, func(ctx DBOSContext, scheduledTime time.Time) (string, error) {
4145+
return "scheduled", nil
4146+
}, WithSchedule("0 0 * * * *"), WithMaxRetries(3), WithWorkflowName("DailyWorkflow"))
4147+
4148+
RegisterWorkflow(dbosCtx, func(ctx DBOSContext, scheduledTime time.Time) (string, error) {
4149+
return "hourly", nil
4150+
}, WithSchedule("0 0 * * * *"))
4151+
4152+
// Stop the cron scheduler to prevent goroutine leaks in tests
4153+
// The scheduler is started when scheduled workflows are registered
4154+
if dbosCtx.(*dbosContext).workflowScheduler != nil {
4155+
ctx := dbosCtx.(*dbosContext).workflowScheduler.Stop()
4156+
<-ctx.Done() // Wait for it to stop
4157+
dbosCtx.(*dbosContext).workflowScheduler = nil
4158+
}
4159+
4160+
t.Run("ListRegisteredWorkflows", func(t *testing.T) {
4161+
workflows := dbosCtx.(*dbosContext).listRegisteredWorkflows()
4162+
4163+
// Should have at least 5 workflows (3 regular + 2 scheduled)
4164+
require.GreaterOrEqual(t, len(workflows), 5, "Should have at least 5 registered workflows")
4165+
4166+
// Create a map for easier lookup
4167+
workflowMap := make(map[string]RegisteredWorkflowInfo)
4168+
for _, wf := range workflows {
4169+
workflowMap[wf.FQN] = wf
4170+
}
4171+
4172+
// Check that simpleWorkflow is registered
4173+
simpleWorkflowFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflow).Pointer()).Name()
4174+
simpleWf, exists := workflowMap[simpleWorkflowFQN]
4175+
require.True(t, exists, "simpleWorkflow should be registered")
4176+
require.Equal(t, _DEFAULT_MAX_RECOVERY_ATTEMPTS, simpleWf.MaxRetries, "simpleWorkflow should have default max retries")
4177+
require.Empty(t, simpleWf.CustomName, "simpleWorkflow should not have custom name")
4178+
4179+
// Check that simpleWorkflowError is registered with custom max retries
4180+
simpleWorkflowErrorFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflowError).Pointer()).Name()
4181+
errorWf, exists := workflowMap[simpleWorkflowErrorFQN]
4182+
require.True(t, exists, "simpleWorkflowError should be registered")
4183+
require.Equal(t, 5, errorWf.MaxRetries, "simpleWorkflowError should have custom max retries")
4184+
4185+
// Check that custom named workflow is registered
4186+
var customWf RegisteredWorkflowInfo
4187+
for _, wf := range workflows {
4188+
if wf.CustomName == "CustomStepWorkflow" {
4189+
customWf = wf
4190+
break
4191+
}
4192+
}
4193+
require.NotEmpty(t, customWf.FQN, "CustomStepWorkflow should be found")
4194+
require.Equal(t, "CustomStepWorkflow", customWf.CustomName, "Custom name should be preserved")
4195+
})
4196+
4197+
t.Run("ListRegisteredWorkflowsPackageFunction", func(t *testing.T) {
4198+
workflows, err := ListRegisteredWorkflows(dbosCtx)
4199+
require.NoError(t, err, "ListRegisteredWorkflows should not return an error")
4200+
require.GreaterOrEqual(t, len(workflows), 5, "Should have at least 5 registered workflows")
4201+
4202+
// Create a map for easier lookup
4203+
workflowMap := make(map[string]RegisteredWorkflowInfo)
4204+
for _, wf := range workflows {
4205+
workflowMap[wf.FQN] = wf
4206+
}
4207+
4208+
// Check that simpleWorkflow is registered
4209+
simpleWorkflowFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflow).Pointer()).Name()
4210+
simpleWf, exists := workflowMap[simpleWorkflowFQN]
4211+
require.True(t, exists, "simpleWorkflow should be registered")
4212+
require.Equal(t, _DEFAULT_MAX_RECOVERY_ATTEMPTS, simpleWf.MaxRetries, "simpleWorkflow should have default max retries")
4213+
require.Empty(t, simpleWf.CustomName, "simpleWorkflow should not have custom name")
4214+
})
4215+
4216+
t.Run("ListScheduledWorkflows", func(t *testing.T) {
4217+
scheduledWorkflows := dbosCtx.(*dbosContext).listScheduledWorkflows()
4218+
4219+
// Should have exactly 2 scheduled workflows
4220+
require.Equal(t, 2, len(scheduledWorkflows), "Should have exactly 2 scheduled workflows")
4221+
4222+
// Create a map for easier lookup
4223+
scheduledMap := make(map[string]ScheduledWorkflowInfo)
4224+
for _, wf := range scheduledWorkflows {
4225+
scheduledMap[wf.FQN] = wf
4226+
}
4227+
4228+
// Check that both scheduled workflows have cron schedules
4229+
for _, wf := range scheduledWorkflows {
4230+
require.NotEmpty(t, wf.CronSchedule, "Scheduled workflow should have cron schedule")
4231+
require.Equal(t, "0 0 * * * *", wf.CronSchedule, "Both scheduled workflows should have the same schedule")
4232+
}
4233+
4234+
// Check that one has a custom name
4235+
var hasCustomName bool
4236+
for _, wf := range scheduledWorkflows {
4237+
if wf.CustomName == "DailyWorkflow" {
4238+
hasCustomName = true
4239+
require.Equal(t, 3, wf.MaxRetries, "DailyWorkflow should have custom max retries")
4240+
break
4241+
}
4242+
}
4243+
require.True(t, hasCustomName, "One scheduled workflow should have custom name")
4244+
})
4245+
4246+
t.Run("ListScheduledWorkflowsPackageFunction", func(t *testing.T) {
4247+
scheduledWorkflows, err := ListScheduledWorkflows(dbosCtx)
4248+
require.NoError(t, err, "ListScheduledWorkflows should not return an error")
4249+
require.Equal(t, 2, len(scheduledWorkflows), "Should have exactly 2 scheduled workflows")
4250+
4251+
// Create a map for easier lookup
4252+
scheduledMap := make(map[string]ScheduledWorkflowInfo)
4253+
for _, wf := range scheduledWorkflows {
4254+
scheduledMap[wf.FQN] = wf
4255+
}
4256+
4257+
// Check that both scheduled workflows have cron schedules
4258+
for _, wf := range scheduledWorkflows {
4259+
require.NotEmpty(t, wf.CronSchedule, "Scheduled workflow should have cron schedule")
4260+
require.Equal(t, "0 0 * * * *", wf.CronSchedule, "Both scheduled workflows should have the same schedule")
4261+
}
4262+
4263+
// Check that one has a custom name
4264+
var hasCustomName bool
4265+
for _, wf := range scheduledWorkflows {
4266+
if wf.CustomName == "DailyWorkflow" {
4267+
hasCustomName = true
4268+
require.Equal(t, 3, wf.MaxRetries, "DailyWorkflow should have custom max retries")
4269+
break
4270+
}
4271+
}
4272+
require.True(t, hasCustomName, "One scheduled workflow should have custom name")
4273+
})
4274+
4275+
t.Run("EmptyRegistry", func(t *testing.T) {
4276+
// Create a new context without any registered workflow
4277+
emptyCtx := setupDBOS(t, true, false)
4278+
4279+
workflows := emptyCtx.(*dbosContext).listRegisteredWorkflows()
4280+
require.Empty(t, workflows, "Empty context should have no registered workflows")
4281+
4282+
scheduledWorkflows := emptyCtx.(*dbosContext).listScheduledWorkflows()
4283+
require.Empty(t, scheduledWorkflows, "Empty context should have no scheduled workflows")
4284+
})
4285+
4286+
}

0 commit comments

Comments
 (0)