Skip to content

Commit 57dadb4

Browse files
authored
CRE-1678: config option for maxConcurrency in wf syncer (#21324)
* cre-1678: config option for maxConcurrency in wf syncer * cre-1678: test data fix * cre-1678: minor
1 parent 0c3919a commit 57dadb4

25 files changed

+39
-1
lines changed

core/config/capabilities_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type CapabilitiesWorkflowRegistry interface {
3232
MaxConfigSize() utils.FileSize
3333
RelayID() types.RelayID
3434
SyncStrategy() string
35+
MaxConcurrency() int
3536
WorkflowStorage() WorkflowStorage
3637
AdditionalSources() []AdditionalWorkflowSource
3738
}

core/config/docs/core.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,8 @@ MaxConfigSize = '50.00kb' # Default
519519
# SyncStrategy is the strategy that will be used to bring the node up to date with the latest Workflow Registry contract state.
520520
# Options are: event which watches for contract events or reconciliation which diffs workflow metadata state.
521521
SyncStrategy = 'event' # Default
522+
# MaxConcurrency controls the maximum number of concurrent event handlers in the workflow registry syncer.
523+
MaxConcurrency = 12 # Default
522524

523525
[Capabilities.WorkflowRegistry.WorkflowStorage]
524526
# URL is the location for the workflow storage service to be communicated with.

core/config/toml/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2264,6 +2264,7 @@ type WorkflowRegistry struct {
22642264
MaxEncryptedSecretsSize *utils.FileSize
22652265
MaxConfigSize *utils.FileSize
22662266
SyncStrategy *string
2267+
MaxConcurrency *int
22672268
WorkflowStorage WorkflowStorage
22682269
AdditionalSourcesConfig []AdditionalWorkflowSource `toml:"AdditionalSources"`
22692270
}
@@ -2301,6 +2302,10 @@ func (r *WorkflowRegistry) setFrom(f *WorkflowRegistry) {
23012302
r.SyncStrategy = f.SyncStrategy
23022303
}
23032304

2305+
if f.MaxConcurrency != nil {
2306+
r.MaxConcurrency = f.MaxConcurrency
2307+
}
2308+
23042309
r.WorkflowStorage.setFrom(&f.WorkflowStorage)
23052310

23062311
if len(f.AdditionalSourcesConfig) > 0 {

core/services/chainlink/config_capabilities.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ func (c *capabilitiesWorkflowRegistry) SyncStrategy() string {
229229
return *c.c.SyncStrategy
230230
}
231231

232+
func (c *capabilitiesWorkflowRegistry) MaxConcurrency() int {
233+
return *c.c.MaxConcurrency
234+
}
235+
232236
func (c *capabilitiesWorkflowRegistry) WorkflowStorage() config.WorkflowStorage {
233237
return &workflowStorage{
234238
c: c.c.WorkflowStorage,

core/services/chainlink/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ func TestConfig_Marshal(t *testing.T) {
531531
MaxEncryptedSecretsSize: ptr(utils.FileSize(26.4 * utils.KB)),
532532
MaxConfigSize: ptr(utils.FileSize(50 * utils.KB)),
533533
SyncStrategy: ptr("event"),
534+
MaxConcurrency: ptr(12),
534535
WorkflowStorage: toml.WorkflowStorage{
535536
ArtifactStorageHost: ptr(""),
536537
URL: ptr(""),

core/services/chainlink/testdata/config-empty-effective.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ MaxBinarySize = '20.00mb'
326326
MaxEncryptedSecretsSize = '26.40kb'
327327
MaxConfigSize = '50.00kb'
328328
SyncStrategy = 'event'
329+
MaxConcurrency = 12
329330
AdditionalSources = []
330331

331332
[Capabilities.WorkflowRegistry.WorkflowStorage]

core/services/chainlink/testdata/config-full.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ MaxBinarySize = '20.00mb'
336336
MaxEncryptedSecretsSize = '26.40kb'
337337
MaxConfigSize = '50.00kb'
338338
SyncStrategy = 'event'
339+
MaxConcurrency = 12
339340

340341
[Capabilities.WorkflowRegistry.WorkflowStorage]
341342
ArtifactStorageHost = ''

core/services/chainlink/testdata/config-multi-chain-effective.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ MaxBinarySize = '20.00mb'
326326
MaxEncryptedSecretsSize = '26.40kb'
327327
MaxConfigSize = '50.00kb'
328328
SyncStrategy = 'event'
329+
MaxConcurrency = 12
329330
AdditionalSources = []
330331

331332
[Capabilities.WorkflowRegistry.WorkflowStorage]

core/services/cre/cre.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,7 @@ func newWorkflowRegistrySyncerV2(
936936
registryOpts := []syncerV2.Option{
937937
syncerV2.WithAdditionalSources(addSourceConfigs),
938938
syncerV2.WithShardOrchestratorClient(shardOrchestratorClient),
939+
syncerV2.WithMaxConcurrency(capCfg.WorkflowRegistry().MaxConcurrency()),
939940
}
940941
if cfg.Sharding().ShardingEnabled() {
941942
registryOpts = append(registryOpts,

core/services/workflows/syncer/v2/workflow_registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var (
3434
defaultTickInterval = 12 * time.Second
3535
defaultRetryInterval = 12 * time.Second
3636
defaultMaxRetryInterval = 5 * time.Minute
37-
defaultMaxConcurrency = 50
37+
defaultMaxConcurrency = 12
3838
WorkflowRegistryContractName = "WorkflowRegistry"
3939

4040
GetWorkflowsByDONMethodName = "getWorkflowListByDON"

0 commit comments

Comments
 (0)