Skip to content

Commit 77b5acf

Browse files
authored
Various fixes (#43)
- print scheduled workflows schedules - fix bug in test - wait for workflows to complete in shutdown - fix GetResult() error handling - Handle errors in admin server - move global variables in singleton - fix `processConfig` now that app name and db url are mandatory
1 parent 9c81f4c commit 77b5acf

File tree

9 files changed

+77
-72
lines changed

9 files changed

+77
-72
lines changed

dbos/admin_server.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ func newAdminServer(port int) *adminServer {
6464
}
6565

6666
w.Header().Set("Content-Type", "application/json")
67-
w.WriteHeader(http.StatusOK)
6867
if err := json.NewEncoder(w).Encode(workflowIDs); err != nil {
6968
getLogger().Error("Error encoding response", "error", err)
69+
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
70+
return
7071
}
7172
})
7273

@@ -92,9 +93,10 @@ func newAdminServer(port int) *adminServer {
9293
}
9394

9495
w.Header().Set("Content-Type", "application/json")
95-
w.WriteHeader(http.StatusOK)
9696
if err := json.NewEncoder(w).Encode(queueMetadataArray); err != nil {
9797
getLogger().Error("Error encoding queue metadata response", "error", err)
98+
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
99+
return
98100
}
99101
})
100102

dbos/dbos.go

Lines changed: 29 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,14 @@ import (
88
"fmt"
99
"io"
1010
"log/slog"
11-
"net/url"
1211
"os"
12+
"sync"
1313
"time"
1414

1515
"github.com/robfig/cron/v3"
1616
)
1717

1818
var (
19-
_APP_VERSION string
20-
_EXECUTOR_ID string
21-
_APP_ID string
2219
_DEFAULT_ADMIN_SERVER_PORT = 3001
2320
)
2421

@@ -40,10 +37,7 @@ type Config struct {
4037
AdminServer bool
4138
}
4239

43-
// processConfig merges configuration from two sources in order of precedence:
44-
// 1. programmatic configuration
45-
// 2. environment variables
46-
// Finally, it applies default values if needed.
40+
// processConfig enforces mandatory fields and applies defaults.
4741
func processConfig(inputConfig *Config) (*Config, error) {
4842
// First check required fields
4943
if len(inputConfig.DatabaseURL) == 0 {
@@ -53,33 +47,17 @@ func processConfig(inputConfig *Config) (*Config, error) {
5347
return nil, fmt.Errorf("missing required config field: appName")
5448
}
5549

56-
dbosConfig := &Config{}
57-
58-
// Start with environment variables (lowest precedence)
59-
if dbURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL"); dbURL != "" {
60-
dbosConfig.DatabaseURL = dbURL
61-
}
62-
63-
// Override with programmatic configuration (highest precedence)
64-
if len(inputConfig.DatabaseURL) > 0 {
65-
dbosConfig.DatabaseURL = inputConfig.DatabaseURL
66-
}
67-
if len(inputConfig.AppName) > 0 {
68-
dbosConfig.AppName = inputConfig.AppName
50+
dbosConfig := &Config{
51+
DatabaseURL: inputConfig.DatabaseURL,
52+
AppName: inputConfig.AppName,
53+
Logger: inputConfig.Logger,
54+
AdminServer: inputConfig.AdminServer,
6955
}
70-
// Copy over parameters that can only be set programmatically
71-
dbosConfig.Logger = inputConfig.Logger
72-
dbosConfig.AdminServer = inputConfig.AdminServer
7356

7457
// Load defaults
7558
if dbosConfig.Logger == nil {
7659
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, nil))
7760
}
78-
if len(dbosConfig.DatabaseURL) == 0 {
79-
dbosConfig.Logger.Info("Using default database URL: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable")
80-
password := url.QueryEscape(os.Getenv("PGPASSWORD"))
81-
dbosConfig.DatabaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password)
82-
}
8361

8462
return dbosConfig, nil
8563
}
@@ -93,6 +71,10 @@ type executor struct {
9371
queueRunnerDone chan struct{}
9472
adminServer *adminServer
9573
config *Config
74+
applicationVersion string
75+
applicationID string
76+
executorID string
77+
workflowsWg *sync.WaitGroup
9678
}
9779

