Commit 60907cf

garbage collection, global timeout (#72)
also:
- set max/min conns and timeouts on sysdb pool
- fix a small race in a test, remove unused code
1 parent b61d9c5 commit 60907cf

3 files changed: +674 -42 lines changed

dbos/admin_server.go

Lines changed: 10 additions & 8 deletions
@@ -278,21 +278,23 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
 	ctx.logger.Debug("Registering admin server endpoint", "pattern", _GLOBAL_TIMEOUT_PATTERN)
 	mux.HandleFunc(_GLOBAL_TIMEOUT_PATTERN, func(w http.ResponseWriter, r *http.Request) {
 		var inputs struct {
-			CutoffEpochTimestampMs *int64 `json:"cutoff_epoch_timestamp_ms"`
+			CutoffEpochTimestampMs int64 `json:"cutoff_epoch_timestamp_ms"`
 		}
 
 		if err := json.NewDecoder(r.Body).Decode(&inputs); err != nil {
 			http.Error(w, "Invalid JSON body", http.StatusBadRequest)
 			return
 		}
 
-		// TODO: Implement global timeout
-		// err := globalTimeout(ctx, inputs.CutoffEpochTimestampMs)
-		// if err != nil {
-		// 	ctx.logger.Error("Global timeout failed", "error", err)
-		// 	http.Error(w, fmt.Sprintf("Global timeout failed: %v", err), http.StatusInternalServerError)
-		// 	return
-		// }
+		cutoffTime := time.UnixMilli(inputs.CutoffEpochTimestampMs)
+		ctx.logger.Info("Global timeout request", "cutoff_time", cutoffTime)
+
+		err := ctx.systemDB.cancelAllBefore(ctx, cutoffTime)
+		if err != nil {
+			ctx.logger.Error("Global timeout failed", "error", err)
+			http.Error(w, fmt.Sprintf("Global timeout failed: %v", err), http.StatusInternalServerError)
+			return
+		}
 
 		w.WriteHeader(http.StatusNoContent)
 	})
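As a rough illustration of how the new endpoint is meant to be driven, here is a minimal client-side sketch. The admin port and URL path are placeholders invented for the example (the real path is whatever _GLOBAL_TIMEOUT_PATTERN resolves to) and are not defined by this commit.

// Hypothetical caller of the global-timeout admin endpoint.
// Host, port, and path are placeholders, not values taken from this commit.
package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"
)

func main() {
	body, _ := json.Marshal(map[string]int64{
		"cutoff_epoch_timestamp_ms": time.Now().UnixMilli(), // cancel everything started up to now
	})
	resp, err := http.Post("http://localhost:3001/global-timeout", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("status:", resp.StatusCode) // the handler replies 204 No Content on success
}

On success the handler returns 204 with no body; any failure from cancelAllBefore surfaces as a 500 carrying the error message.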

dbos/system_database.go

Lines changed: 109 additions & 5 deletions
@@ -35,6 +35,7 @@ type systemDatabase interface {
 	updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error
 	awaitWorkflowResult(ctx context.Context, workflowID string) (any, error)
 	cancelWorkflow(ctx context.Context, workflowID string) error
+	cancelAllBefore(ctx context.Context, cutoffTime time.Time) error
 	resumeWorkflow(ctx context.Context, workflowID string) error
 	forkWorkflow(ctx context.Context, input forkWorkflowDBInput) error
 

@@ -60,6 +61,9 @@ type systemDatabase interface {
 	// Queues
 	dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error)
 	clearQueueAssignment(ctx context.Context, workflowID string) (bool, error)
+
+	// Garbage collection
+	garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error
 }
 
 type sysDB struct {
@@ -186,8 +190,22 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
 		return nil, fmt.Errorf("failed to run migrations: %v", err)
 	}
 
-	// Create pgx pool
-	pool, err := pgxpool.New(ctx, databaseURL)
+	// Parse the connection string to get a config
+	config, err := pgxpool.ParseConfig(databaseURL)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse database URL: %v", err)
+	}
+	// Set pool configuration
+	config.MaxConns = 20
+	config.MinConns = 0
+	config.MaxConnLifetime = time.Hour
+	config.MaxConnIdleTime = time.Minute * 5
+
+	// Add acquire timeout to prevent indefinite blocking
+	config.ConnConfig.ConnectTimeout = 10 * time.Second
+
+	// Create pool with configuration
+	pool, err := pgxpool.NewWithConfig(ctx, config)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create connection pool: %v", err)
 	}
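As a side note, pgxpool can also pick up the same pool limits from parameters embedded in the connection string that ParseConfig understands (pool_max_conns and friends in pgx v5). A sketch of the equivalent DSN form; the credentials and database name are placeholders, not values from this commit:

// Equivalent pool limits expressed in the DSN instead of on the config struct.
// The credentials and database name below are placeholders.
dsn := "postgres://user:pass@localhost:5432/dbos_sys" +
	"?pool_max_conns=20&pool_min_conns=0" +
	"&pool_max_conn_lifetime=1h&pool_max_conn_idle_time=5m"
config, err := pgxpool.ParseConfig(dsn)
if err != nil {
	return nil, fmt.Errorf("failed to parse database URL: %v", err)
}
pool, err := pgxpool.NewWithConfig(ctx, config)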
@@ -202,11 +220,11 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
 	notificationsMap := &sync.Map{}
 
 	// Create a connection to listen on notifications
-	config, err := pgconn.ParseConfig(databaseURL)
+	notifierConnConfig, err := pgconn.ParseConfig(databaseURL)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse database URL: %v", err)
 	}
-	config.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
+	notifierConnConfig.OnNotification = func(c *pgconn.PgConn, n *pgconn.Notification) {
 		if n.Channel == _DBOS_NOTIFICATIONS_CHANNEL || n.Channel == _DBOS_WORKFLOW_EVENTS_CHANNEL {
 			// Check if an entry exists in the map, indexed by the payload
 			// If yes, broadcast on the condition variable so listeners can wake up
@@ -215,7 +233,7 @@ func newSystemDatabase(ctx context.Context, databaseURL string, logger *slog.Log
 			}
 		}
 	}
-	notificationListenerConnection, err := pgconn.ConnectConfig(ctx, config)
+	notificationListenerConnection, err := pgconn.ConnectConfig(ctx, notifierConnConfig)
 	if err != nil {
 		return nil, fmt.Errorf("failed to connect notification listener to database: %v", err)
 	}
@@ -756,6 +774,92 @@ func (s *sysDB) cancelWorkflow(ctx context.Context, workflowID string) error {
 	return nil
 }
 
+func (s *sysDB) cancelAllBefore(ctx context.Context, cutoffTime time.Time) error {
+	// List all workflows in PENDING or ENQUEUED state ending at cutoffTime
+	listInput := listWorkflowsDBInput{
+		endTime: cutoffTime,
+		status:  []WorkflowStatusType{WorkflowStatusPending, WorkflowStatusEnqueued},
+	}
+
+	workflows, err := s.listWorkflows(ctx, listInput)
+	if err != nil {
+		return fmt.Errorf("failed to list workflows for cancellation: %w", err)
+	}
+
+	// Cancel each workflow
+	for _, workflow := range workflows {
+		if err := s.cancelWorkflow(ctx, workflow.ID); err != nil {
+			s.logger.Error("Failed to cancel workflow during cancelAllBefore", "workflowID", workflow.ID, "error", err)
+			// Continue with other workflows even if one fails
+			// If desired we could funnel the errors back the caller (conductor, admin server)
+		}
+	}
+
+	return nil
+}
+
+type garbageCollectWorkflowsInput struct {
+	cutoffEpochTimestampMs *int64
+	rowsThreshold          *int
+}
+
+func (s *sysDB) garbageCollectWorkflows(ctx context.Context, input garbageCollectWorkflowsInput) error {
+	// Validate input parameters
+	if input.rowsThreshold != nil && *input.rowsThreshold <= 0 {
+		return fmt.Errorf("rowsThreshold must be greater than 0, got %d", *input.rowsThreshold)
+	}
+
+	cutoffTimestamp := input.cutoffEpochTimestampMs
+
+	// If rowsThreshold is provided, get the timestamp of the Nth newest workflow
+	if input.rowsThreshold != nil {
+		query := `SELECT created_at
+				  FROM dbos.workflow_status
+				  ORDER BY created_at DESC
+				  LIMIT 1 OFFSET $1`
+
+		var rowsBasedCutoff int64
+		err := s.pool.QueryRow(ctx, query, *input.rowsThreshold-1).Scan(&rowsBasedCutoff)
+		if err != nil {
+			if err == pgx.ErrNoRows {
+				// Not enough rows to apply threshold, no garbage collection needed
+				return nil
+			}
+			return fmt.Errorf("failed to query cutoff timestamp by rows threshold: %w", err)
+		}
+
+		// Use the more restrictive cutoff (higher timestamp = more recent = less deletion)
+		if cutoffTimestamp == nil || rowsBasedCutoff > *cutoffTimestamp {
+			cutoffTimestamp = &rowsBasedCutoff
+		}
+	}
+
+	// If no cutoff is determined, no garbage collection is needed
+	if cutoffTimestamp == nil {
+		return nil
+	}
+
+	// Delete all workflows older than cutoff that are NOT PENDING or ENQUEUED
+	query := `DELETE FROM dbos.workflow_status
+			  WHERE created_at < $1
+			  AND status NOT IN ($2, $3)`
+
+	commandTag, err := s.pool.Exec(ctx, query,
+		*cutoffTimestamp,
+		WorkflowStatusPending,
+		WorkflowStatusEnqueued)
+
+	if err != nil {
+		return fmt.Errorf("failed to garbage collect workflows: %w", err)
+	}
+
+	s.logger.Info("Garbage collected workflows",
+		"cutoff_timestamp", *cutoffTimestamp,
+		"deleted_count", commandTag.RowsAffected())
+
+	return nil
+}
+
 func (s *sysDB) resumeWorkflow(ctx context.Context, workflowID string) error {
 	tx, err := s.pool.Begin(ctx)
 	if err != nil {
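To make the garbageCollectWorkflowsInput knobs concrete, here is a minimal sketch of how a caller inside the package might combine them. The retention values and variable names are illustrative assumptions, not part of this commit.

// Hypothetical in-package caller; the retention policy below is made up for illustration.
cutoff := time.Now().Add(-30 * 24 * time.Hour).UnixMilli() // keep roughly the last 30 days
maxRows := 100_000                                         // and cap retention at ~100k rows

err := s.garbageCollectWorkflows(ctx, garbageCollectWorkflowsInput{
	cutoffEpochTimestampMs: &cutoff,  // delete non-PENDING/ENQUEUED workflows created before this time
	rowsThreshold:          &maxRows, // if the Nth-newest row's timestamp is later, that cutoff is used instead
})
if err != nil {
	s.logger.Error("workflow garbage collection failed", "error", err)
}

Either field may be left nil to skip that constraint; with both nil the function is a no-op, and when both are set the later of the two resulting timestamps becomes the cutoff.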
