@@ -25,9 +25,9 @@ type RateLimiter struct {
2525 Period float64 // Time period in seconds for the rate limit
2626}
2727
28- // QueueOptions defines a named queue for workflow execution.
28+ // WorkflowQueue defines a named queue for workflow execution.
2929// Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
30- type QueueOptions struct {
30+ type WorkflowQueue struct {
3131 Name string `json:"name"` // Unique queue name
3232 WorkerConcurrency * int `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
3333 GlobalConcurrency * int `json:"concurrency,omitempty"` // Max concurrent workflows across all executors
@@ -37,44 +37,44 @@ type QueueOptions struct {
3737}
3838
3939// QueueOption is a functional option for configuring a workflow queue
40- type QueueOption func (* QueueOptions )
40+ type QueueOption func (* WorkflowQueue )
4141
4242// WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue.
4343// This provides per-executor concurrency control.
4444func WithWorkerConcurrency (concurrency int ) QueueOption {
45- return func (q * QueueOptions ) {
45+ return func (q * WorkflowQueue ) {
4646 q .WorkerConcurrency = & concurrency
4747 }
4848}
4949
5050// WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue
5151// across all executors. This provides global concurrency control.
5252func WithGlobalConcurrency (concurrency int ) QueueOption {
53- return func (q * QueueOptions ) {
53+ return func (q * WorkflowQueue ) {
5454 q .GlobalConcurrency = & concurrency
5555 }
5656}
5757
5858// WithPriorityEnabled enables priority-based scheduling for the queue.
5959// When enabled, workflows with lower priority numbers are executed first.
6060func WithPriorityEnabled (enabled bool ) QueueOption {
61- return func (q * QueueOptions ) {
61+ return func (q * WorkflowQueue ) {
6262 q .PriorityEnabled = enabled
6363 }
6464}
6565
6666// WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services.
6767// The rate limiter enforces a maximum number of workflow starts within a time period.
6868func WithRateLimiter (limiter * RateLimiter ) QueueOption {
69- return func (q * QueueOptions ) {
69+ return func (q * WorkflowQueue ) {
7070 q .RateLimit = limiter
7171 }
7272}
7373
7474// WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration.
7575// This controls batch sizes for queue processing.
7676func WithMaxTasksPerIteration (maxTasks int ) QueueOption {
77- return func (q * QueueOptions ) {
77+ return func (q * WorkflowQueue ) {
7878 q .MaxTasksPerIteration = maxTasks
7979 }
8080}
@@ -96,10 +96,10 @@ func WithMaxTasksPerIteration(maxTasks int) QueueOption {
9696//
9797// // Enqueue workflows to this queue:
9898// handle, err := dbos.RunAsWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))
99- func NewWorkflowQueue (dbosCtx DBOSContext , name string , options ... QueueOption ) QueueOptions {
99+ func NewWorkflowQueue (dbosCtx DBOSContext , name string , options ... QueueOption ) WorkflowQueue {
100100 ctx , ok := dbosCtx .(* dbosContext )
101101 if ! ok {
102- return QueueOptions {} // Do nothing if the concrete type is not dbosContext
102+ return WorkflowQueue {} // Do nothing if the concrete type is not dbosContext
103103 }
104104 if ctx .launched .Load () {
105105 panic ("Cannot register workflow queue after DBOS has launched" )
@@ -111,7 +111,7 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
111111 }
112112
113113 // Create queue with default settings
114- q := QueueOptions {
114+ q := WorkflowQueue {
115115 Name : name ,
116116 WorkerConcurrency : nil ,
117117 GlobalConcurrency : nil ,
@@ -142,7 +142,7 @@ type queueRunner struct {
142142 jitterMax float64
143143
144144 // Queue registry
145- workflowQueueRegistry map [string ]QueueOptions
145+ workflowQueueRegistry map [string ]WorkflowQueue
146146
147147 // Channel to signal completion back to the DBOS context
148148 completionChan chan struct {}
@@ -157,13 +157,13 @@ func newQueueRunner() *queueRunner {
157157 scalebackFactor : 0.9 ,
158158 jitterMin : 0.95 ,
159159 jitterMax : 1.05 ,
160- workflowQueueRegistry : make (map [string ]QueueOptions ),
160+ workflowQueueRegistry : make (map [string ]WorkflowQueue ),
161161 completionChan : make (chan struct {}),
162162 }
163163}
164164
165- func (qr * queueRunner ) listQueues () []QueueOptions {
166- queues := make ([]QueueOptions , 0 , len (qr .workflowQueueRegistry ))
165+ func (qr * queueRunner ) listQueues () []WorkflowQueue {
166+ queues := make ([]WorkflowQueue , 0 , len (qr .workflowQueueRegistry ))
167167 for _ , queue := range qr .workflowQueueRegistry {
168168 queues = append (queues , queue )
169169 }
0 commit comments