Skip to content

Commit 887227a

Browse files
authored
Getevent recv durable sleep (#111)
- Support durable sleep for GetEvent and Recv (this essentially records a sleep step when sleep is needed) - Be less verbose in the DBOS logs
1 parent 165b569 commit 887227a

File tree

6 files changed

+301
-75
lines changed

6 files changed

+301
-75
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Contributing to DBOS Transact Python
1+
# Contributing to DBOS Transact Golang
22

33
Thank you for considering contributing to DBOS Transact. We welcome contributions from everyone, including bug fixes, feature enhancements, documentation improvements, or any other form of contribution.
44

dbos/dbos.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
330330
return nil, newInitializationError(fmt.Sprintf("failed to create system database: %v", err))
331331
}
332332
initExecutor.systemDB = systemDB
333-
initExecutor.logger.Info("System database initialized")
333+
initExecutor.logger.Debug("System database initialized")
334334

335335
// Initialize the queue runner and register DBOS internal queue
336336
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
@@ -355,7 +355,7 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
355355
return nil, newInitializationError(fmt.Sprintf("failed to initialize conductor: %v", err))
356356
}
357357
initExecutor.conductor = conductor
358-
initExecutor.logger.Info("Conductor initialized")
358+
initExecutor.logger.Debug("Conductor initialized")
359359
}
360360

361361
return initExecutor, nil
@@ -383,7 +383,7 @@ func (c *dbosContext) Launch() error {
383383
c.logger.Error("Failed to start admin server", "error", err)
384384
return newInitializationError(fmt.Sprintf("failed to start admin server: %v", err))
385385
}
386-
c.logger.Info("Admin server started", "port", c.config.AdminServerPort)
386+
c.logger.Debug("Admin server started", "port", c.config.AdminServerPort)
387387
c.adminServer = adminServer
388388
}
389389

