Skip to content

Commit 307b432

Browse files
authored
Move queue runner to DBOSContext + fix races & leaks (#46)
This is a move towards having the `DBOSContext` federate correctly all its underlying resources, and, specifically, propagate cancellation to said resources (including workflows.) Calling `DBOSContext.Shutdown()` will trigger the cancelation of the context, which will signal all resources they should stop work. - New runner struct, attached to DBOSContext. Move queue registry in runner struct. - Queue runner cancellation managed by DBOSContext.Shutdown - Add [go leak checker](https://github.com/uber-go/goleak) in tests - Simplify queue metadata (call list queues on the runner) - Fix races in the tests - Handle context timeout in system DB `DequeueWorkflows` - Better system DB cleanup, including notification loop cancellation - Have GHA test for races using `-race`
1 parent f4f045e commit 307b432

File tree

13 files changed

+392
-379
lines changed

13 files changed

+392
-379
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ on:
66
- main
77
pull_request:
88
branches:
9-
- main
109
types:
1110
- ready_for_review
1211
- opened
@@ -62,7 +61,7 @@ jobs:
6261
run: go install gotest.tools/gotestsum@latest
6362

6463
- name: Run tests
65-
run: gotestsum --format github-action
64+
run: gotestsum --format github-action -- -race ./...
6665
working-directory: ./dbos
6766
env:
6867
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22

dbos/admin_server.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,6 @@ type adminServer struct {
2020
logger *slog.Logger
2121
}
2222

23-
type queueMetadata struct {
24-
Name string `json:"name"`
25-
Concurrency *int `json:"concurrency,omitempty"`
26-
WorkerConcurrency *int `json:"workerConcurrency,omitempty"`
27-
RateLimit *RateLimiter `json:"rateLimit,omitempty"`
28-
}
29-
3023
func newAdminServer(ctx *dbosContext, port int) *adminServer {
3124
mux := http.NewServeMux()
3225

@@ -80,19 +73,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
8073
return
8174
}
8275

83-
var queueMetadataArray []queueMetadata
84-
85-
// Iterate through all queues in the registry
86-
for _, queue := range workflowQueueRegistry {
87-
queueMetadata := queueMetadata{
88-
Name: queue.name,
89-
WorkerConcurrency: queue.workerConcurrency,
90-
Concurrency: queue.globalConcurrency,
91-
RateLimit: queue.limiter,
92-
}
93-
94-
queueMetadataArray = append(queueMetadataArray, queueMetadata)
95-
}
76+
queueMetadataArray := ctx.queueRunner.listQueues()
9677

9778
w.Header().Set("Content-Type", "application/json")
9879
if err := json.NewEncoder(w).Encode(queueMetadataArray); err != nil {
@@ -125,10 +106,11 @@ func (as *adminServer) Start() error {
125106
return nil
126107
}
127108

128-
func (as *adminServer) Shutdown() error {
109+
func (as *adminServer) Shutdown(ctx context.Context) error {
129110
as.logger.Info("Shutting down admin server")
130111

131-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
112+
// XXX consider moving the grace period to DBOSContext.Shutdown()
113+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
132114
defer cancel()
133115

134116
if err := as.server.Shutdown(ctx); err != nil {

dbos/admin_server_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func TestAdminServer(t *testing.T) {
144144
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
145145
expectedStatus: http.StatusOK,
146146
validateResp: func(t *testing.T, resp *http.Response) {
147-
var queueMetadata []queueMetadata
147+
var queueMetadata []WorkflowQueue
148148
if err := json.NewDecoder(resp.Body).Decode(&queueMetadata); err != nil {
149149
t.Errorf("Failed to decode response as QueueMetadata array: %v", err)
150150
}
@@ -160,8 +160,8 @@ func TestAdminServer(t *testing.T) {
160160
for _, queue := range queueMetadata {
161161
if queue.Name == _DBOS_INTERNAL_QUEUE_NAME { // Internal queue name
162162
foundInternalQueue = true
163-
if queue.Concurrency != nil {
164-
t.Errorf("Expected internal queue to have no concurrency limit, but got %v", *queue.Concurrency)
163+
if queue.GlobalConcurrency != nil {
164+
t.Errorf("Expected internal queue to have no concurrency limit, but got %v", *queue.GlobalConcurrency)
165165
}
166166
if queue.WorkerConcurrency != nil {
167167
t.Errorf("Expected internal queue to have no worker concurrency limit, but got %v", *queue.WorkerConcurrency)

dbos/dbos.go

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import (
55
"crypto/sha256"
66
"encoding/gob"
77
"encoding/hex"
8+
"errors"
89
"fmt"
910
"io"
1011
"log/slog"
1112
"os"
1213
"sync"
14+
"sync/atomic"
1315
"time"
1416

1517
"github.com/robfig/cron/v3"
1618
)
1719

18-
var (
20+
const (
1921
_DEFAULT_ADMIN_SERVER_PORT = 3001
2022
)
2123

@@ -78,18 +80,17 @@ type DBOSContext interface {
7880
}
7981

8082
type dbosContext struct {
81-
ctx context.Context
83+
ctx context.Context
84+
ctxCancelFunc context.CancelCauseFunc
8285

83-
launched bool
86+
launched atomic.Bool
8487

8588
systemDB SystemDatabase
8689
adminServer *adminServer
8790
config *Config
8891

89-
// Queue runner context and cancel function
90-
queueRunnerCtx context.Context
91-
queueRunnerCancelFunc context.CancelFunc
92-
queueRunnerDone chan struct{}
92+
// Queue runner
93+
queueRunner *queueRunner
9394

9495
// Application metadata
9596
applicationVersion string
@@ -171,9 +172,11 @@ func (c *dbosContext) GetApplicationID() string {
171172
}
172173

173174
func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
175+
ctx, cancelFunc := context.WithCancelCause(context.Background())
174176
initExecutor := &dbosContext{
175177
workflowsWg: &sync.WaitGroup{},
176-
ctx: context.Background(),
178+
ctx: ctx,
179+
ctxCancelFunc: cancelFunc,
177180
workflowRegistry: make(map[string]workflowRegistryEntry),
178181
workflowRegMutex: &sync.RWMutex{},
179182
}
@@ -207,24 +210,34 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
207210

208211
initExecutor.applicationID = os.Getenv("DBOS__APPID")
209212

213+
initExecutor.logger = initExecutor.logger.With(
214+
//"app_version", initExecutor.applicationVersion, // This is really verbose...
215+
"executor_id", initExecutor.executorID,
216+
//"app_id", initExecutor.applicationID, // This should stay internal
217+
)
218+
210219
// Create the system database
211-
systemDB, err := NewSystemDatabase(config.DatabaseURL, config.Logger)
220+
systemDB, err := newSystemDatabase(initExecutor, config.DatabaseURL, initExecutor.logger)
212221
if err != nil {
213222
return nil, newInitializationError(fmt.Sprintf("failed to create system database: %v", err))
214223
}
215224
initExecutor.systemDB = systemDB
216225
initExecutor.logger.Info("System database initialized")
217226

227+
// Initialize the queue runner and register DBOS internal queue
228+
initExecutor.queueRunner = newQueueRunner()
229+
NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME)
230+
218231
return initExecutor, nil
219232
}
220233

221234
func (c *dbosContext) Launch() error {
222-
if c.launched {
235+
if c.launched.Load() {
223236
return newInitializationError("DBOS is already launched")
224237
}
225238

226239
// Start the system database
227-
c.systemDB.Launch(context.Background())
240+
c.systemDB.Launch(c)
228241

229242
// Start the admin server if configured
230243
if c.config.AdminServer {
@@ -238,17 +251,9 @@ func (c *dbosContext) Launch() error {
238251
c.adminServer = adminServer
239252
}
240253

241-
// Create context with cancel function for queue runner
242-
// FIXME: cancellation now has to go through the DBOSContext
243-
ctx, cancel := context.WithCancel(c.ctx)
244-
c.queueRunnerCtx = ctx
245-
c.queueRunnerCancelFunc = cancel
246-
c.queueRunnerDone = make(chan struct{})
247-
248254
// Start the queue runner in a goroutine
249255
go func() {
250-
defer close(c.queueRunnerDone)
251-
queueRunner(c)
256+
c.queueRunner.run(c)
252257
}()
253258
c.logger.Info("Queue runner started")
254259

@@ -268,62 +273,61 @@ func (c *dbosContext) Launch() error {
268273
}
269274

270275
c.logger.Info("DBOS initialized", "app_version", c.applicationVersion, "executor_id", c.executorID)
271-
c.launched = true
276+
c.launched.Store(true)
272277
return nil
273278
}
274279

280+
// We might consider renaming this to "Cancel" to me more idiomatic
275281
func (c *dbosContext) Shutdown() {
282+
c.logger.Info("Shutting down DBOS context")
283+
284+
// Cancel the context to signal all resources to stop
285+
c.ctxCancelFunc(errors.New("DBOS shutdown initiated"))
286+
276287
// Wait for all workflows to finish
277288
c.logger.Info("Waiting for all workflows to finish")
278289
c.workflowsWg.Wait()
290+
c.logger.Info("All workflows completed")
279291

280-
if !c.launched {
281-
c.logger.Warn("DBOS is not launched, nothing to shutdown")
282-
return
283-
}
284-
285-
// Cancel the context to stop the queue runner
286-
if c.queueRunnerCancelFunc != nil {
287-
c.logger.Info("Stopping queue runner")
288-
c.queueRunnerCancelFunc()
289-
// Wait for queue runner to finish
290-
<-c.queueRunnerDone
291-
c.logger.Info("Queue runner stopped")
292-
}
293-
294-
if c.workflowScheduler != nil {
295-
c.logger.Info("Stopping workflow scheduler")
296-
297-
ctx := c.workflowScheduler.Stop()
298-
// Wait for all running jobs to complete with 5-second timeout
299-
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
300-
defer cancel()
301-
302-
select {
303-
case <-ctx.Done():
304-
c.logger.Info("All scheduled jobs completed")
305-
case <-timeoutCtx.Done():
306-
c.logger.Warn("Timeout waiting for jobs to complete", "timeout", "5s")
307-
}
308-
}
309-
292+
// Close the pool and the notification listener if started
310293
if c.systemDB != nil {
311294
c.logger.Info("Shutting down system database")
312-
c.systemDB.Shutdown()
295+
c.systemDB.Shutdown(c)
313296
c.systemDB = nil
314297
}
315298

316-
if c.adminServer != nil {
317-
c.logger.Info("Shutting down admin server")
299+
if c.launched.Load() {
300+
// Wait for queue runner to finish
301+
<-c.queueRunner.completionChan
302+
c.logger.Info("Queue runner completed")
303+
304+
if c.workflowScheduler != nil {
305+
c.logger.Info("Stopping workflow scheduler")
306+
ctx := c.workflowScheduler.Stop()
307+
// Wait for all running jobs to complete with 5-second timeout
308+
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
309+
defer cancel()
310+
311+
select {
312+
case <-ctx.Done():
313+
c.logger.Info("All scheduled jobs completed")
314+
case <-timeoutCtx.Done():
315+
c.logger.Warn("Timeout waiting for jobs to complete. Moving on", "timeout", "5s")
316+
}
317+
}
318318

319-
err := c.adminServer.Shutdown()
320-
if err != nil {
321-
c.logger.Error("Failed to shutdown admin server", "error", err)
322-
} else {
323-
c.logger.Info("Admin server shutdown complete")
319+
if c.adminServer != nil {
320+
c.logger.Info("Shutting down admin server")
321+
err := c.adminServer.Shutdown(c)
322+
if err != nil {
323+
c.logger.Error("Failed to shutdown admin server", "error", err)
324+
} else {
325+
c.logger.Info("Admin server shutdown complete")
326+
}
327+
c.adminServer = nil
324328
}
325-
c.adminServer = nil
326329
}
330+
c.launched.Store(false)
327331
}
328332

329333
func GetBinaryHash() (string, error) {

0 commit comments

Comments
 (0)