9880
func Initialize(inputConfig Config) error {
@@ -101,11 +83,16 @@ func Initialize(inputConfig Config) error {
10183
return newInitializationError("DBOS already initialized")
10284
}
10385

86+
initExecutor := &executor{
87+
workflowsWg: &sync.WaitGroup{},
88+
}
89+
10490
// Load & process the configuration
10591
config, err := processConfig(&inputConfig)
10692
if err != nil {
10793
return newInitializationError(err.Error())
10894
}
95+
initExecutor.config = config
10996

11097
// Set global logger
11198
logger = config.Logger
@@ -115,32 +102,30 @@ func Initialize(inputConfig Config) error {
115102
gob.Register(t)
116103

117104
// Initialize global variables with environment variables, providing defaults if not set
118-
_APP_VERSION = os.Getenv("DBOS__APPVERSION")
119-
if _APP_VERSION == "" {
120-
_APP_VERSION = computeApplicationVersion()
105+
initExecutor.applicationVersion = os.Getenv("DBOS__APPVERSION")
106+
if initExecutor.applicationVersion == "" {
107+
initExecutor.applicationVersion = computeApplicationVersion()
121108
logger.Info("DBOS__APPVERSION not set, using computed hash")
122109
}
123110

124-
_EXECUTOR_ID = os.Getenv("DBOS__VMID")
125-
if _EXECUTOR_ID == "" {
126-
_EXECUTOR_ID = "local"
127-
logger.Info("DBOS__VMID not set, using default", "executor_id", _EXECUTOR_ID)
111+
initExecutor.executorID = os.Getenv("DBOS__VMID")
112+
if initExecutor.executorID == "" {
113+
initExecutor.executorID = "local"
114+
logger.Info("DBOS__VMID not set, using default", "executor_id", initExecutor.executorID)
128115
}
129116

130-
_APP_ID = os.Getenv("DBOS__APPID")
117+
initExecutor.applicationID = os.Getenv("DBOS__APPID")
131118

132119
// Create the system database
133120
systemDB, err := NewSystemDatabase(config.DatabaseURL)
134121
if err != nil {
135122
return newInitializationError(fmt.Sprintf("failed to create system database: %v", err))
136123
}
124+
initExecutor.systemDB = systemDB
137125
logger.Info("System database initialized")
138126

139127
// Set the global dbos instance
140-
dbos = &executor{
141-
config: config,
142-
systemDB: systemDB,
143-
}
128+
dbos = initExecutor
144129

145130
return nil
146131
}
@@ -184,15 +169,15 @@ func Launch() error {
184169
}
185170

186171
// Run a round of recovery on the local executor
187-
recoveryHandles, err := recoverPendingWorkflows(context.Background(), []string{_EXECUTOR_ID}) // XXX maybe use the queue runner context here to allow Shutdown to cancel it?
172+
recoveryHandles, err := recoverPendingWorkflows(context.Background(), []string{dbos.executorID}) // XXX maybe use the queue runner context here to allow Shutdown to cancel it?
188173
if err != nil {
189174
return newInitializationError(fmt.Sprintf("failed to recover pending workflows during launch: %v", err))
190175
}
191176
if len(recoveryHandles) > 0 {
192177
logger.Info("Recovered pending workflows", "count", len(recoveryHandles))
193178
}
194179

195-
logger.Info("DBOS initialized", "app_version", _APP_VERSION, "executor_id", _EXECUTOR_ID)
180+
logger.Info("DBOS initialized", "app_version", dbos.applicationVersion, "executor_id", dbos.executorID)
196181
return nil
197182
}
198183

@@ -203,6 +188,7 @@ func Shutdown() {
203188
}
204189

205190
// XXX is there a way to ensure all workflows goroutine are done before closing?
191+
dbos.workflowsWg.Wait()
206192

207193
// Cancel the context to stop the queue runner
208194
if dbos.queueRunnerCancelFunc != nil {

dbos/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func queueRunner(ctx context.Context) {
167167

168168
_, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
169169
if err != nil {
170-
getLogger().Error("Error recovering workflow", "error", err)
170+
getLogger().Error("Error running queued workflow", "error", err)
171171
}
172172
}
173173
}

dbos/queues_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -507,8 +507,8 @@ func TestWorkerConcurrency(t *testing.T) {
507507

508508
// Stop the queue runner before changing executor ID to avoid race conditions
509509
stopQueueRunner()
510-
// Change the EXECUTOR_ID global variable to a different value
511-
_EXECUTOR_ID = "worker-2"
510+
// Change the executor ID to a different value
511+
dbos.executorID = "worker-2"
512512
// Restart the queue runner
513513
restartQueueRunner()
514514

@@ -541,7 +541,7 @@ func TestWorkerConcurrency(t *testing.T) {
541541
// Stop the queue runner before changing executor ID to avoid race conditions
542542
stopQueueRunner()
543543
// Change the executor again and wait for the third workflow to start
544-
_EXECUTOR_ID = "local"
544+
dbos.executorID = "local"
545545
// Restart the queue runner
546546
restartQueueRunner()
547547
startEvents[2].Wait()
@@ -573,7 +573,7 @@ func TestWorkerConcurrency(t *testing.T) {
573573
// Stop the queue runner before changing executor ID to avoid race conditions
574574
stopQueueRunner()
575575
// change executor again and wait for the fourth workflow to start
576-
_EXECUTOR_ID = "worker-2"
576+
dbos.executorID = "worker-2"
577577
// Restart the queue runner
578578
restartQueueRunner()
579579
startEvents[3].Wait()
@@ -597,7 +597,7 @@ func TestWorkerConcurrency(t *testing.T) {
597597
t.Fatal("expected queue entries to be cleaned up after global concurrency test")
598598
}
599599

600-
_EXECUTOR_ID = "local" // Reset executor ID for future tests
600+
dbos.executorID = "local" // Reset executor ID for future tests
601601
}
602602

603603
var (

dbos/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ func recoverPendingWorkflows(ctx context.Context, executorIDs []string) ([]Workf
1111
pendingWorkflows, err := dbos.systemDB.ListWorkflows(ctx, listWorkflowsDBInput{
1212
status: []WorkflowStatusType{WorkflowStatusPending},
1313
executorIDs: executorIDs,
14-
applicationVersion: _APP_VERSION,
14+
applicationVersion: dbos.applicationVersion,
1515
})
1616
if err != nil {
1717
return nil, err

dbos/system_database.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,39 @@ import (
2323
/*******************************/
2424

2525
type SystemDatabase interface {
26+
// SysDB management
2627
Launch(ctx context.Context)
2728
Shutdown()
2829
ResetSystemDB(ctx context.Context) error
30+
31+
// Workflows
2932
InsertWorkflowStatus(ctx context.Context, input insertWorkflowStatusDBInput) (*insertWorkflowResult, error)
30-
RecordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
31-
RecordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error
32-
CheckChildWorkflow(ctx context.Context, workflowUUID string, functionID int) (*string, error)
3333
ListWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error)
3434
UpdateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error
3535
AwaitWorkflowResult(ctx context.Context, workflowID string) (any, error)
36-
DequeueWorkflows(ctx context.Context, queue WorkflowQueue) ([]dequeuedWorkflow, error)
37-
ClearQueueAssignment(ctx context.Context, workflowID string) (bool, error)
38-
CheckOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
36+
37+
// Child workflows
38+
RecordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error
39+
CheckChildWorkflow(ctx context.Context, workflowUUID string, functionID int) (*string, error)
3940
RecordChildGetResult(ctx context.Context, input recordChildGetResultDBInput) error
41+
42+
// Steps
43+
RecordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
44+
CheckOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
4045
GetWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error)
46+
47+
// Communication (special steps)
4148
Send(ctx context.Context, input workflowSendInputInternal) error
4249
Recv(ctx context.Context, input WorkflowRecvInput) (any, error)
4350
SetEvent(ctx context.Context, input workflowSetEventInputInternal) error
4451
GetEvent(ctx context.Context, input WorkflowGetEventInput) (any, error)
52+
53+
// Timers (special steps)
4554
Sleep(ctx context.Context, duration time.Duration) (time.Duration, error)
55+
56+
// Queues
57+
DequeueWorkflows(ctx context.Context, queue WorkflowQueue) ([]dequeuedWorkflow, error)
58+
ClearQueueAssignment(ctx context.Context, workflowID string) (bool, error)
4659
}
4760

4861
type systemDatabase struct {
@@ -1625,7 +1638,7 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue
16251638
pendingWorkflowsDict[executorIDRow] = taskCount
16261639
}
16271640

1628-
localPendingWorkflows := pendingWorkflowsDict[_EXECUTOR_ID]
1641+
localPendingWorkflows := pendingWorkflowsDict[dbos.executorID]
16291642

16301643
// Check worker concurrency limit
16311644
if queue.workerConcurrency != nil {
@@ -1692,7 +1705,7 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue
16921705
}
16931706

16941707
// Execute the query to get workflow IDs
1695-
rows, err := tx.Query(ctx, query, queue.name, WorkflowStatusEnqueued, _APP_VERSION)
1708+
rows, err := tx.Query(ctx, query, queue.name, WorkflowStatusEnqueued, dbos.applicationVersion)
16961709
if err != nil {
16971710
return nil, fmt.Errorf("failed to query enqueued workflows: %w", err)
16981711
}
@@ -1742,8 +1755,8 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue
17421755
var inputString *string
17431756
err := tx.QueryRow(ctx, updateQuery,
17441757
WorkflowStatusPending,
1745-
_APP_VERSION,
1746-
_EXECUTOR_ID,
1758+
dbos.applicationVersion,
1759+
dbos.executorID,
17471760
startTimeMs,
17481761
id).Scan(&retWorkflow.name, &inputString)
17491762

dbos/utils_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ import (
1515
func getDatabaseURL(t *testing.T) string {
1616
databaseURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL")
1717
if databaseURL == "" {
18-
if os.Getenv("PGPASSWORD") == "" {
19-
t.Skip("PGPASSWORD not set, cannot construct database URL")
18+
password := os.Getenv("PGPASSWORD")
19+
if password == "" {
20+
password = "dbos"
2021
}
21-
password := url.QueryEscape(os.Getenv("PGPASSWORD"))
22-
databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password)
22+
databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", url.QueryEscape(password))
2323
}
2424
return databaseURL
2525
}

0 commit comments

Comments
 (0)