Skip to content

Commit 7548d41

Browse files
committed
queue.go docs
1 parent f8cf8c7 commit 7548d41

File tree

1 file changed

+39
-10
lines changed

1 file changed

+39
-10
lines changed

dbos/queue.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,55 +18,84 @@ const (
1818
_DEFAULT_MAX_TASKS_PER_ITERATION = 100
1919
)
2020

21-
// RateLimiter represents a rate limiting configuration
21+
// RateLimiter configures rate limiting for workflow queue execution.
22+
// Rate limits prevent overwhelming external services and provide backpressure.
2223
type RateLimiter struct {
23-
Limit int
24-
Period float64
24+
Limit int // Maximum number of workflows to start within the period
25+
Period float64 // Time period in seconds for the rate limit
2526
}
2627

28+
// WorkflowQueue defines a named queue for workflow execution.
29+
// Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
2730
type WorkflowQueue struct {
28-
Name string `json:"name"`
29-
WorkerConcurrency *int `json:"workerConcurrency,omitempty"`
30-
GlobalConcurrency *int `json:"concurrency,omitempty"` // Different key to match other transact APIs
31-
PriorityEnabled bool `json:"priorityEnabled,omitempty"`
32-
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Named after other Transact APIs
33-
MaxTasksPerIteration int `json:"maxTasksPerIteration"`
31+
Name string `json:"name"` // Unique queue name
32+
WorkerConcurrency *int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
33+
GlobalConcurrency *int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
34+
PriorityEnabled bool `json:"priorityEnabled,omitempty"` // Enable priority-based scheduling
35+
RateLimit *RateLimiter `json:"rateLimit,omitempty"` // Rate limiting configuration
36+
MaxTasksPerIteration int `json:"maxTasksPerIteration"` // Max workflows to dequeue per iteration
3437
}
3538

3639
// queueOption is a functional option for configuring a workflow queue
3740
type queueOption func(*WorkflowQueue)
3841

42+
// WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue.
43+
// This provides per-executor concurrency control.
3944
func WithWorkerConcurrency(concurrency int) queueOption {
4045
return func(q *WorkflowQueue) {
4146
q.WorkerConcurrency = &concurrency
4247
}
4348
}
4449

50+
// WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue
51+
// across all executors. This provides global concurrency control.
4552
func WithGlobalConcurrency(concurrency int) queueOption {
4653
return func(q *WorkflowQueue) {
4754
q.GlobalConcurrency = &concurrency
4855
}
4956
}
5057

58+
// WithPriorityEnabled enables priority-based scheduling for the queue.
59+
// When enabled, workflows with lower priority numbers are executed first.
5160
func WithPriorityEnabled(enabled bool) queueOption {
5261
return func(q *WorkflowQueue) {
5362
q.PriorityEnabled = enabled
5463
}
5564
}
5665

66+
// WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services.
67+
// The rate limiter enforces a maximum number of workflow starts within a time period.
5768
func WithRateLimiter(limiter *RateLimiter) queueOption {
5869
return func(q *WorkflowQueue) {
5970
q.RateLimit = limiter
6071
}
6172
}
6273

74+
// WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration.
75+
// This controls batch sizes for queue processing.
6376
func WithMaxTasksPerIteration(maxTasks int) queueOption {
6477
return func(q *WorkflowQueue) {
6578
q.MaxTasksPerIteration = maxTasks
6679
}
6780
}
6881

69-
// NewWorkflowQueue creates a new workflow queue with optional configuration
82+
// NewWorkflowQueue creates a new workflow queue with the specified name and configuration options.
83+
// The queue must be created before workflows can be enqueued to it using the WithQueue option in RunAsWorkflow.
84+
// Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting.
85+
//
86+
// Example:
87+
//
88+
// queue := dbos.NewWorkflowQueue(ctx, "email-queue",
89+
// dbos.WithWorkerConcurrency(5),
90+
// dbos.WithRateLimiter(&dbos.RateLimiter{
91+
// Limit: 100,
92+
// Period: 60.0, // 100 workflows per minute
93+
// }),
94+
// dbos.WithPriorityEnabled(true),
95+
// )
96+
//
97+
// // Enqueue workflows to this queue:
98+
// handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))
7099
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) WorkflowQueue {
71100
ctx, ok := dbosCtx.(*dbosContext)
72101
if !ok {

0 commit comments

Comments
 (0)