Skip to content

Commit d631774

Browse files
committed
garbage collection, global timeout, set max/min conns and timeouts on pool, fix a small race in a test, remove unused code
1 parent 5fde81e commit d631774

File tree

3 files changed

+572
-42
lines changed

3 files changed

+572
-42
lines changed

dbos/admin_server.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,21 +278,23 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
278278
ctx.logger.Debug("Registering admin server endpoint", "pattern", _GLOBAL_TIMEOUT_PATTERN)
279279
mux.HandleFunc(_GLOBAL_TIMEOUT_PATTERN, func(w http.ResponseWriter, r *http.Request) {
280280
var inputs struct {
281-
CutoffEpochTimestampMs *int64 `json:"cutoff_epoch_timestamp_ms"`
281+
CutoffEpochTimestampMs int64 `json:"cutoff_epoch_timestamp_ms"`
282282
}
283283

284284
if err := json.NewDecoder(r.Body).Decode(&inputs); err != nil {
285285
http.Error(w, "Invalid JSON body", http.StatusBadRequest)
286286
return
287287
}
288288

289-
// TODO: Implement global timeout
290-
// err := globalTimeout(ctx, inputs.CutoffEpochTimestampMs)
291-
// if err != nil {
292-
// ctx.logger.Error("Global timeout failed", "error", err)
293-
// http.Error(w, fmt.Sprintf("Global timeout failed: %v", err), http.StatusInternalServerError)
294-
// return
295-
// }
289+
cutoffTime := time.UnixMilli(inputs.CutoffEpochTimestampMs)
290+
ctx.logger.Info("Global timeout request", "cutoff_time", cutoffTime)
291+
292+
err := ctx.systemDB.cancelAllBefore(ctx, cutoffTime)
293+
if err != nil {
294+
ctx.logger.Error("Global timeout failed", "error", err)
295+
http.Error(w, fmt.Sprintf("Global timeout failed: %v", err), http.StatusInternalServerError)
296+
return
297+
}
296298

297299
w.WriteHeader(http.StatusNoContent)
298300
})

dbos/system_database.go

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type systemDatabase interface {
3535
updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error
3636
awaitWorkflowResult(ctx context.Context, workflowID string) (any, error)
3737
cancelWorkflow(ctx context.Context, workflowID string) error
38+
cancelAllBefore(ctx context.Context, cutoffTime time.Time) error
3839
resumeWorkflow(ctx context.Context, workflowID string) error
3940
forkWorkflow(ctx context.Context, input forkWorkflowDBInput) error
4041

@@ -60,6 +61,9 @@ type systemDatabase interface {
6061
// Queues
6162
dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error)
6263
clearQueueAssignment(ctx context.Context, workflowID string) (bool, error)
64+
65+
// Garbage collection
66+
garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error
6367
}
6468

6569
type sysDB struct {
@@ -186,8 +190,22 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
186190
return nil, fmt.Errorf("failed to run migrations: %v", err)
187191
}
188192

189-
// Create pgx pool
190-
pool, err := pgxpool.New(ctx, databaseURL)
193+
// Parse the connection string to get a config
194+
config, err := pgxpool.ParseConfig(databaseURL)
195+
if err != nil {
196+
return nil, fmt.Errorf("failed to parse database URL: %v", err)
197+
}
198+
// Set pool configuration
199+
config.MaxConns = 20
200+
config.MinConns = 0
201+
config.MaxConnLifetime = time.Hour
202+
config.MaxConnIdleTime = time.Minute * 5
203+
204+
// Add acquire timeout to prevent indefinite blocking
205+
config.ConnConfig.ConnectTimeout = 10 * time.Second
206+
207+
// Create pool with configuration
208+
pool, err := pgxpool.NewWithConfig(ctx, config)
191209
if err != nil {
192210
return nil, fmt.Errorf("failed to create connection pool: %v", err)
193211
}
@@ -202,11 +220,11 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
202220
notificationsMap := &sync.Map{}
203221

204222
// Create a connection to listen on notifications
205-
config, err := pgconn.ParseConfig(databaseURL)
223+
notifierConnConfig, err := pgconn.ParseConfig(databaseURL)
206224
if err != nil {
207225
return nil, fmt.Errorf("failed to parse database URL: %v", err)
208226
}
209-
config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
227+
notifierConnConfig.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
210228
if n.Channel == _DBOS_NOTIFICATIONS_CHANNEL || n.Channel == _DBOS_WORKFLOW_EVENTS_CHANNEL {
211229
// Check if an entry exists in the map, indexed by the payload
212230
// If yes, broadcast on the condition variable so listeners can wake up
@@ -215,7 +233,7 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
215233
}
216234
}
217235
}
218-
notificationListenerConnection, err := pgconn.ConnectConfig(ctx, config)
236+
notificationListenerConnection, err := pgconn.ConnectConfig(ctx, notifierConnConfig)
219237
if err != nil {
220238
return nil, fmt.Errorf("failed to connect notification listener to database: %v", err)
221239
}
@@ -756,6 +774,87 @@ func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error {
756774
return nil
757775
}
758776

