Skip to content

Commit 7b07b5a

Browse files
committed
fix global concurrency
1 parent 19cceaf commit 7b07b5a

File tree

5 files changed

+17
-12
lines changed

5 files changed

+17
-12
lines changed

dbos/queue.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@ import (
1414
)
1515

1616
var (
17-
workflowQueueRegistry = make(map[string]WorkflowQueue)
18-
DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue"
19-
_ = NewWorkflowQueue(DBOS_INTERNAL_QUEUE_NAME)
17+
workflowQueueRegistry = make(map[string]WorkflowQueue)
18+
_ = NewWorkflowQueue(_DBOS_INTERNAL_QUEUE_NAME)
19+
)
20+
21+
const (
22+
_DBOS_INTERNAL_QUEUE_NAME = "_dbos_internal_queue"
23+
_DEFAULT_MAX_TASKS_PER_ITERATION = 100
2024
)
2125

2226
// RateLimiter represents a rate limiting configuration
@@ -31,7 +35,7 @@ type WorkflowQueue struct {
3135
GlobalConcurrency *int
3236
PriorityEnabled bool
3337
Limiter *RateLimiter
34-
MaxTasksPerIteration uint
38+
MaxTasksPerIteration int
3539
}
3640

3741
// QueueOption is a functional option for configuring a workflow queue
@@ -61,7 +65,7 @@ func WithRateLimiter(limiter *RateLimiter) QueueOption {
6165
}
6266
}
6367

64-
func WithMaxTasksPerIteration(maxTasks uint) QueueOption {
68+
func WithMaxTasksPerIteration(maxTasks int) QueueOption {
6569
return func(q *WorkflowQueue) {
6670
q.MaxTasksPerIteration = maxTasks
6771
}
@@ -84,7 +88,7 @@ func NewWorkflowQueue(name string, options ...QueueOption) WorkflowQueue {
8488
GlobalConcurrency: nil,
8589
PriorityEnabled: false,
8690
Limiter: nil,
87-
MaxTasksPerIteration: 100, // Default max tasks per iteration
91+
MaxTasksPerIteration: _DEFAULT_MAX_TASKS_PER_ITERATION,
8892
}
8993

9094
// Apply functional options

dbos/queues_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ func TestGlobalConcurrency(t *testing.T) {
405405

406406
// Wait for the first workflow to start
407407
workflowEvent1.Wait()
408+
time.Sleep(2 * time.Second) // Wait for a few seconds to let the queue runner loop
408409

409410
// Ensure the second workflow has not started yet
410411
if workflowEvent2.IsSet {

dbos/system_database.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue
13211321
getLogger().Warn("Local pending workflows on queue exceeds worker concurrency limit", "local_pending", localPendingWorkflows, "queue_name", queue.Name, "concurrency_limit", workerConcurrency)
13221322
}
13231323
availableWorkerTasks := max(workerConcurrency-localPendingWorkflows, 0)
1324-
maxTasks = uint(availableWorkerTasks)
1324+
maxTasks = availableWorkerTasks
13251325
}
13261326

13271327
// Check global concurrency limit
@@ -1336,8 +1336,8 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue
13361336
getLogger().Warn("Total pending workflows on queue exceeds global concurrency limit", "total_pending", globalPendingWorkflows, "queue_name", queue.Name, "concurrency_limit", concurrency)
13371337
}
13381338
availableTasks := max(concurrency-globalPendingWorkflows, 0)
1339-
if uint(availableTasks) < maxTasks {
1340-
maxTasks = uint(availableTasks)
1339+
if availableTasks < maxTasks {
1340+
maxTasks = availableTasks
13411341
}
13421342
}
13431343
}
@@ -1375,7 +1375,7 @@ func (s *systemDatabase) DequeueWorkflows(ctx context.Context, queue WorkflowQue
13751375
}
13761376

13771377
// Add limit if maxTasks is finite
1378-
if maxTasks > 0 {
1378+
if maxTasks >= 0 {
13791379
query += fmt.Sprintf(" LIMIT %d", int(maxTasks))
13801380
}
13811381

dbos/utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func queueEntriesAreCleanedUp() bool {
153153
AND status IN ('ENQUEUED', 'PENDING')`
154154

155155
var count int
156-
err = tx.QueryRow(context.Background(), query, DBOS_INTERNAL_QUEUE_NAME).Scan(&count)
156+
err = tx.QueryRow(context.Background(), query, _DBOS_INTERNAL_QUEUE_NAME).Scan(&count)
157157
tx.Rollback(context.Background()) // Clean up transaction
158158

159159
if err != nil {

dbos/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func WithWorkflow[P any, R any](fn WorkflowFunc[P, R], opts ...WorkflowRegistrat
301301
scheduledTime = entry.Next
302302
}
303303
wfID := fmt.Sprintf("sched-%s-%s", fqn, scheduledTime) // XXX we can rethink the format
304-
wrappedFunction(context.Background(), any(scheduledTime).(P), WithWorkflowID(wfID), WithQueue(DBOS_INTERNAL_QUEUE_NAME))
304+
wrappedFunction(context.Background(), any(scheduledTime).(P), WithWorkflowID(wfID), WithQueue(_DBOS_INTERNAL_QUEUE_NAME))
305305
})
306306
if err != nil {
307307
panic(fmt.Sprintf("failed to register scheduled workflow: %v", err))

0 commit comments

Comments
 (0)