Skip to content

Commit b61d9c5

Browse files
authored
Shutdown/Cancel interface, "Deactivate" (#71)
- Stop workflows scheduler on /deactivate requests - Add timeouts to Shutdown
1 parent 31f29c0 commit b61d9c5

File tree

9 files changed

+182
-66
lines changed

9 files changed

+182
-66
lines changed

dbos/admin_server.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log/slog"
88
"net/http"
9+
"sync"
910
"sync/atomic"
1011
"time"
1112
)
@@ -27,7 +28,6 @@ const (
2728
_WORKFLOW_FORK_PATTERN = "POST /workflows/{id}/fork"
2829

2930
_ADMIN_SERVER_READ_HEADER_TIMEOUT = 5 * time.Second
30-
_ADMIN_SERVER_SHUTDOWN_TIMEOUT = 10 * time.Second
3131
)
3232

3333
// listWorkflowsRequest represents the request structure for listing workflows
@@ -103,6 +103,7 @@ type adminServer struct {
103103
logger *slog.Logger
104104
port int
105105
isDeactivated atomic.Int32
106+
wg sync.WaitGroup
106107
}
107108

108109
// toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC
@@ -226,7 +227,10 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
226227
mux.HandleFunc(_DEACTIVATE_PATTERN, func(w http.ResponseWriter, r *http.Request) {
227228
if as.isDeactivated.CompareAndSwap(0, 1) {
228229
ctx.logger.Info("Deactivating DBOS executor", "executor_id", ctx.executorID, "app_version", ctx.applicationVersion)
229-
// TODO: Stop queue runner, workflow scheduler, etc
230+
// Stop the workflow scheduler. Note we don't wait for running jobs to complete
231+
if ctx.workflowScheduler != nil {
232+
ctx.workflowScheduler.Stop()
233+
}
230234
}
231235

232236
w.Header().Set("Content-Type", "text/plain")
@@ -532,7 +536,9 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
532536
func (as *adminServer) Start() error {
533537
as.logger.Info("Starting admin server", "port", as.port)
534538

539+
as.wg.Add(1)
535540
go func() {
541+
defer as.wg.Done()
536542
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
537543
as.logger.Error("Admin server error", "error", err)
538544
}
@@ -541,18 +547,30 @@ func (as *adminServer) Start() error {
541547
return nil
542548
}
543549

544-
func (as *adminServer) Shutdown(ctx context.Context) error {
550+
func (as *adminServer) Shutdown(timeout time.Duration) error {
545551
as.logger.Info("Shutting down admin server")
546552

547-
// Note: consider moving the grace period to DBOSContext.Shutdown()
548-
ctx, cancel := context.WithTimeout(ctx, _ADMIN_SERVER_SHUTDOWN_TIMEOUT)
553+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
549554
defer cancel()
550555

551556
if err := as.server.Shutdown(ctx); err != nil {
552557
as.logger.Error("Admin server shutdown error", "error", err)
553558
return fmt.Errorf("failed to shutdown admin server: %w", err)
554559
}
555560

556-
as.logger.Info("Admin server shutdown complete")
561+
// Wait for the server goroutine to return
562+
done := make(chan struct{})
563+
go func() {
564+
as.wg.Wait()
565+
close(done)
566+
}()
567+
568+
select {
569+
case <-done:
570+
as.logger.Info("Admin server shutdown complete")
571+
case <-ctx.Done():
572+
as.logger.Warn("Admin server shutdown timed out")
573+
}
574+
557575
return nil
558576
}

dbos/admin_server_test.go

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"net/http"
99
"strings"
10+
"sync/atomic"
1011
"testing"
1112
"time"
1213

@@ -29,7 +30,7 @@ func TestAdminServer(t *testing.T) {
2930
// Ensure cleanup
3031
defer func() {
3132
if ctx != nil {
32-
ctx.Cancel()
33+
ctx.Shutdown(1 * time.Minute)
3334
}
3435
}()
3536

@@ -65,7 +66,7 @@ func TestAdminServer(t *testing.T) {
6566
// Ensure cleanup
6667
defer func() {
6768
if ctx != nil {
68-
ctx.Cancel()
69+
ctx.Shutdown(1 * time.Minute)
6970
}
7071
}()
7172

@@ -252,7 +253,7 @@ func TestAdminServer(t *testing.T) {
252253
// Ensure cleanup
253254
defer func() {
254255
if ctx != nil {
255-
ctx.Cancel()
256+
ctx.Shutdown(1 * time.Minute)
256257
}
257258
}()
258259

@@ -379,7 +380,7 @@ func TestAdminServer(t *testing.T) {
379380
// Ensure cleanup
380381
defer func() {
381382
if ctx != nil {
382-
ctx.Cancel()
383+
ctx.Shutdown(1 * time.Minute)
383384
}
384385
}()
385386

@@ -530,8 +531,74 @@ func TestAdminServer(t *testing.T) {
530531
}
531532
assert.True(t, foundIDs4[workflowID1], "Expected to find first workflow ID in empty body results")
532533
assert.True(t, foundIDs4[workflowID2], "Expected to find second workflow ID in empty body results")
534+
})
535+
536+
t.Run("TestDeactivate", func(t *testing.T) {
537+
t.Run("Deactivate stops workflow scheduler", func(t *testing.T) {
538+
resetTestDatabase(t, databaseURL)
539+
ctx, err := NewDBOSContext(Config{
540+
DatabaseURL: databaseURL,
541+
AppName: "test-app",
542+
AdminServer: true,
543+
})
544+
require.NoError(t, err)
545+
546+
// Track scheduled workflow executions
547+
var executionCount atomic.Int32
548+
549+
// Register a scheduled workflow that runs every second
550+
RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) {
551+
executionCount.Add(1)
552+
return fmt.Sprintf("executed at %v", scheduledTime), nil
553+
}, WithSchedule("* * * * * *")) // Every second
533554

534-
return // Skip the normal test flow
555+
err = ctx.Launch()
556+
require.NoError(t, err)
557+
558+
client := &http.Client{Timeout: 5 * time.Second}
559+
560+
// Ensure cleanup
561+
defer func() {
562+
if ctx != nil {
563+
ctx.Shutdown(1 * time.Minute)
564+
}
565+
if client.Transport != nil {
566+
client.Transport.(*http.Transport).CloseIdleConnections()
567+
}
568+
}()
569+
570+
// Wait for 2-3 executions to verify scheduler is running
571+
require.Eventually(t, func() bool {
572+
return executionCount.Load() >= 2
573+
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")
574+
575+
// Call deactivate endpoint
576+
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
577+
req, err := http.NewRequest("GET", endpoint, nil)
578+
require.NoError(t, err, "Failed to create deactivate request")
579+
580+
resp, err := client.Do(req)
581+
require.NoError(t, err, "Failed to call deactivate endpoint")
582+
defer resp.Body.Close()
583+
584+
// Verify endpoint returned 200 OK
585+
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")
586+
587+
// Verify response body
588+
body, err := io.ReadAll(resp.Body)
589+
require.NoError(t, err, "Failed to read response body")
590+
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")
591+
592+
// Record count after deactivate and wait
593+
countAfterDeactivate := executionCount.Load()
594+
time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running
595+
596+
// Verify no new executions occurred
597+
finalCount := executionCount.Load()
598+
assert.Equal(t, countAfterDeactivate, finalCount,
599+
"Expected no new scheduled workflows after deactivate (had %d before, %d after)",
600+
countAfterDeactivate, finalCount)
601+
})
535602
})
536603

537604
}

dbos/dbos.go

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ type DBOSContext interface {
7171
context.Context
7272

7373
// Context Lifecycle
74-
Launch() error // Launch the DBOS runtime including system database, queues, admin server, and workflow recovery
75-
Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources
74+
Launch() error // Launch the DBOS runtime including system database, queues, admin server, and workflow recovery
75+
Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence
7676

7777
// Workflow operations
7878
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
@@ -239,7 +239,7 @@ func (c *dbosContext) GetApplicationID() string {
239239
}
240240

241241
// NewDBOSContext creates a new DBOS context with the provided configuration.
242-
// The context must be launched with Launch() before use and should be shut down with Cancel().
242+
// The context must be launched with Launch() before use and should be shut down with Shutdown().
243243
// This function initializes the DBOS system database, sets up the queue sub-system,
244244
// and prepares the workflow registry.
245245
//
@@ -253,7 +253,7 @@ func (c *dbosContext) GetApplicationID() string {
253253
// if err != nil {
254254
// log.Fatal(err)
255255
// }
256-
// defer ctx.Cancel()
256+
// defer ctx.Shutdown(30*time.Second)
257257
//
258258
// if err := ctx.Launch(); err != nil {
259259
// log.Fatal(err)
@@ -372,62 +372,86 @@ func (c *dbosContext) Launch() error {
372372
return nil
373373
}
374374

375-
// Cancel gracefully shuts down the DBOS runtime by canceling the context, waiting for
376-
// all workflows to complete, and cleaning up system resources including the database
377-
// connection pool, queue runner, workflow scheduler, and admin server.
378-
// All workflows and steps contexts will be canceled, which one can check using their context's Done() method.
375+
// Shutdown gracefully shuts down the DBOS runtime by performing a complete, ordered cleanup
376+
// of all system components. The shutdown sequence includes:
379377
//
380-
// This method blocks until all workflows finish and all resources are properly cleaned up.
381-
// It should be called when the application is shutting down to ensure data consistency.
382-
func (c *dbosContext) Cancel() {
378+
// 1. Calls Cancel to stop workflows and cancel the context
379+
// 2. Waits for the queue runner to complete processing
380+
// 3. Stops the workflow scheduler and waits for scheduled jobs to finish
381+
// 4. Shuts down the system database connection pool and notification listener
382+
// 5. Shuts down the admin server
383+
// 6. Marks the context as not launched
384+
//
385+
// Each step respects the provided timeout. If any component doesn't shut down within the timeout,
386+
// a warning is logged and the shutdown continues to the next component.
387+
//
388+
// Shutdown is a permanent operation and should be called when the application is terminating.
389+
func (c *dbosContext) Shutdown(timeout time.Duration) {
383390
c.logger.Info("Shutting down DBOS context")
384391

385392
// Cancel the context to signal all resources to stop
386-
c.ctxCancelFunc(errors.New("DBOS shutdown initiated"))
393+
c.ctxCancelFunc(errors.New("DBOS cancellation initiated"))
387394

388395
// Wait for all workflows to finish
389396
c.logger.Info("Waiting for all workflows to finish")
390-
c.workflowsWg.Wait()
391-
c.logger.Info("All workflows completed")
397+
done := make(chan struct{})
398+
go func() {
399+
c.workflowsWg.Wait()
400+
close(done)
401+
}()
402+
select {
403+
case <-done:
404+
c.logger.Info("All workflows completed")
405+
case <-time.After(timeout):
406+
// For now just log a warning: eventually we might want Cancel to return an error.
407+
c.logger.Warn("Timeout waiting for workflows to complete", "timeout", timeout)
408+
}
392409

393-
// Close the pool and the notification listener if started
394-
if c.systemDB != nil {
395-
c.logger.Info("Shutting down system database")
396-
c.systemDB.shutdown(c)
397-
c.systemDB = nil
410+
// Wait for queue runner to finish
411+
if c.queueRunner != nil && c.launched.Load() {
412+
c.logger.Info("Waiting for queue runner to complete")
413+
select {
414+
case <-c.queueRunner.completionChan:
415+
c.logger.Info("Queue runner completed")
416+
c.queueRunner = nil
417+
case <-time.After(timeout):
418+
c.logger.Warn("Timeout waiting for queue runner to complete", "timeout", timeout)
419+
}
398420
}
399421

400-
if c.launched.Load() {
401-
// Wait for queue runner to finish
402-
<-c.queueRunner.completionChan
403-
c.logger.Info("Queue runner completed")
404-
405-
if c.workflowScheduler != nil {
406-
c.logger.Info("Stopping workflow scheduler")
407-
ctx := c.workflowScheduler.Stop()
408-
// Wait for all running jobs to complete with 5-second timeout
409-
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
410-
defer cancel()
411-
412-
select {
413-
case <-ctx.Done():
414-
c.logger.Info("All scheduled jobs completed")
415-
case <-timeoutCtx.Done():
416-
c.logger.Warn("Timeout waiting for jobs to complete. Moving on", "timeout", "5s")
417-
}
422+
// Stop the workflow scheduler and wait until all scheduled workflows are done
423+
if c.workflowScheduler != nil && c.launched.Load() {
424+
c.logger.Info("Stopping workflow scheduler")
425+
ctx := c.workflowScheduler.Stop()
426+
427+
select {
428+
case <-ctx.Done():
429+
c.logger.Info("All scheduled jobs completed")
430+
c.workflowScheduler = nil
431+
case <-time.After(timeout):
432+
c.logger.Warn("Timeout waiting for jobs to complete. Moving on", "timeout", timeout)
418433
}
434+
}
419435

420-
if c.adminServer != nil {
421-
c.logger.Info("Shutting down admin server")
422-
err := c.adminServer.Shutdown(c)
423-
if err != nil {
424-
c.logger.Error("Failed to shutdown admin server", "error", err)
425-
} else {
426-
c.logger.Info("Admin server shutdown complete")
427-
}
428-
c.adminServer = nil
436+
// Shutdown the admin server
437+
if c.adminServer != nil && c.launched.Load() {
438+
c.logger.Info("Shutting down admin server")
439+
err := c.adminServer.Shutdown(timeout)
440+
if err != nil {
441+
c.logger.Error("Failed to shutdown admin server", "error", err)
442+
} else {
443+
c.logger.Info("Admin server shutdown complete")
429444
}
445+
c.adminServer = nil
430446
}
447+
448+
// Close the system database
449+
if c.systemDB != nil {
450+
c.logger.Info("Shutting down system database")
451+
c.systemDB.shutdown(c, timeout)
452+
c.systemDB = nil
453+
}
454+
431455
c.launched.Store(false)
432456
}
433457

dbos/dbos_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dbos
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/assert"
78
"github.com/stretchr/testify/require"
@@ -21,7 +22,7 @@ func TestConfigValidationErrorTypes(t *testing.T) {
2122
require.NoError(t, err)
2223
defer func() {
2324
if ctx != nil {
24-
ctx.Cancel()
25+
ctx.Shutdown(1*time.Minute)
2526
}
2627
}() // Clean up executor
2728

0 commit comments

Comments
 (0)