Skip to content

Commit 1df6687

Browse files
committed
configurable buffer
1 parent 041ee48 commit 1df6687

File tree

4 files changed

+19
-1
lines changed

4 files changed

+19
-1
lines changed

cmd/hatchet-engine/engine/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
356356
dispatcher.WithCache(cacheInstance),
357357
dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
358358
dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
359+
dispatcher.WithWorkflowRunBufferSize(sc.Runtime.WorkflowRunBufferSize),
359360
)
360361

361362
if err != nil {
@@ -722,6 +723,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
722723
dispatcher.WithCache(cacheInstance),
723724
dispatcher.WithPayloadSizeThreshold(sc.Runtime.GRPCMaxMsgSize),
724725
dispatcher.WithDefaultMaxWorkerBacklogSize(int64(sc.Runtime.GRPCWorkerStreamMaxBacklogSize)),
726+
dispatcher.WithWorkflowRunBufferSize(sc.Runtime.WorkflowRunBufferSize),
725727
)
726728

727729
if err != nil {

internal/services/dispatcher/dispatcher.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type DispatcherImpl struct {
4343
cache cache.Cacheable
4444
payloadSizeThreshold int
4545
defaultMaxWorkerBacklogSize int64
46+
workflowRunBufferSize int
4647

4748
dispatcherId string
4849
workers *workers
@@ -122,6 +123,7 @@ type DispatcherOpts struct {
122123
cache cache.Cacheable
123124
payloadSizeThreshold int
124125
defaultMaxWorkerBacklogSize int64
126+
workflowRunBufferSize int
125127
}
126128

127129
func defaultDispatcherOpts() *DispatcherOpts {
@@ -135,6 +137,7 @@ func defaultDispatcherOpts() *DispatcherOpts {
135137
alerter: alerter,
136138
payloadSizeThreshold: 3 * 1024 * 1024,
137139
defaultMaxWorkerBacklogSize: 20,
140+
workflowRunBufferSize: 1000,
138141
}
139142
}
140143

@@ -192,6 +195,12 @@ func WithDefaultMaxWorkerBacklogSize(size int64) DispatcherOpt {
192195
}
193196
}
194197

198+
func WithWorkflowRunBufferSize(size int) DispatcherOpt {
199+
return func(opts *DispatcherOpts) {
200+
opts.workflowRunBufferSize = size
201+
}
202+
}
203+
195204
func New(fs ...DispatcherOpt) (*DispatcherImpl, error) {
196205
opts := defaultDispatcherOpts()
197206

@@ -240,6 +249,7 @@ func New(fs ...DispatcherOpt) (*DispatcherImpl, error) {
240249
cache: opts.cache,
241250
payloadSizeThreshold: opts.payloadSizeThreshold,
242251
defaultMaxWorkerBacklogSize: opts.defaultMaxWorkerBacklogSize,
252+
workflowRunBufferSize: opts.workflowRunBufferSize,
243253
}, nil
244254
}
245255

internal/services/dispatcher/server_v1.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
366366
iterCtx, iterSpan := telemetry.NewSpan(ctx, "subscribe_to_workflow_runs_v1.iter")
367367
defer iterSpan.End()
368368

369-
bufferSize := 1000
369+
bufferSize := s.workflowRunBufferSize
370370

371371
if len(workflowRunIds) > bufferSize {
372372
ringMu.Lock()

pkg/config/server/server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ type ConfigFileRuntime struct {
284284

285285
// TaskOperationLimits controls the limits for various task operations
286286
TaskOperationLimits TaskOperationLimitsConfigFile `mapstructure:"taskOperationLimits" json:"taskOperationLimits,omitempty"`
287+
288+
// WorkflowRunBufferSize is the buffer size for workflow run event batching in the dispatcher
289+
WorkflowRunBufferSize int `mapstructure:"workflowRunBufferSize" json:"workflowRunBufferSize,omitempty" default:"1000"`
287290
}
288291

289292
type InternalClientTLSConfigFile struct {
@@ -851,6 +854,9 @@ func BindAllEnv(v *viper.Viper) {
851854
_ = v.BindEnv("taskOperationLimits.retryQueueLimit", "SERVER_TASK_OPERATION_LIMITS_RETRY_QUEUE_LIMIT")
852855
_ = v.BindEnv("taskOperationLimits.durableSleepLimit", "SERVER_TASK_OPERATION_LIMITS_DURABLE_SLEEP_LIMIT")
853856

857+
// dispatcher options
858+
_ = v.BindEnv("runtime.workflowRunBufferSize", "SERVER_WORKFLOW_RUN_BUFFER_SIZE")
859+
854860
// payload store options
855861
_ = v.BindEnv("payloadStore.enablePayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_PAYLOAD_DUAL_WRITES")
856862
_ = v.BindEnv("payloadStore.enableTaskEventPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_TASK_EVENT_PAYLOAD_DUAL_WRITES")

0 commit comments

Comments
 (0)