Skip to content

Commit 46cb68a

Browse files
committed
constants
1 parent a8ed0ca commit 46cb68a

File tree

4 files changed

+57
-33
lines changed

4 files changed

+57
-33
lines changed

dbos/admin_server.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,25 @@ import (
1010
)
1111

1212
const (
13-
healthCheckPath = "/dbos-healthz"
14-
workflowRecoveryPath = "/dbos-workflow-recovery"
15-
workflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
13+
_HEALTHCHECK_PATH = "/dbos-healthz"
14+
_WORKFLOW_RECOVERY_PATH = "/dbos-workflow-recovery"
15+
_WORKFLOW_QUEUES_METADATA_PATH = "/dbos-workflow-queues-metadata"
16+
17+
_ADMIN_SERVER_READ_HEADER_TIMEOUT = 5 * time.Second
18+
_ADMIN_SERVER_SHUTDOWN_TIMEOUT = 10 * time.Second
1619
)
1720

1821
type adminServer struct {
1922
server *http.Server
2023
logger *slog.Logger
24+
port int
2125
}
2226

2327
func newAdminServer(ctx *dbosContext, port int) *adminServer {
2428
mux := http.NewServeMux()
2529

2630
// Health endpoint
27-
mux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
31+
mux.HandleFunc(_HEALTHCHECK_PATH, func(w http.ResponseWriter, r *http.Request) {
2832
w.Header().Set("Content-Type", "application/json")
2933
w.WriteHeader(http.StatusOK)
3034
_, err := w.Write([]byte(`{"status":"healthy"}`))
@@ -36,7 +40,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
3640
})
3741

3842
// Recovery endpoint
39-
mux.HandleFunc(workflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
43+
mux.HandleFunc(_WORKFLOW_RECOVERY_PATH, func(w http.ResponseWriter, r *http.Request) {
4044
if r.Method != http.MethodPost {
4145
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
4246
return
@@ -72,7 +76,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
7276
})
7377

7478
// Queue metadata endpoint
75-
mux.HandleFunc(workflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
79+
mux.HandleFunc(_WORKFLOW_QUEUES_METADATA_PATH, func(w http.ResponseWriter, r *http.Request) {
7680
if r.Method != http.MethodGet {
7781
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
7882
return
@@ -91,17 +95,18 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
9195
server := &http.Server{
9296
Addr: fmt.Sprintf(":%d", port),
9397
Handler: mux,
94-
ReadHeaderTimeout: 5 * time.Second,
98+
ReadHeaderTimeout: _ADMIN_SERVER_READ_HEADER_TIMEOUT,
9599
}
96100

97101
return &adminServer{
98102
server: server,
99103
logger: ctx.logger,
104+
port: port,
100105
}
101106
}
102107

103108
func (as *adminServer) Start() error {
104-
as.logger.Info("Starting admin server", "port", 3001)
109+
as.logger.Info("Starting admin server", "port", as.port)
105110

106111
go func() {
107112
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
@@ -116,7 +121,7 @@ func (as *adminServer) Shutdown(ctx context.Context) error {
116121
as.logger.Info("Shutting down admin server")
117122

118123
// XXX consider moving the grace period to DBOSContext.Shutdown()
119-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
124+
ctx, cancel := context.WithTimeout(ctx, _ADMIN_SERVER_SHUTDOWN_TIMEOUT)
120125
defer cancel()
121126

122127
if err := as.server.Shutdown(ctx); err != nil {

dbos/admin_server_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestAdminServer(t *testing.T) {
3838

3939
// Verify admin server is not running
4040
client := &http.Client{Timeout: 1 * time.Second}
41-
_, err = client.Get("http://localhost:3001" + healthCheckPath)
41+
_, err = client.Get("http://localhost:3001" + _HEALTHCHECK_PATH)
4242
require.Error(t, err, "Expected request to fail when admin server is not started")
4343

4444
// Verify the DBOS executor doesn't have an admin server instance
@@ -89,13 +89,13 @@ func TestAdminServer(t *testing.T) {
8989
{
9090
name: "Health endpoint responds correctly",
9191
method: "GET",
92-
endpoint: "http://localhost:3001" + healthCheckPath,
92+
endpoint: "http://localhost:3001" + _HEALTHCHECK_PATH,
9393
expectedStatus: http.StatusOK,
9494
},
9595
{
9696
name: "Recovery endpoint responds correctly with valid JSON",
9797
method: "POST",
98-
endpoint: "http://localhost:3001" + workflowRecoveryPath,
98+
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
9999
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
100100
contentType: "application/json",
101101
expectedStatus: http.StatusOK,
@@ -109,21 +109,21 @@ func TestAdminServer(t *testing.T) {
109109
{
110110
name: "Recovery endpoint rejects invalid methods",
111111
method: "GET",
112-
endpoint: "http://localhost:3001" + workflowRecoveryPath,
112+
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
113113
expectedStatus: http.StatusMethodNotAllowed,
114114
},
115115
{
116116
name: "Recovery endpoint rejects invalid JSON",
117117
method: "POST",
118-
endpoint: "http://localhost:3001" + workflowRecoveryPath,
118+
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
119119
body: strings.NewReader(`{"invalid": json}`),
120120
contentType: "application/json",
121121
expectedStatus: http.StatusBadRequest,
122122
},
123123
{
124124
name: "Queue metadata endpoint responds correctly",
125125
method: "GET",
126-
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
126+
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
127127
expectedStatus: http.StatusOK,
128128
validateResp: func(t *testing.T, resp *http.Response) {
129129
var queueMetadata []WorkflowQueue
@@ -149,7 +149,7 @@ func TestAdminServer(t *testing.T) {
149149
{
150150
name: "Queue metadata endpoint rejects invalid methods",
151151
method: "POST",
152-
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
152+
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
153153
expectedStatus: http.StatusMethodNotAllowed,
154154
},
155155
}

dbos/system_database.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,21 @@ func createDatabaseIfNotExists(ctx context.Context, databaseURL string, logger *
119119
//go:embed migrations/*.sql
120120
var migrationFiles embed.FS
121121

122-
const _DBOS_MIGRATION_TABLE = "dbos_schema_migrations"
122+
const (
123+
_DBOS_MIGRATION_TABLE = "dbos_schema_migrations"
124+
125+
// PostgreSQL error codes
126+
_PG_ERROR_UNIQUE_VIOLATION = "23505"
127+
_PG_ERROR_FOREIGN_KEY_VIOLATION = "23503"
128+
129+
// Notification channels
130+
_DBOS_NOTIFICATIONS_CHANNEL = "dbos_notifications_channel"
131+
_DBOS_WORKFLOW_EVENTS_CHANNEL = "dbos_workflow_events_channel"
132+
133+
// Database retry timeouts
134+
_DB_CONNECTION_RETRY_DELAY = 500 * time.Millisecond
135+
_DB_RETRY_INTERVAL = 1 * time.Second
136+
)
123137

124138
func runMigrations(databaseURL string) error {
125139
// Change the driver to pgx5
@@ -194,7 +208,7 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
194208
return nil, fmt.Errorf("failed to parse database URL: %v", err)
195209
}
196210
config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
197-
if n.Channel == "dbos_notifications_channel" || n.Channel == "dbos_workflow_events_channel" {
211+
if n.Channel == _DBOS_NOTIFICATIONS_CHANNEL || n.Channel == _DBOS_WORKFLOW_EVENTS_CHANNEL {
198212
// Check if an entry exists in the map, indexed by the payload
199213
// If yes, broadcast on the condition variable so listeners can wake up
200214
if cond, exists := notificationsMap.Load(n.Payload); exists {
@@ -245,7 +259,7 @@ func (s *sysDB) shutdown(ctx context.Context) {
245259
// Allow pgx health checks to complete
246260
// https://github.com/jackc/pgx/blob/15bca4a4e14e0049777c1245dba4c16300fe4fd0/pgxpool/pool.go#L417
247261
// These trigger go-leak alerts
248-
time.Sleep(500 * time.Millisecond)
262+
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
249263

250264
s.launched = false
251265
}
@@ -380,7 +394,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
380394
)
381395
if err != nil {
382396
// Handle unique constraint violation for the deduplication ID (this should be the only case for a 23505)
383-
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" {
397+
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION {
384398
return nil, newQueueDeduplicatedError(
385399
input.status.ID,
386400
input.status.QueueName,
@@ -924,7 +938,7 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any
924938
err := row.Scan(&status, &outputString, &errorStr)
925939
if err != nil {
926940
if err == pgx.ErrNoRows {
927-
time.Sleep(1 * time.Second)
941+
time.Sleep(_DB_RETRY_INTERVAL)
928942
continue
929943
}
930944
return nil, fmt.Errorf("failed to query workflow status: %w", err)
@@ -945,7 +959,7 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any
945959
case WorkflowStatusCancelled:
946960
return output, newAwaitedWorkflowCancelledError(workflowID)
947961
default:
948-
time.Sleep(1 * time.Second)
962+
time.Sleep(_DB_RETRY_INTERVAL)
949963
}
950964
}
951965
}
@@ -1003,7 +1017,7 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation
10031017

10041018
if err != nil {
10051019
s.logger.Error("RecordOperationResult Error occurred", "error", err)
1006-
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" {
1020+
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION {
10071021
return newWorkflowConflictIDError(input.workflowID)
10081022
}
10091023
return err
@@ -1054,7 +1068,7 @@ func (s *sysDB) recordChildWorkflow(ctx context.Context, input recordChildWorkfl
10541068

10551069
if err != nil {
10561070
// Check for unique constraint violation (conflict ID error)
1057-
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23505" {
1071+
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION {
10581072
return fmt.Errorf(
10591073
"child workflow %s already registered for parent workflow %s (operation ID: %d)",
10601074
input.childWorkflowID, input.parentWorkflowID, input.stepID)
@@ -1361,7 +1375,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
13611375
}()
13621376

13631377
s.logger.Info("DBOS: Starting notification listener loop")
1364-
mrr := s.notificationListenerConnection.Exec(ctx, "LISTEN dbos_notifications_channel; LISTEN dbos_workflow_events_channel")
1378+
mrr := s.notificationListenerConnection.Exec(ctx, fmt.Sprintf("LISTEN %s; LISTEN %s", _DBOS_NOTIFICATIONS_CHANNEL, _DBOS_WORKFLOW_EVENTS_CHANNEL))
13651379
results, err := mrr.ReadAll()
13661380
if err != nil {
13671381
s.logger.Error("Failed to listen on notification channels", "error", err)
@@ -1399,7 +1413,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
13991413

14001414
// Other errors - log and retry. XXX eventually add exponential backoff + jitter
14011415
s.logger.Error("Error waiting for notification", "error", err)
1402-
time.Sleep(500 * time.Millisecond)
1416+
time.Sleep(_DB_CONNECTION_RETRY_DELAY)
14031417
continue
14041418
}
14051419
}
@@ -1472,7 +1486,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
14721486
_, err = tx.Exec(ctx, insertQuery, input.DestinationID, topic, messageString)
14731487
if err != nil {
14741488
// Check for foreign key violation (destination workflow doesn't exist)
1475-
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23503" {
1489+
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_FOREIGN_KEY_VIOLATION {
14761490
return newNonExistentWorkflowError(input.DestinationID)
14771491
}
14781492
return fmt.Errorf("failed to insert notification: %w", err)

dbos/workflow.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,11 @@ type workflowRegistrationOption func(*workflowRegistrationParams)
305305

306306
const (
307307
_DEFAULT_MAX_RECOVERY_ATTEMPTS = 100
308+
309+
// Step retry defaults
310+
_DEFAULT_STEP_BASE_INTERVAL = 100 * time.Millisecond
311+
_DEFAULT_STEP_MAX_INTERVAL = 5 * time.Second
312+
_DEFAULT_STEP_BACKOFF_FACTOR = 2.0
308313
)
309314

310315
// WithMaxRetries sets the maximum number of retry attempts for workflow recovery.
@@ -825,9 +830,9 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams {
825830
if params == nil {
826831
return &StepParams{
827832
MaxRetries: 0, // Default to no retries
828-
BackoffFactor: 2.0,
829-
BaseInterval: 100 * time.Millisecond, // Default base interval
830-
MaxInterval: 5 * time.Second, // Default max interval
833+
BackoffFactor: _DEFAULT_STEP_BACKOFF_FACTOR,
834+
BaseInterval: _DEFAULT_STEP_BASE_INTERVAL, // Default base interval
835+
MaxInterval: _DEFAULT_STEP_MAX_INTERVAL, // Default max interval
831836
StepName: func() string {
832837
if value, ok := typeErasedStepNameToStepName.Load(stepName); ok {
833838
return value.(string)
@@ -839,13 +844,13 @@ func setStepParamDefaults(params *StepParams, stepName string) *StepParams {
839844

840845
// Set defaults for zero values
841846
if params.BackoffFactor == 0 {
842-
params.BackoffFactor = 2.0 // Default backoff factor
847+
params.BackoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR // Default backoff factor
843848
}
844849
if params.BaseInterval == 0 {
845-
params.BaseInterval = 100 * time.Millisecond // Default base interval
850+
params.BaseInterval = _DEFAULT_STEP_BASE_INTERVAL // Default base interval
846851
}
847852
if params.MaxInterval == 0 {
848-
params.MaxInterval = 5 * time.Second // Default max interval
853+
params.MaxInterval = _DEFAULT_STEP_MAX_INTERVAL // Default max interval
849854
}
850855
if len(params.StepName) == 0 {
851856
// If the step name is not provided, use the function name

0 commit comments

Comments
 (0)