Skip to content

Commit 1eba3e2

Browse files
Add queue option
1 parent 9479868 commit 1eba3e2

File tree

3 files changed

+53
-5
lines changed

3 files changed

+53
-5
lines changed

dbos/dbos.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ type Config struct {
4040
ConductorAPIKey string // DBOS conductor API key (optional)
4141
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
4242
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
43+
QueueRunner QueueConfig // Queue configuration (optional)
44+
}
45+
46+
// QueueConfig configures the queue runner polling behavior.
47+
type QueueConfig struct {
48+
BaseInterval float64 // seconds
49+
MinInterval float64 // seconds
50+
MaxInterval float64 // seconds
4351
}
4452

4553
func processConfig(inputConfig *Config) (*Config, error) {
@@ -54,6 +62,10 @@ func processConfig(inputConfig *Config) (*Config, error) {
5462
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
5563
}
5664

65+
if inputConfig.QueueRunner.MinInterval > inputConfig.QueueRunner.MaxInterval {
66+
return nil, fmt.Errorf("minInterval must be less than maxInterval")
67+
}
68+
5769
dbosConfig := &Config{
5870
DatabaseURL: inputConfig.DatabaseURL,
5971
AppName: inputConfig.AppName,
@@ -66,6 +78,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
6678
ApplicationVersion: inputConfig.ApplicationVersion,
6779
ExecutorID: inputConfig.ExecutorID,
6880
SystemDBPool: inputConfig.SystemDBPool,
81+
QueueRunner: inputConfig.QueueRunner,
6982
}
7083

7184
// Load defaults
@@ -379,7 +392,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
379392
initExecutor.logger.Debug("System database initialized")
380393

381394
// Initialize the queue runner and register DBOS internal queue
382-
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
395+
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunner)
383396

384397
// Initialize conductor if API key is provided
385398
if config.ConductorAPIKey != "" {

dbos/queue.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
const (
1515
_DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue"
1616
_DEFAULT_MAX_TASKS_PER_ITERATION = 100
17+
_DEFAULT_BASE_INTERVAL = 1.0
18+
_DEFAULT_MAX_INTERVAL = 120.0
19+
_DEFAULT_MIN_INTERVAL = 1.0
1720
)
1821

1922
// RateLimiter configures rate limiting for workflow queue execution.
@@ -159,11 +162,21 @@ type queueRunner struct {
159162
completionChan chan struct{}
160163
}
161164

162-
func newQueueRunner(logger *slog.Logger) *queueRunner {
165+
func newQueueRunner(logger *slog.Logger, config QueueConfig) *queueRunner {
166+
if config.BaseInterval == 0 {
167+
config.BaseInterval = _DEFAULT_BASE_INTERVAL
168+
}
169+
if config.MinInterval == 0 {
170+
config.MinInterval = _DEFAULT_MIN_INTERVAL
171+
}
172+
if config.MaxInterval == 0 {
173+
config.MaxInterval = _DEFAULT_MAX_INTERVAL
174+
}
175+
163176
return &queueRunner{
164-
baseInterval: 1.0,
165-
minInterval: 1.0,
166-
maxInterval: 120.0,
177+
baseInterval: config.BaseInterval,
178+
minInterval: config.MinInterval,
179+
maxInterval: config.MaxInterval,
167180
backoffFactor: 2.0,
168181
scalebackFactor: 0.9,
169182
jitterMin: 0.95,

dbos/queues_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"log/slog"
89
"os"
910
"reflect"
1011
"runtime"
@@ -1579,3 +1580,24 @@ func TestPartitionedQueues(t *testing.T) {
15791580
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after partitioned queue test")
15801581
})
15811582
}
1583+
1584+
func TestNewQueueRunner(t *testing.T) {
1585+
t.Run("init queue with default interval", func(t *testing.T) {
1586+
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{})
1587+
require.Equal(t, _DEFAULT_BASE_INTERVAL, runner.baseInterval)
1588+
require.Equal(t, _DEFAULT_MAX_INTERVAL, runner.maxInterval)
1589+
require.Equal(t, _DEFAULT_MIN_INTERVAL, runner.minInterval)
1590+
})
1591+
1592+
t.Run("init queue with custom interval", func(t *testing.T) {
1593+
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{
1594+
BaseInterval: 1,
1595+
MinInterval: 2,
1596+
MaxInterval: 3,
1597+
})
1598+
require.Equal(t, float64(1), runner.baseInterval)
1599+
require.Equal(t, float64(2), runner.minInterval)
1600+
require.Equal(t, float64(3), runner.maxInterval)
1601+
1602+
})
1603+
}

0 commit comments

Comments
 (0)