From 54798b5c5dc2a9d274dcf76ad591154b5bcb2633 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 11:37:55 -0700 Subject: [PATCH 01/13] accept user provided db url --- dbos/dbos.go | 21 ++++++++++++++++++++- dbos/system_database.go | 12 +----------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 3bab89e4..a44e8500 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "log/slog" + "net/url" "os" "reflect" "runtime" @@ -92,6 +93,7 @@ func getLogger() *slog.Logger { type config struct { logger *slog.Logger adminServer bool + databaseURL string } type LaunchOption func(*config) @@ -108,6 +110,12 @@ func WithAdminServer() LaunchOption { } } +func WithDatabaseURL(url string) LaunchOption { + return func(config *config) { + config.databaseURL = url + } +} + func Launch(options ...LaunchOption) error { if dbos != nil { fmt.Println("warning: DBOS instance already initialized, skipping re-initialization") @@ -139,7 +147,18 @@ func Launch(options ...LaunchOption) error { APP_ID = os.Getenv("DBOS__APPID") // Create the system database - systemDB, err := NewSystemDatabase() + var databaseURL string + if config.databaseURL != "" { + databaseURL = config.databaseURL + } else { + databaseURL = os.Getenv("DBOS_DATABASE_URL") + if databaseURL == "" { + fmt.Println("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") + password := url.QueryEscape(os.Getenv("PGPASSWORD")) + databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) + } + } + systemDB, err := NewSystemDatabase(databaseURL) if err != nil { return NewInitializationError(fmt.Sprintf("failed to create system database: %v", err)) } diff --git a/dbos/system_database.go b/dbos/system_database.go index ceb2e3e7..18dec90e 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -5,8 +5,6 @@ import ( "embed" "errors" "fmt" - "net/url" - "os" "strings" "sync" "time" @@ -125,15 +123,7 @@ func runMigrations(databaseURL string) error { } // New creates a new SystemDatabase instance and runs migrations -func NewSystemDatabase() (SystemDatabase, error) { - // TODO: pass proper config - databaseURL := os.Getenv("DBOS_DATABASE_URL") - if databaseURL == "" { - fmt.Println("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") - password := url.QueryEscape(os.Getenv("PGPASSWORD")) - databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) - } - +func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { // Create the database if it doesn't exist if err := createDatabaseIfNotExists(databaseURL); err != nil { return nil, NewInitializationError(fmt.Sprintf("failed to create database: %v", err)) From 53f12ef181eb62922e728b1e2705c16fd453db73 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 11:38:28 -0700 Subject: [PATCH 02/13] do not duplicate InitializationError --- dbos/system_database.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 18dec90e..05d05f08 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -126,25 +126,25 @@ func runMigrations(databaseURL string) error { func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { // Create the database if it doesn't exist if err := createDatabaseIfNotExists(databaseURL); err != nil { - return nil, NewInitializationError(fmt.Sprintf("failed to create database: %v", err)) + return nil, fmt.Errorf("failed to create database: %v", err) } // Run migrations first if err := runMigrations(databaseURL); err != nil { - return nil, NewInitializationError(fmt.Sprintf("failed to run migrations: %v", err)) + return nil, fmt.Errorf("failed to run migrations: %v", err) } // Create pgx pool pool, err := pgxpool.New(context.Background(), databaseURL) if err != nil { - return nil, NewInitializationError(fmt.Sprintf("failed to create connection pool: %v", err)) + return nil, fmt.Errorf("failed to create connection pool: %v", err) } // Test the connection // FIXME: remove this if err := pool.Ping(context.Background()); err != nil { pool.Close() - return nil, NewInitializationError(fmt.Sprintf("failed to ping database: %v", err)) + return nil, fmt.Errorf("failed to ping database: %v", err) } // Create a map of notification payloads to channels @@ -153,7 +153,7 @@ func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { // Create a connection to listen on notifications config, err := pgconn.ParseConfig(databaseURL) if err != nil { - return nil, NewInitializationError(fmt.Sprintf("failed to parse database URL: %v", err)) + return nil, fmt.Errorf("failed to parse database URL: %v", err) } config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) { if n.Channel == "dbos_notifications_channel" { @@ -170,7 +170,7 @@ func NewSystemDatabase(databaseURL string) (SystemDatabase, error) { } notificationListenerConnection, err := pgconn.ConnectConfig(context.Background(), config) if err != nil { - return nil, NewInitializationError(fmt.Sprintf("failed to connect notification listener to database: %v", err)) + return nil, fmt.Errorf("failed to connect notification listener to database: %v", err) } return &systemDatabase{ From 7c22bf1db88feb7bbe7f461187ab089b172b8208 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 13:46:29 -0700 Subject: [PATCH 03/13] parse dbos config file --- dbos/config.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++ dbos/dbos.go | 17 +++-------- 2 files changed, 84 insertions(+), 13 deletions(-) create mode 100644 dbos/config.go diff --git a/dbos/config.go b/dbos/config.go new file mode 100644 index 00000000..2e7b5a2e --- /dev/null +++ b/dbos/config.go @@ -0,0 +1,80 @@ +package dbos + +import ( + "fmt" + "net/url" + "os" + + "github.com/spf13/viper" +) + +const dbosConfigFileName = "dbos_config.yaml" + +type configFile struct { + Name string `yaml:"name"` + DatabaseURL string `yaml:"database_url"` +} + +func LoadConfig() (*configFile, error) { + v := viper.New() + v.SetConfigFile(dbosConfigFileName) + v.AutomaticEnv() + + if err := v.ReadInConfig(); err != nil { + return nil, err + } + + var config configFile + if err := v.Unmarshal(&config); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + + return &config, nil +} + +// ProcessConfig merges configuration from three sources in order of precedence: +// 1. programmatic configuration (highest precedence) +// 2. configuration file +// 3. environment variables (lowest precedence) +func ProcessConfig(programmaticConfig config) *config { + fileConfig, err := LoadConfig() + if err != nil && !os.IsNotExist(err) { + panic(err) + } + + dbosConfig := &config{} + + // Start with environment variables (lowest precedence) + if dbURL := os.Getenv("DBOS_DATABASE_URL"); dbURL != "" { + dbosConfig.databaseURL = dbURL + } + + // Override with file configuration if available + if fileConfig != nil { + if fileConfig.DatabaseURL != "" { + dbosConfig.databaseURL = fileConfig.DatabaseURL + } + if fileConfig.Name != "" { + dbosConfig.appName = fileConfig.Name + } + } + + // Override with programmatic configuration (highest precedence) + if len(programmaticConfig.databaseURL) > 0 { + dbosConfig.databaseURL = programmaticConfig.databaseURL + } + if len(programmaticConfig.appName) > 0 { + dbosConfig.appName = programmaticConfig.appName + } + // Copy over parameters that can only be set programmatically + dbosConfig.logger = programmaticConfig.logger + dbosConfig.adminServer = programmaticConfig.adminServer + + // Load defaults + if len(dbosConfig.databaseURL) == 0 { + fmt.Println("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") + password := url.QueryEscape(os.Getenv("PGPASSWORD")) + dbosConfig.databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) + } + return dbosConfig +} diff --git a/dbos/dbos.go b/dbos/dbos.go index a44e8500..3ab83dac 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "fmt" "log/slog" - "net/url" "os" "reflect" "runtime" @@ -94,6 +93,7 @@ type config struct { logger *slog.Logger adminServer bool databaseURL string + appName string } type LaunchOption func(*config) @@ -146,19 +146,10 @@ func Launch(options ...LaunchOption) error { APP_ID = os.Getenv("DBOS__APPID") + config = ProcessConfig(*config) + // Create the system database - var databaseURL string - if config.databaseURL != "" { - databaseURL = config.databaseURL - } else { - databaseURL = os.Getenv("DBOS_DATABASE_URL") - if databaseURL == "" { - fmt.Println("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") - password := url.QueryEscape(os.Getenv("PGPASSWORD")) - databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) - } - } - systemDB, err := NewSystemDatabase(databaseURL) + systemDB, err := NewSystemDatabase(config.databaseURL) if err != nil { return NewInitializationError(fmt.Sprintf("failed to create system database: %v", err)) } From 67b3e46cc0973f62e1099685670e4db78ef750b7 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 13:48:19 -0700 Subject: [PATCH 04/13] update dependencies --- go.mod | 15 ++++++++++++++- go.sum | 42 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 9b4033e2..298bb361 100644 --- a/go.mod +++ b/go.mod @@ -8,16 +8,29 @@ require ( github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.7.5 github.com/robfig/cron/v3 v3.0.1 + github.com/spf13/viper v1.20.1 ) require ( + github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - go.uber.org/atomic v1.7.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.3 // indirect + github.com/sagikazarmark/locafero v0.7.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.12.0 // indirect + github.com/spf13/cast v1.7.1 // indirect + github.com/spf13/pflag v1.0.6 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/sync v0.15.0 // indirect + golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.26.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7f6653b4..0cc5ebe1 100644 --- a/go.sum +++ b/go.sum @@ -17,14 +17,22 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-migrate/migrate/v4 v4.18.3 h1:EYGkoOsvgHHfm5U/naS1RP/6PL/Xv3S4B/swMiAmDLs= github.com/golang-migrate/migrate/v4 v4.18.3/go.mod h1:99BKpIi6ruaaXRM1A77eqZ+FWPQ3cfRa+ZVy5bmWMaY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -42,6 +50,10 @@ github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs= github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= @@ -54,17 +66,35 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= +github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= +github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= +github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= +github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= @@ -73,8 +103,10 @@ go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2 go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= @@ -84,6 +116,8 @@ golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 5617255d034417f417a574f6ab27f2a6ba486531 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 14:37:55 -0700 Subject: [PATCH 05/13] nit --- dbos/config.go | 4 ++-- dbos/dbos.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbos/config.go b/dbos/config.go index 2e7b5a2e..3d99debc 100644 --- a/dbos/config.go +++ b/dbos/config.go @@ -32,11 +32,11 @@ func LoadConfig() (*configFile, error) { return &config, nil } -// ProcessConfig merges configuration from three sources in order of precedence: +// NewConfig merges configuration from three sources in order of precedence: // 1. programmatic configuration (highest precedence) // 2. configuration file // 3. environment variables (lowest precedence) -func ProcessConfig(programmaticConfig config) *config { +func NewConfig(programmaticConfig config) *config { fileConfig, err := LoadConfig() if err != nil && !os.IsNotExist(err) { panic(err) diff --git a/dbos/dbos.go b/dbos/dbos.go index 3ab83dac..0ddaaa30 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -122,12 +122,14 @@ func Launch(options ...LaunchOption) error { return NewInitializationError("DBOS already initialized") } + // Load & process the configuration config := &config{ logger: slog.New(slog.NewTextHandler(os.Stderr, nil)), } for _, option := range options { option(config) } + config = NewConfig(*config) logger = config.logger @@ -146,8 +148,6 @@ func Launch(options ...LaunchOption) error { APP_ID = os.Getenv("DBOS__APPID") - config = ProcessConfig(*config) - // Create the system database systemDB, err := NewSystemDatabase(config.databaseURL) if err != nil { From ed712e05873923b56865cabf4780acc71669735f Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 14:40:48 -0700 Subject: [PATCH 06/13] nits --- dbos/config.go | 4 ++-- dbos/dbos.go | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/dbos/config.go b/dbos/config.go index 3d99debc..d9293204 100644 --- a/dbos/config.go +++ b/dbos/config.go @@ -15,7 +15,7 @@ type configFile struct { DatabaseURL string `yaml:"database_url"` } -func LoadConfig() (*configFile, error) { +func LoadConfigFile() (*configFile, error) { v := viper.New() v.SetConfigFile(dbosConfigFileName) v.AutomaticEnv() @@ -37,7 +37,7 @@ func LoadConfig() (*configFile, error) { // 2. configuration file // 3. environment variables (lowest precedence) func NewConfig(programmaticConfig config) *config { - fileConfig, err := LoadConfig() + fileConfig, err := LoadConfigFile() if err != nil && !os.IsNotExist(err) { panic(err) } diff --git a/dbos/dbos.go b/dbos/dbos.go index 0ddaaa30..545d6e7e 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -78,12 +78,7 @@ func getExecutor() *executor { var logger *slog.Logger func getLogger() *slog.Logger { - if dbos == nil { - fmt.Println("warning: DBOS instance not initialized, using default logger") - return slog.New(slog.NewTextHandler(os.Stderr, nil)) - } - if logger == nil { - fmt.Println("warning: DBOS logger is nil, using default logger") + if dbos == nil || logger == nil { return slog.New(slog.NewTextHandler(os.Stderr, nil)) } return logger From 92e2202055dad32ed59ee7fde0ba48066d37cb89 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 16:29:32 -0700 Subject: [PATCH 07/13] no config file for now --- dbos/config.go | 80 -------------------------------------------------- dbos/dbos.go | 33 +++++++++++++++++++++ 2 files changed, 33 insertions(+), 80 deletions(-) delete mode 100644 dbos/config.go diff --git a/dbos/config.go b/dbos/config.go deleted file mode 100644 index d9293204..00000000 --- a/dbos/config.go +++ /dev/null @@ -1,80 +0,0 @@ -package dbos - -import ( - "fmt" - "net/url" - "os" - - "github.com/spf13/viper" -) - -const dbosConfigFileName = "dbos_config.yaml" - -type configFile struct { - Name string `yaml:"name"` - DatabaseURL string `yaml:"database_url"` -} - -func LoadConfigFile() (*configFile, error) { - v := viper.New() - v.SetConfigFile(dbosConfigFileName) - v.AutomaticEnv() - - if err := v.ReadInConfig(); err != nil { - return nil, err - } - - var config configFile - if err := v.Unmarshal(&config); err != nil { - return nil, fmt.Errorf("failed to parse config: %w", err) - } - - return &config, nil -} - -// NewConfig merges configuration from three sources in order of precedence: -// 1. programmatic configuration (highest precedence) -// 2. configuration file -// 3. environment variables (lowest precedence) -func NewConfig(programmaticConfig config) *config { - fileConfig, err := LoadConfigFile() - if err != nil && !os.IsNotExist(err) { - panic(err) - } - - dbosConfig := &config{} - - // Start with environment variables (lowest precedence) - if dbURL := os.Getenv("DBOS_DATABASE_URL"); dbURL != "" { - dbosConfig.databaseURL = dbURL - } - - // Override with file configuration if available - if fileConfig != nil { - if fileConfig.DatabaseURL != "" { - dbosConfig.databaseURL = fileConfig.DatabaseURL - } - if fileConfig.Name != "" { - dbosConfig.appName = fileConfig.Name - } - } - - // Override with programmatic configuration (highest precedence) - if len(programmaticConfig.databaseURL) > 0 { - dbosConfig.databaseURL = programmaticConfig.databaseURL - } - if len(programmaticConfig.appName) > 0 { - dbosConfig.appName = programmaticConfig.appName - } - // Copy over parameters that can only be set programmatically - dbosConfig.logger = programmaticConfig.logger - dbosConfig.adminServer = programmaticConfig.adminServer - - // Load defaults - if len(dbosConfig.databaseURL) == 0 { - fmt.Println("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") - password := url.QueryEscape(os.Getenv("PGPASSWORD")) - dbosConfig.databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) - } - return dbosConfig -} diff --git a/dbos/dbos.go b/dbos/dbos.go index 545d6e7e..fa4b8f50 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "log/slog" + "net/url" "os" "reflect" "runtime" @@ -91,6 +92,38 @@ type config struct { appName string } +// NewConfig merges configuration from two sources in order of precedence: +// 1. programmatic configuration +// 2. environment variables +// Finally, it applies default values if needed. +func NewConfig(programmaticConfig config) *config { + dbosConfig := &config{} + + // Start with environment variables (lowest precedence) + if dbURL := os.Getenv("DBOS_DATABASE_URL"); dbURL != "" { + dbosConfig.databaseURL = dbURL + } + + // Override with programmatic configuration (highest precedence) + if len(programmaticConfig.databaseURL) > 0 { + dbosConfig.databaseURL = programmaticConfig.databaseURL + } + if len(programmaticConfig.appName) > 0 { + dbosConfig.appName = programmaticConfig.appName + } + // Copy over parameters that can only be set programmatically + dbosConfig.logger = programmaticConfig.logger + dbosConfig.adminServer = programmaticConfig.adminServer + + // Load defaults + if len(dbosConfig.databaseURL) == 0 { + getLogger().Info("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") + password := url.QueryEscape(os.Getenv("PGPASSWORD")) + dbosConfig.databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) + } + return dbosConfig +} + type LaunchOption func(*config) func WithLogger(logger *slog.Logger) LaunchOption { From 94239e2156707ccb6294eabf8bb505d224f8ba95 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 16:30:46 -0700 Subject: [PATCH 08/13] update deps --- go.mod | 14 +------------- go.sum | 34 ---------------------------------- 2 files changed, 1 insertion(+), 47 deletions(-) diff --git a/go.mod b/go.mod index 298bb361..2fb3ba40 100644 --- a/go.mod +++ b/go.mod @@ -8,29 +8,17 @@ require ( github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.7.5 github.com/robfig/cron/v3 v3.0.1 - github.com/spf13/viper v1.20.1 ) require ( - github.com/fsnotify/fsnotify v1.8.0 // indirect - github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/pelletier/go-toml/v2 v2.2.3 // indirect - github.com/sagikazarmark/locafero v0.7.0 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect - github.com/spf13/afero v1.12.0 // indirect - github.com/spf13/cast v1.7.1 // indirect - github.com/spf13/pflag v1.0.6 // indirect - github.com/subosito/gotenv v1.6.0 // indirect + github.com/stretchr/testify v1.10.0 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/sync v0.15.0 // indirect - golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.26.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0cc5ebe1..ffe888f9 100644 --- a/go.sum +++ b/go.sum @@ -17,22 +17,14 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= -github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= -github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= -github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= -github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-migrate/migrate/v4 v4.18.3 h1:EYGkoOsvgHHfm5U/naS1RP/6PL/Xv3S4B/swMiAmDLs= github.com/golang-migrate/migrate/v4 v4.18.3/go.mod h1:99BKpIi6ruaaXRM1A77eqZ+FWPQ3cfRa+ZVy5bmWMaY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -50,10 +42,6 @@ github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs= github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= @@ -66,35 +54,17 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= -github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= -github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= -github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= -github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= -github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= -github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= -github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= -github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= -github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= -github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= -github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= -github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= @@ -105,8 +75,6 @@ go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt3 go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= @@ -116,8 +84,6 @@ golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 4a950e9104c6eacbe48197ad5c026b922cecb12a Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 16:34:14 -0700 Subject: [PATCH 09/13] DBOS_SYSTEM_DATABASE_URL --- README.md | 2 +- dbos/admin-server_test.go | 4 ++-- dbos/dbos.go | 4 ++-- dbos/utils_test.go | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 33e9b40e..18e44ec5 100644 --- a/README.md +++ b/README.md @@ -173,4 +173,4 @@ Install the DBOS Transact package in your program: github.com/dbos-inc/dbos-transact-go ``` -You can store and export a Postgres connection string in the `DBOS_DATABASE_URL` environment variable for DBOS to manage your workflows state. By default, DBOS will use `postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable`. \ No newline at end of file +You can store and export a Postgres connection string in the `DBOS_SYSTEM_DATABASE_URL` environment variable for DBOS to manage your workflows state. By default, DBOS will use `postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable`. \ No newline at end of file diff --git a/dbos/admin-server_test.go b/dbos/admin-server_test.go index 07235af3..44deb4f5 100644 --- a/dbos/admin-server_test.go +++ b/dbos/admin-server_test.go @@ -13,9 +13,9 @@ import ( func TestAdminServer(t *testing.T) { // Skip if database is not available - databaseURL := os.Getenv("DBOS_DATABASE_URL") + databaseURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL") if databaseURL == "" && os.Getenv("PGPASSWORD") == "" { - t.Skip("Database not available (DBOS_DATABASE_URL and PGPASSWORD not set), skipping DBOS integration tests") + t.Skip("Database not available (DBOS_SYSTEM_DATABASE_URL and PGPASSWORD not set), skipping DBOS integration tests") } t.Run("Admin server is not started without WithAdminServer option", func(t *testing.T) { diff --git a/dbos/dbos.go b/dbos/dbos.go index fa4b8f50..1fd2c2a1 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -100,7 +100,7 @@ func NewConfig(programmaticConfig config) *config { dbosConfig := &config{} // Start with environment variables (lowest precedence) - if dbURL := os.Getenv("DBOS_DATABASE_URL"); dbURL != "" { + if dbURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL"); dbURL != "" { dbosConfig.databaseURL = dbURL } @@ -117,7 +117,7 @@ func NewConfig(programmaticConfig config) *config { // Load defaults if len(dbosConfig.databaseURL) == 0 { - getLogger().Info("DBOS_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") + getLogger().Info("DBOS_SYSTEM_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") password := url.QueryEscape(os.Getenv("PGPASSWORD")) dbosConfig.databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) } diff --git a/dbos/utils_test.go b/dbos/utils_test.go index b944a350..3ab64d8b 100644 --- a/dbos/utils_test.go +++ b/dbos/utils_test.go @@ -16,7 +16,7 @@ import ( func setupDBOS(t *testing.T) { t.Helper() - databaseURL := os.Getenv("DBOS_DATABASE_URL") + databaseURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL") if databaseURL == "" { password := url.QueryEscape(os.Getenv("PGPASSWORD")) databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) @@ -30,7 +30,7 @@ func setupDBOS(t *testing.T) { dbName := parsedURL.Database if dbName == "" { - t.Skip("DBOS_DATABASE_URL does not specify a database name, skipping integration test") + t.Skip("DBOS_SYSTEM_DATABASE_URL does not specify a database name, skipping integration test") } postgresURL := parsedURL.Copy() From 25ff583b10740e41a63126c46ed359dc64d9e5e8 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 16:58:21 -0700 Subject: [PATCH 10/13] log --- dbos/dbos.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 1fd2c2a1..a09d9a79 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -117,7 +117,7 @@ func NewConfig(programmaticConfig config) *config { // Load defaults if len(dbosConfig.databaseURL) == 0 { - getLogger().Info("DBOS_SYSTEM_DATABASE_URL not set, using default: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") + getLogger().Info("Using default database URL: postgres://postgres:${PGPASSWORD}@localhost:5432/dbos?sslmode=disable") password := url.QueryEscape(os.Getenv("PGPASSWORD")) dbosConfig.databaseURL = fmt.Sprintf("postgres://postgres:%s@localhost:5432/dbos?sslmode=disable", password) } From 19cceaf8acb510cc438139e7b315c60eb77f140f Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 17:11:09 -0700 Subject: [PATCH 11/13] prevent race condition in the test --- dbos/queue.go | 7 ++----- dbos/queues_test.go | 12 ++++++++++++ dbos/system_database.go | 1 + dbos/utils_test.go | 27 +++++++++++++++++++++++++++ 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/dbos/queue.go b/dbos/queue.go index 23917ee6..fd87d9be 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -111,9 +111,6 @@ func queueRunner(ctx context.Context) { pollingInterval := baseInterval - // XXX doing this lets the dequeue and the task invokation survive the context cancellation - // We might be OK with not doing this. During the tests it results in all sorts of error inside the two functions above due to context cancellation - runnerContext := context.WithoutCancel(ctx) for { hasBackoffError := false @@ -121,7 +118,7 @@ func queueRunner(ctx context.Context) { for queueName, queue := range workflowQueueRegistry { getLogger().Debug("Processing queue", "queue_name", queueName) // Call DequeueWorkflows for each queue - dequeuedWorkflows, err := getExecutor().systemDB.DequeueWorkflows(runnerContext, queue) + dequeuedWorkflows, err := getExecutor().systemDB.DequeueWorkflows(ctx, queue) if err != nil { if pgErr, ok := err.(*pgconn.PgError); ok { switch pgErr.Code { @@ -164,7 +161,7 @@ func queueRunner(ctx context.Context) { } } - _, err := registeredWorkflow.wrappedFunction(runnerContext, input, WithWorkflowID(workflow.id)) + _, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id)) if err != nil { getLogger().Error("Error recovering workflow", "error", err) } diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 64b68d38..6d683302 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -504,8 +504,12 @@ func TestWorkerConcurrency(t *testing.T) { t.Fatalf("expected 3 workflows to be enqueued, got %d", len(workflows)) } + // Stop the queue runner before changing executor ID to avoid race conditions + stopQueueRunner() // Change the EXECUTOR_ID global variable to a different value EXECUTOR_ID = "worker-2" + // Restart the queue runner + restartQueueRunner() // Wait for the second workflow to start on the second worker startEvents[1].Wait() @@ -533,8 +537,12 @@ func TestWorkerConcurrency(t *testing.T) { if result1 != 0 { t.Fatalf("expected result from blocking workflow 1 to be 0, got %v", result1) } + // Stop the queue runner before changing executor ID to avoid race conditions + stopQueueRunner() // Change the executor again and wait for the third workflow to start EXECUTOR_ID = "local" + // Restart the queue runner + restartQueueRunner() startEvents[2].Wait() // Ensure the fourth workflow is not started yet if startEvents[3].IsSet { @@ -561,8 +569,12 @@ func TestWorkerConcurrency(t *testing.T) { if result2 != 1 { t.Fatalf("expected result from blocking workflow 2 to be 1, got %v", result2) } + // Stop the queue runner before changing executor ID to avoid race conditions + stopQueueRunner() // change executor again and wait for the fourth workflow to start EXECUTOR_ID = "worker-2" + // Restart the queue runner + restartQueueRunner() startEvents[3].Wait() // Check no workflow is enqueued workflows, err = getExecutor().systemDB.ListWorkflows(context.Background(), ListWorkflowsDBInput{ diff --git a/dbos/system_database.go b/dbos/system_database.go index 05d05f08..2536b949 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -87,6 +87,7 @@ func createDatabaseIfNotExists(databaseURL string) error { if err != nil { return NewInitializationError(fmt.Sprintf("failed to create database %s: %v", dbName, err)) } + getLogger().Info("Database created", "name", dbName) } return nil diff --git a/dbos/utils_test.go b/dbos/utils_test.go index 3ab64d8b..e600bea5 100644 --- a/dbos/utils_test.go +++ b/dbos/utils_test.go @@ -97,6 +97,33 @@ func (e *Event) Clear() { } /* Helpers */ + +// stopQueueRunner stops the queue runner for testing purposes +func stopQueueRunner() { + if dbos != nil && dbos.queueRunnerCancelFunc != nil { + dbos.queueRunnerCancelFunc() + // Wait for queue runner to finish + <-dbos.queueRunnerDone + } +} + +// restartQueueRunner restarts the queue runner for testing purposes +func restartQueueRunner() { + if dbos != nil { + // Create new context and cancel function + ctx, cancel := context.WithCancel(context.Background()) + dbos.queueRunnerCtx = ctx + dbos.queueRunnerCancelFunc = cancel + dbos.queueRunnerDone = make(chan struct{}) + + // Start the queue runner in a goroutine + go func() { + defer close(dbos.queueRunnerDone) + queueRunner(ctx) + }() + } +} + func equal(a, b []int) bool { if len(a) != len(b) { return false From 7b07b5aa40c7bed6282f4f5c42e3e8770cb86b35 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 18:01:34 -0700 Subject: [PATCH 12/13] fix global concurrency --- dbos/queue.go | 16 ++++++++++------ dbos/queues_test.go | 1 + dbos/system_database.go | 8 ++++---- dbos/utils_test.go | 2 +- dbos/workflow.go | 2 +- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/dbos/queue.go b/dbos/queue.go index fd87d9be..b12bba2d 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -14,9 +14,13 @@ import ( ) var ( - workflowQueueRegistry = make(map[string]WorkflowQueue) - DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue" - _ = NewWorkflowQueue(DBOS_INTERNAL_QUEUE_NAME) + workflowQueueRegistry = make(map[string]WorkflowQueue) + _ = NewWorkflowQueue(_DBOS_INTERNAL_QUEUE_NAME) +) + +const ( + _DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue" + _DEFAULT_MAX_TASKS_PER_ITERATION = 100 ) // RateLimiter represents a rate limiting configuration @@ -31,7 +35,7 @@ type WorkflowQueue struct { GlobalConcurrency *int PriorityEnabled bool Limiter *RateLimiter - MaxTasksPerIteration uint + MaxTasksPerIteration int } // QueueOption is a functional option for configuring a workflow queue @@ -61,7 +65,7 @@ func WithRateLimiter(limiter *RateLimiter) QueueOption { } } -func WithMaxTasksPerIteration(maxTasks uint) QueueOption { +func WithMaxTasksPerIteration(maxTasks int) QueueOption { return func(q *WorkflowQueue) { q.MaxTasksPerIteration = maxTasks } @@ -84,7 +88,7 @@ func NewWorkflowQueue(name string, options ...QueueOption) WorkflowQueue { GlobalConcurrency: nil, PriorityEnabled: false, Limiter: nil, - MaxTasksPerIteration: 100, // Default max tasks per iteration + MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION, } // Apply functional options diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 6d683302..310b9828 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -405,6 +405,7 @@ func TestGlobalConcurrency(t *testing.T) { // Wait for the first workflow to start workflowEvent1.Wait() + time.Sleep(2 * time.Second) // Wait for a few seconds to let the queue runner loop // Ensure the second workflow has not started yet if workflowEvent2.IsSet { diff --git a/dbos/system_database.go b/dbos/system_database.go index 2536b949..99489c7a 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1321,7 +1321,7 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue getLogger().Warn("Local pending workflows on queue exceeds worker concurrency limit", "local_pending", localPendingWorkflows, "queue_name", queue.Name, "concurrency_limit", workerConcurrency) } availableWorkerTasks := max(workerConcurrency-localPendingWorkflows, 0) - maxTasks = uint(availableWorkerTasks) + maxTasks = availableWorkerTasks } // Check global concurrency limit @@ -1336,8 +1336,8 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue getLogger().Warn("Total pending workflows on queue exceeds global concurrency limit", "total_pending", globalPendingWorkflows, "queue_name", queue.Name, "concurrency_limit", concurrency) } availableTasks := max(concurrency-globalPendingWorkflows, 0) - if uint(availableTasks) < maxTasks { - maxTasks = uint(availableTasks) + if availableTasks < maxTasks { + maxTasks = availableTasks } } } @@ -1375,7 +1375,7 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue } // Add limit if maxTasks is finite - if maxTasks > 0 { + if maxTasks >= 0 { query += fmt.Sprintf(" LIMIT %d", int(maxTasks)) } diff --git a/dbos/utils_test.go b/dbos/utils_test.go index e600bea5..45c2cd7c 100644 --- a/dbos/utils_test.go +++ b/dbos/utils_test.go @@ -153,7 +153,7 @@ func queueEntriesAreCleanedUp() bool { AND status IN ('ENQUEUED', 'PENDING')` var count int - err = tx.QueryRow(context.Background(), query, DBOS_INTERNAL_QUEUE_NAME).Scan(&count) + err = tx.QueryRow(context.Background(), query, _DBOS_INTERNAL_QUEUE_NAME).Scan(&count) tx.Rollback(context.Background()) // Clean up transaction if err != nil { diff --git a/dbos/workflow.go b/dbos/workflow.go index 6fa3bfad..9afc6d14 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -301,7 +301,7 @@ func WithWorkflow[P any, R any](fn WorkflowFunc[P, R], opts ...WorkflowRegistrat scheduledTime = entry.Next } wfID := fmt.Sprintf("sched-%s-%s", fqn, scheduledTime) // XXX we can rethink the format - wrappedFunction(context.Background(), any(scheduledTime).(P), WithWorkflowID(wfID), WithQueue(DBOS_INTERNAL_QUEUE_NAME)) + wrappedFunction(context.Background(), any(scheduledTime).(P), WithWorkflowID(wfID), WithQueue(_DBOS_INTERNAL_QUEUE_NAME)) }) if err != nil { panic(fmt.Sprintf("failed to register scheduled workflow: %v", err)) From 425bc07d7ef0f84111e715b0efd120a21d51a1f1 Mon Sep 17 00:00:00 2001 From: maxdml Date: Mon, 21 Jul 2025 18:26:11 -0700 Subject: [PATCH 13/13] comment --- dbos/system_database.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index 99489c7a..a4d68b2c 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -1374,7 +1374,6 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue %s`, lockClause) } - // Add limit if maxTasks is finite if maxTasks >= 0 { query += fmt.Sprintf(" LIMIT %d", int(maxTasks)) }