Skip to content

Commit b2d5b8b

Browse files
committed
Shutdown/Cancel interface
1 parent 31f29c0 commit b2d5b8b

File tree

9 files changed

+301
-70
lines changed

9 files changed

+301
-70
lines changed

dbos/admin_server.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log/slog"
88
"net/http"
9+
"sync"
910
"sync/atomic"
1011
"time"
1112
)
@@ -27,7 +28,6 @@ const (
2728
_WORKFLOW_FORK_PATTERN = "POST /workflows/{id}/fork"
2829

2930
_ADMIN_SERVER_READ_HEADER_TIMEOUT = 5 * time.Second
30-
_ADMIN_SERVER_SHUTDOWN_TIMEOUT = 10 * time.Second
3131
)
3232

3333
// listWorkflowsRequest represents the request structure for listing workflows
@@ -103,6 +103,7 @@ type adminServer struct {
103103
logger *slog.Logger
104104
port int
105105
isDeactivated atomic.Int32
106+
wg sync.WaitGroup
106107
}
107108

108109
// toListWorkflowResponse converts a WorkflowStatus to a map with all time fields in UTC
@@ -226,7 +227,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
226227
mux.HandleFunc(_DEACTIVATE_PATTERN, func(w http.ResponseWriter, r *http.Request) {
227228
if as.isDeactivated.CompareAndSwap(0, 1) {
228229
ctx.logger.Info("Deactivating DBOS executor", "executor_id", ctx.executorID, "app_version", ctx.applicationVersion)
229-
// TODO: Stop queue runner, workflow scheduler, etc
230+
ctx.Cancel(1 * time.Minute) // Cancel context, queue runner, and workflow scheduler
230231
}
231232

232233
w.Header().Set("Content-Type", "text/plain")
@@ -532,7 +533,9 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
532533
func (as *adminServer) Start() error {
533534
as.logger.Info("Starting admin server", "port", as.port)
534535

536+
as.wg.Add(1)
535537
go func() {
538+
defer as.wg.Done()
536539
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
537540
as.logger.Error("Admin server error", "error", err)
538541
}
@@ -541,18 +544,30 @@ func (as *adminServer) Start() error {
541544
return nil
542545
}
543546

544-
func (as *adminServer) Shutdown(ctx context.Context) error {
547+
func (as *adminServer) Shutdown(timeout time.Duration) error {
545548
as.logger.Info("Shutting down admin server")
546549

547-
// Note: consider moving the grace period to DBOSContext.Shutdown()
548-
ctx, cancel := context.WithTimeout(ctx, _ADMIN_SERVER_SHUTDOWN_TIMEOUT)
550+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
549551
defer cancel()
550552

551553
if err := as.server.Shutdown(ctx); err != nil {
552554
as.logger.Error("Admin server shutdown error", "error", err)
553555
return fmt.Errorf("failed to shutdown admin server: %w", err)
554556
}
555557

556-
as.logger.Info("Admin server shutdown complete")
558+
// Wait for the server goroutine to return
559+
done := make(chan struct{})
560+
go func() {
561+
as.wg.Wait()
562+
close(done)
563+
}()
564+
565+
select {
566+
case <-done:
567+
as.logger.Info("Admin server shutdown complete")
568+
case <-ctx.Done():
569+
as.logger.Warn("Admin server shutdown timed out")
570+
}
571+
557572
return nil
558573
}

dbos/admin_server_test.go

Lines changed: 189 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"net/http"
99
"strings"
10+
"sync/atomic"
1011
"testing"
1112
"time"
1213

@@ -29,15 +30,20 @@ func TestAdminServer(t *testing.T) {
2930
// Ensure cleanup
3031
defer func() {
3132
if ctx != nil {
32-
ctx.Cancel()
33+
ctx.Shutdown(1 * time.Minute)
3334
}
3435
}()
3536

3637
// Give time for any startup processes
3738
time.Sleep(100 * time.Millisecond)
3839

3940
// Verify admin server is not running
40-
client := &http.Client{Timeout: 1 * time.Second}
41+
client := &http.Client{
42+
Timeout: 1 * time.Second,
43+
Transport: &http.Transport{
44+
DisableKeepAlives: true,
45+
},
46+
}
4147
_, err = client.Get(fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_HEALTHCHECK_PATTERN, "GET /")))
4248
require.Error(t, err, "Expected request to fail when admin server is not started")
4349

@@ -65,7 +71,7 @@ func TestAdminServer(t *testing.T) {
6571
// Ensure cleanup
6672
defer func() {
6773
if ctx != nil {
68-
ctx.Cancel()
74+
ctx.Shutdown(1 * time.Minute)
6975
}
7076
}()
7177

@@ -78,7 +84,12 @@ func TestAdminServer(t *testing.T) {
7884
exec := ctx.(*dbosContext)
7985
require.NotNil(t, exec.adminServer, "Expected admin server to be created in DBOS instance")
8086

81-
client := &http.Client{Timeout: 5 * time.Second}
87+
client := &http.Client{
88+
Timeout: 5 * time.Second,
89+
Transport: &http.Transport{
90+
DisableKeepAlives: true,
91+
},
92+
}
8293

8394
type adminServerTestCase struct {
8495
name string
@@ -252,14 +263,19 @@ func TestAdminServer(t *testing.T) {
252263
// Ensure cleanup
253264
defer func() {
254265
if ctx != nil {
255-
ctx.Cancel()
266+
ctx.Shutdown(1 * time.Minute)
256267
}
257268
}()
258269

259270
// Give the server a moment to start
260271
time.Sleep(100 * time.Millisecond)
261272

262-
client := &http.Client{Timeout: 5 * time.Second}
273+
client := &http.Client{
274+
Timeout: 5 * time.Second,
275+
Transport: &http.Transport{
276+
DisableKeepAlives: true,
277+
},
278+
}
263279
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
264280

265281
// Create workflows with different input/output types
@@ -379,11 +395,16 @@ func TestAdminServer(t *testing.T) {
379395
// Ensure cleanup
380396
defer func() {
381397
if ctx != nil {
382-
ctx.Cancel()
398+
ctx.Shutdown(1 * time.Minute)
383399
}
384400
}()
385401

386-
client := &http.Client{Timeout: 5 * time.Second}
402+
client := &http.Client{
403+
Timeout: 5 * time.Second,
404+
Transport: &http.Transport{
405+
DisableKeepAlives: true,
406+
},
407+
}
387408
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
388409

389410
// Create first workflow
@@ -530,8 +551,167 @@ func TestAdminServer(t *testing.T) {
530551
}
531552
assert.True(t, foundIDs4[workflowID1], "Expected to find first workflow ID in empty body results")
532553
assert.True(t, foundIDs4[workflowID2], "Expected to find second workflow ID in empty body results")
554+
})
555+
556+
t.Run("TestDeactivate", func(t *testing.T) {
557+
t.Run("Deactivate stops workflow scheduler", func(t *testing.T) {
558+
resetTestDatabase(t, databaseURL)
559+
ctx, err := NewDBOSContext(Config{
560+
DatabaseURL: databaseURL,
561+
AppName: "test-app",
562+
AdminServer: true,
563+
})
564+
require.NoError(t, err)
565+
566+
// Track scheduled workflow executions
567+
var executionCount atomic.Int32
568+
569+
// Register a scheduled workflow that runs every second
570+
RegisterWorkflow(ctx, func(dbosCtx DBOSContext, scheduledTime time.Time) (string, error) {
571+
executionCount.Add(1)
572+
return fmt.Sprintf("executed at %v", scheduledTime), nil
573+
}, WithSchedule("* * * * * *")) // Every second
574+
575+
err = ctx.Launch()
576+
require.NoError(t, err)
577+
578+
client := &http.Client{
579+
Timeout: 5 * time.Second,
580+
Transport: &http.Transport{
581+
DisableKeepAlives: true,
582+
},
583+
}
584+
585+
// Ensure cleanup
586+
defer func() {
587+
if ctx != nil {
588+
ctx.Shutdown(1 * time.Minute)
589+
}
590+
if client.Transport != nil {
591+
client.Transport.(*http.Transport).CloseIdleConnections()
592+
}
593+
}()
594+
595+
// Wait for 2-3 executions to verify scheduler is running
596+
require.Eventually(t, func() bool {
597+
return executionCount.Load() >= 2
598+
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")
599+
600+
// Call deactivate endpoint
601+
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
602+
req, err := http.NewRequest("GET", endpoint, nil)
603+
require.NoError(t, err, "Failed to create deactivate request")
604+
605+
resp, err := client.Do(req)
606+
require.NoError(t, err, "Failed to call deactivate endpoint")
607+
defer resp.Body.Close()
608+
609+
// Verify endpoint returned 200 OK
610+
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")
611+
612+
// Verify response body
613+
body, err := io.ReadAll(resp.Body)
614+
require.NoError(t, err, "Failed to read response body")
615+
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")
616+
617+
// Record count after deactivate and wait
618+
countAfterDeactivate := executionCount.Load()
619+
time.Sleep(4 * time.Second) // Wait long enough for multiple executions if scheduler was still running
620+
621+
// Verify no new executions occurred
622+
finalCount := executionCount.Load()
623+
assert.Equal(t, countAfterDeactivate, finalCount,
624+
"Expected no new scheduled workflows after deactivate (had %d before, %d after)",
625+
countAfterDeactivate, finalCount)
626+
})
627+
628+
t.Run("Deactivate stops queue runner", func(t *testing.T) {
629+
resetTestDatabase(t, databaseURL)
630+
ctx, err := NewDBOSContext(Config{
631+
DatabaseURL: databaseURL,
632+
AppName: "test-app",
633+
AdminServer: true,
634+
})
635+
require.NoError(t, err)
533636

534-
return // Skip the normal test flow
637+
// Create a test queue
638+
testQueue := NewWorkflowQueue(ctx, "test-deactivate-queue")
639+
640+
// Track workflow executions with atomic counter
641+
var executionCount atomic.Int32
642+
643+
// Register a simple workflow
644+
testWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
645+
executionCount.Add(1)
646+
return "completed-" + input, nil
647+
}
648+
RegisterWorkflow(ctx, testWorkflow)
649+
650+
err = ctx.Launch()
651+
require.NoError(t, err)
652+
653+
client := &http.Client{
654+
Timeout: 5 * time.Second,
655+
Transport: &http.Transport{
656+
DisableKeepAlives: true,
657+
},
658+
}
659+
660+
// Ensure cleanup
661+
defer func() {
662+
if ctx != nil {
663+
ctx.Shutdown(1 * time.Minute)
664+
}
665+
if client.Transport != nil {
666+
client.Transport.(*http.Transport).CloseIdleConnections()
667+
}
668+
}()
669+
670+
// Enqueue and complete one workflow to verify queue runner is working
671+
handle1, err := RunAsWorkflow(ctx, testWorkflow, "initial-test", WithQueue(testQueue.Name))
672+
require.NoError(t, err, "Failed to enqueue initial workflow")
673+
674+
result1, err := handle1.GetResult()
675+
require.NoError(t, err, "Failed to get initial workflow result")
676+
assert.Equal(t, "completed-initial-test", result1)
677+
assert.Equal(t, int32(1), executionCount.Load(), "Expected one execution before deactivate")
678+
679+
// Call deactivate endpoint to stop the queue runner
680+
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
681+
req, err := http.NewRequest("GET", endpoint, nil)
682+
require.NoError(t, err, "Failed to create deactivate request")
683+
684+
resp, err := client.Do(req)
685+
require.NoError(t, err, "Failed to call deactivate endpoint")
686+
defer resp.Body.Close()
687+
688+
// Verify endpoint returned 200 OK
689+
assert.Equal(t, http.StatusOK, resp.StatusCode, "Expected 200 OK from deactivate endpoint")
690+
691+
// Verify response body
692+
body, err := io.ReadAll(resp.Body)
693+
require.NoError(t, err, "Failed to read response body")
694+
assert.Equal(t, "deactivated", string(body), "Expected 'deactivated' response body")
695+
696+
// After deactivate is called, the context is cancelled
697+
// We can enqueue more workflows but they won't be dequeued
698+
handle2, err := RunAsWorkflow(ctx, testWorkflow, "post-deactivate-test", WithQueue(testQueue.Name))
699+
require.NoError(t, err, "Failed to enqueue post-deactivate workflow")
700+
701+
countAfterDeactivate := executionCount.Load()
702+
703+
// Wait to see if any phantom executions happen
704+
time.Sleep(2 * time.Second)
705+
706+
// Verify no additional workflows executed (should still be 1)
707+
finalCount := executionCount.Load()
708+
assert.Equal(t, countAfterDeactivate, finalCount,
709+
"Expected no additional workflow executions after deactivate")
710+
711+
handle2Status, err := handle2.GetStatus()
712+
require.NoError(t, err, "Failed to get status of post-deactivate workflow")
713+
assert.Equal(t, WorkflowStatusEnqueued, handle2Status.Status, "Expected post-deactivate workflow to be pending")
714+
})
535715
})
536716

537717
}

0 commit comments

Comments
 (0)