Skip to content
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ type DBOSContext interface {
Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence

// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)

// Workflow management
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
Expand Down Expand Up @@ -317,12 +317,28 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
initExecutor.executorID = config.ExecutorID

initExecutor.applicationID = os.Getenv("DBOS__APPID")
maxRetries := 5
retryDelay := time.Second * 2

var systemDB systemDatabase

for attempt := 1; attempt <= maxRetries; attempt++ {
systemDB, err = newSystemDatabase(initExecutor, config.DatabaseURL, initExecutor.logger)
if err == nil {
break
}
initExecutor.logger.Warn("Failed to connect to system DB", "attempt", attempt, "maxRetries", maxRetries, "error", err)

if attempt < maxRetries {
time.Sleep(retryDelay)
retryDelay *= 2 // Exponential backoff
}
}

// Create the system database
systemDB, err := newSystemDatabase(initExecutor, config.DatabaseURL, initExecutor.logger)
if err != nil {
return nil, newInitializationError(fmt.Sprintf("failed to create system database: %v", err))
return nil, newInitializationError(fmt.Sprintf("failed to create system database after %d attempts: %v", maxRetries, err))
}

initExecutor.systemDB = systemDB
initExecutor.logger.Info("System database initialized")

Expand Down
Loading