@@ -392,18 +392,18 @@ func (c *dbosContext) Launch() error {
392392
go func() {
393393
c.queueRunner.run(c)
394394
}()
395-
c.logger.Info("Queue runner started")
395+
c.logger.Debug("Queue runner started")
396396

397397
// Start the workflow scheduler if it has been initialized
398398
if c.workflowScheduler != nil {
399399
c.workflowScheduler.Start()
400-
c.logger.Info("Workflow scheduler started")
400+
c.logger.Debug("Workflow scheduler started")
401401
}
402402

403403
// Start the conductor if it has been initialized
404404
if c.conductor != nil {
405405
c.conductor.Launch()
406-
c.logger.Info("Conductor started")
406+
c.logger.Debug("Conductor started")
407407
}
408408

409409
// Run a round of recovery on the local executor
@@ -414,7 +414,7 @@ func (c *dbosContext) Launch() error {
414414
if len(recoveryHandles) > 0 {
415415
c.logger.Info("Recovered pending workflows", "count", len(recoveryHandles))
416416
} else {
417-
c.logger.Info("No pending workflows to recover")
417+
c.logger.Debug("No pending workflows to recover")
418418
}
419419

420420
c.logger.Info("DBOS initialized", "app_version", c.applicationVersion, "executor_id", c.executorID)
@@ -438,45 +438,45 @@ func (c *dbosContext) Launch() error {
438438
//
439439
// Shutdown is a permanent operation and should be called when the application is terminating.
440440
func (c *dbosContext) Shutdown(timeout time.Duration) {
441-
c.logger.Info("Shutting down DBOS context")
441+
c.logger.Debug("Shutting down DBOS context")
442442

443443
// Cancel the context to signal all resources to stop
444444
c.ctxCancelFunc(errors.New("DBOS cancellation initiated"))
445445

446446
// Wait for all workflows to finish
447-
c.logger.Info("Waiting for all workflows to finish")
447+
c.logger.Debug("Waiting for all workflows to finish")
448448
done := make(chan struct{})
449449
go func() {
450450
c.workflowsWg.Wait()
451451
close(done)
452452
}()
453453
select {
454454
case <-done:
455-
c.logger.Info("All workflows completed")
455+
c.logger.Debug("All workflows completed")
456456
case <-time.After(timeout):
457457
// For now just log a warning: eventually we might want Cancel to return an error.
458458
c.logger.Warn("Timeout waiting for workflows to complete", "timeout", timeout)
459459
}
460460

461461
// Wait for queue runner to finish
462462
if c.queueRunner != nil && c.launched.Load() {
463-
c.logger.Info("Waiting for queue runner to complete")
463+
c.logger.Debug("Waiting for queue runner to complete")
464464
select {
465465
case <-c.queueRunner.completionChan:
466-
c.logger.Info("Queue runner completed")
466+
c.logger.Debug("Queue runner completed")
467467
case <-time.After(timeout):
468468
c.logger.Warn("Timeout waiting for queue runner to complete", "timeout", timeout)
469469
}
470470
}
471471

472472
// Stop the workflow scheduler and wait until all scheduled workflows are done
473473
if c.workflowScheduler != nil && c.launched.Load() {
474-
c.logger.Info("Stopping workflow scheduler")
474+
c.logger.Debug("Stopping workflow scheduler")
475475
ctx := c.workflowScheduler.Stop()
476476

477477
select {
478478
case <-ctx.Done():
479-
c.logger.Info("All scheduled jobs completed")
479+
c.logger.Debug("All scheduled jobs completed")
480480
c.workflowScheduler = nil
481481
case <-time.After(timeout):
482482
c.logger.Warn("Timeout waiting for jobs to complete. Moving on", "timeout", timeout)
@@ -485,24 +485,24 @@ func (c *dbosContext) Shutdown(timeout time.Duration) {
485485

486486
// Shutdown the conductor
487487
if c.conductor != nil {
488-
c.logger.Info("Shutting down conductor")
488+
c.logger.Debug("Shutting down conductor")
489489
c.conductor.Shutdown(timeout)
490490
}
491491

492492
// Shutdown the admin server
493493
if c.adminServer != nil && c.launched.Load() {
494-
c.logger.Info("Shutting down admin server")
494+
c.logger.Debug("Shutting down admin server")
495495
err := c.adminServer.Shutdown(timeout)
496496
if err != nil {
497497
c.logger.Error("Failed to shutdown admin server", "error", err)
498498
} else {
499-
c.logger.Info("Admin server shutdown complete")
499+
c.logger.Debug("Admin server shutdown complete")
500500
}
501501
}
502502

503503
// Close the system database
504504
if c.systemDB != nil {
505-
c.logger.Info("Shutting down system database")
505+
c.logger.Debug("Shutting down system database")
506506
c.systemDB.shutdown(c, timeout)
507507
}
508508

dbos/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (qr *queueRunner) run(ctx *dbosContext) {
264264
// Sleep with jittered interval, but allow early exit on context cancellation
265265
select {
266266
case <-ctx.Done():
267-
qr.logger.Info("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
267+
qr.logger.Debug("Queue runner stopping due to context cancellation", "cause", context.Cause(ctx))
268268
qr.completionChan <- struct{}{}
269269
return
270270
case <-time.After(sleepDuration):

dbos/system_database.go

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type systemDatabase interface {
5454
getEvent(ctx context.Context, input getEventInput) (any, error)
5555

5656
// Timers (special steps)
57-
sleep(ctx context.Context, duration time.Duration) (time.Duration, error)
57+
sleep(ctx context.Context, input sleepInput) (time.Duration, error)
5858

5959
// Queues
6060
dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInput) ([]dequeuedWorkflow, error)
@@ -111,7 +111,7 @@ func createDatabaseIfNotExists(ctx context.Context, databaseURL string, logger *
111111
if err != nil {
112112
return newInitializationError(fmt.Sprintf("failed to create database %s: %v", dbName, err))
113113
}
114-
logger.Info("Database created", "name", dbName)
114+
logger.Debug("Database created", "name", dbName)
115115
}
116116

117117
return nil
@@ -304,7 +304,7 @@ func (s *sysDB) launch(ctx context.Context) {
304304
}
305305

306306
func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) {
307-
s.logger.Info("DBOS: Closing system database connection pool")
307+
s.logger.Debug("DBOS: Closing system database connection pool")
308308
if s.pool != nil {
309309
s.pool.Close()
310310
}
@@ -319,7 +319,7 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) {
319319

320320
if s.launched {
321321
// Wait for the notification loop to exit
322-
s.logger.Info("DBOS: Waiting for notification listener loop to finish")
322+
s.logger.Debug("DBOS: Waiting for notification listener loop to finish")
323323
select {
324324
case <-s.notificationLoopDone:
325325
case <-time.After(timeout):
@@ -1438,10 +1438,17 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]Step
14381438
return steps, nil
14391439
}
14401440

1441+
type sleepInput struct {
1442+
duration time.Duration // Duration to sleep
1443+
skipSleep bool // If true, the function will not actually sleep (useful for testing)
1444+
}
1445+
14411446
// Sleep is a special type of step that sleeps for a specified duration
14421447
// A wakeup time is computed and recorded in the database
14431448
// If we sleep is re-executed, it will only sleep for the remaining duration until the wakeup time
1444-
func (s *sysDB) sleep(ctx context.Context, duration time.Duration) (time.Duration, error) {
1449+
// sleep can be called within other special steps (e.g., getEvent, recv) to provide durable sleep
1450+
1451+
func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, error) {
14451452
functionName := "DBOS.sleep"
14461453

14471454
// Get workflow state from context
@@ -1486,9 +1493,9 @@ func (s *sysDB) sleep(ctx context.Context, duration time.Duration) (time.Duratio
14861493
}
14871494
} else {
14881495
// First execution: calculate and record the end time
1489-
s.logger.Debug("Durable sleep", "stepID", stepID, "duration", duration)
1496+
s.logger.Debug("Durable sleep", "stepID", stepID, "duration", input.duration)
14901497

1491-
endTime = time.Now().Add(duration)
1498+
endTime = time.Now().Add(input.duration)
14921499

14931500
// Record the operation result with the calculated end time
14941501
recordInput := recordOperationResultDBInput{
@@ -1512,8 +1519,10 @@ func (s *sysDB) sleep(ctx context.Context, duration time.Duration) (time.Duratio
15121519
// Calculate remaining duration until wake up time
15131520
remainingDuration := max(0, time.Until(endTime))
15141521

1515-
// Actually sleep for the remaining duration
1516-
time.Sleep(remainingDuration)
1522+
if !input.skipSleep {
1523+
// Actually sleep for the remaining duration
1524+
time.Sleep(remainingDuration)
1525+
}
15171526

15181527
return remainingDuration, nil
15191528
}
@@ -1527,7 +1536,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
15271536
s.notificationLoopDone <- struct{}{}
15281537
}()
15291538

1530-
s.logger.Info("DBOS: Starting notification listener loop")
1539+
s.logger.Debug("DBOS: Starting notification listener loop")
15311540
mrr := s.notificationListenerConnection.Exec(ctx, fmt.Sprintf("LISTEN %s; LISTEN %s", _DBOS_NOTIFICATIONS_CHANNEL, _DBOS_WORKFLOW_EVENTS_CHANNEL))
15321541
results, err := mrr.ReadAll()
15331542
if err != nil {
@@ -1555,7 +1564,7 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
15551564
if err != nil {
15561565
// Context cancellation
15571566
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
1558-
s.logger.Info("Notification listener loop exiting due to context cancellation", "cause", context.Cause(ctx), "error", err)
1567+
s.logger.Debug("Notification listener loop exiting due to context cancellation", "cause", context.Cause(ctx), "error", err)
15591568
return
15601569
}
15611570

@@ -1750,10 +1759,18 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
17501759
close(done)
17511760
}()
17521761

1762+
timeout, err := s.sleep(ctx, sleepInput{
1763+
duration: input.Timeout,
1764+
skipSleep: true,
1765+
})
1766+
if err != nil {
1767+
return nil, fmt.Errorf("failed to sleep before recv timeout: %w", err)
1768+
}
1769+
17531770
select {
17541771
case <-done:
17551772
s.logger.Debug("Received notification on condition variable", "payload", payload)
1756-
case <-time.After(input.Timeout):
1773+
case <-time.After(timeout):
17571774
s.logger.Warn("Recv() timeout reached", "payload", payload, "timeout", input.Timeout)
17581775
}
17591776
}
@@ -1969,11 +1986,21 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
19691986
close(done)
19701987
}()
19711988

1989+
timeout := input.Timeout
1990+
if isInWorkflow {
1991+
timeout, err = s.sleep(ctx, sleepInput{
1992+
duration: input.Timeout,
1993+
skipSleep: true,
1994+
})
1995+
if err != nil {
1996+
return nil, fmt.Errorf("failed to sleep before getEvent timeout: %w", err)
1997+
}
1998+
}
1999+
19722000
select {
19732001
case <-done:
19742002
// Received notification
1975-
case <-time.After(input.Timeout):
1976-
// Timeout reached
2003+
case <-time.After(timeout):
19772004
s.logger.Warn("GetEvent() timeout reached", "target_workflow_id", input.TargetWorkflowID, "key", input.Key, "timeout", input.Timeout)
19782005
case <-ctx.Done():
19792006
return nil, fmt.Errorf("context cancelled while waiting for event: %w", ctx.Err())

dbos/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,7 @@ func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time
12411241
}
12421242

12431243
func (c *dbosContext) Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) {
1244-
return c.systemDB.sleep(c, duration)
1244+
return c.systemDB.sleep(c, sleepInput{duration: duration, skipSleep: false})
12451245
}
12461246

12471247
// Sleep pauses workflow execution for the specified duration.

0 commit comments

Comments
 (0)