777+
func (s *sysDB) cancelAllBefore(ctx context.Context, cutoffTime time.Time) error {
778+
// List all workflows in PENDING or ENQUEUED state ending at cutoffTime
779+
listInput := listWorkflowsDBInput{
780+
endTime: cutoffTime,
781+
status: []WorkflowStatusType{WorkflowStatusPending, WorkflowStatusEnqueued},
782+
}
783+
784+
workflows, err := s.listWorkflows(ctx, listInput)
785+
if err != nil {
786+
return fmt.Errorf("failed to list workflows for cancellation: %w", err)
787+
}
788+
789+
// Cancel each workflow
790+
for _, workflow := range workflows {
791+
if err := s.cancelWorkflow(ctx, workflow.ID); err != nil {
792+
s.logger.Error("Failed to cancel workflow during cancelAllBefore", "workflowID", workflow.ID, "error", err)
793+
// Continue with other workflows even if one fails
794+
// If desired we could funnel the errors back the caller (conductor, admin server)
795+
}
796+
}
797+
798+
return nil
799+
}
800+
801+
func (s *sysDB) garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error {
802+
// Validate input parameters
803+
if input.rowsThreshold != nil && *input.rowsThreshold <= 0 {
804+
return fmt.Errorf("rowsThreshold must be greater than 0, got %d", *input.rowsThreshold)
805+
}
806+
807+
cutoffTimestamp := input.cutoffEpochTimestampMs
808+
809+
// If rowsThreshold is provided, get the timestamp of the Nth newest workflow
810+
if input.rowsThreshold != nil {
811+
query := `SELECT created_at
812+
FROM dbos.workflow_status
813+
ORDER BY created_at DESC
814+
LIMIT 1 OFFSET $1`
815+
816+
var rowsBasedCutoff int64
817+
err := s.pool.QueryRow(ctx, query, *input.rowsThreshold-1).Scan(&rowsBasedCutoff)
818+
if err != nil {
819+
if err == pgx.ErrNoRows {
820+
// Not enough rows to apply threshold, no garbage collection needed
821+
return nil
822+
}
823+
return fmt.Errorf("failed to query cutoff timestamp by rows threshold: %w", err)
824+
}
825+
826+
// Use the more restrictive cutoff (higher timestamp = more recent = less deletion)
827+
if cutoffTimestamp == nil || rowsBasedCutoff > *cutoffTimestamp {
828+
cutoffTimestamp = &rowsBasedCutoff
829+
}
830+
}
831+
832+
// If no cutoff is determined, no garbage collection is needed
833+
if cutoffTimestamp == nil {
834+
return nil
835+
}
836+
837+
// Delete all workflows older than cutoff that are NOT PENDING or ENQUEUED
838+
query := `DELETE FROM dbos.workflow_status
839+
WHERE created_at < $1
840+
AND status NOT IN ($2, $3)`
841+
842+
commandTag, err := s.pool.Exec(ctx, query,
843+
*cutoffTimestamp,
844+
WorkflowStatusPending,
845+
WorkflowStatusEnqueued)
846+
847+
if err != nil {
848+
return fmt.Errorf("failed to garbage collect workflows: %w", err)
849+
}
850+
851+
s.logger.Info("Garbage collected workflows",
852+
"cutoff_timestamp", *cutoffTimestamp,
853+
"deleted_count", commandTag.RowsAffected())
854+
855+
return nil
856+
}
857+
759858
func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
760859
tx, err := s.pool.Begin(ctx)
761860
if err != nil {
@@ -1872,6 +1971,11 @@ type dequeueWorkflowsInput struct {
18721971
applicationVersion string
18731972
}
18741973

1974+
// garbageCollectWorkflowsInput carries the optional pruning criteria for
// garbageCollectWorkflows. Nil fields are ignored; if neither is set the
// garbage collector is a no-op.
type garbageCollectWorkflowsInput struct {
	cutoffEpochTimestampMs *int64 // delete workflows created before this epoch-millisecond timestamp (nil: no time cutoff)
	rowsThreshold          *int   // keep at most this many newest workflows (nil: no row cap); must be > 0 when set
}
1978+
18751979
func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error) {
18761980
// Begin transaction with snapshot isolation
18771981
tx, err := s.pool.Begin(ctx)

0 commit comments

Comments
 (0)