Skip to content

Commit 972e100

Browse files
committed
Update scheduler_dfs.go
1 parent 78221e7 commit 972e100

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

scheduler/scheduler_dfs.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,17 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
163163
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
164164
}
165165
for i := range chunks {
166-
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
167-
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
166+
resourceConcurrencyKey := table.Name
167+
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) != "" {
168+
resourceConcurrencyKey = lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey)
169+
}
170+
resourceConcurrencyKey = resourceConcurrencyKey + "-" + client.ID() + "-" + "resource"
171+
resourceConcurrency := s.scheduler.singleResourceMaxConcurrency
172+
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.MaxResourceConcurrency) > 0 {
173+
resourceConcurrency = int64(lo.FromPtr(table.ConcurrencySettings.MaxResourceConcurrency))
174+
}
175+
176+
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(resourceConcurrency))
168177
resourceSem := resourceSemVal.(*semaphore.Weighted)
169178
if err := resourceSem.Acquire(ctx, 1); err != nil {
170179
s.logger.Warn().Err(err).Msg("failed to acquire semaphore. context cancelled")
@@ -227,9 +236,18 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
227236
resolvedResources <- resource
228237
for _, relation := range resource.Table.Relations {
229238
relation := relation
230-
tableConcurrencyKey := table.Name + "-" + client.ID()
239+
tableConcurrencyKey := table.Name
240+
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) != "" {
241+
tableConcurrencyKey = lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey)
242+
}
243+
tableConcurrencyKey = tableConcurrencyKey + "-" + client.ID()
244+
tableConcurrency := s.scheduler.singleNestedTableMaxConcurrency
245+
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.MaxTableConcurrency) > 0 {
246+
tableConcurrency = int64(lo.FromPtr(table.ConcurrencySettings.MaxTableConcurrency))
247+
}
248+
231249
// Acquire the semaphore for the table
232-
tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency))
250+
tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(tableConcurrency))
233251
tableSem := tableSemVal.(*semaphore.Weighted)
234252
if err := tableSem.Acquire(ctx, 1); err != nil {
235253
// This means context was cancelled

0 commit comments

Comments
 (0)