@@ -11,23 +11,9 @@ import (
1111 "github.com/cloudquery/plugin-sdk/v4/helpers"
1212 "github.com/cloudquery/plugin-sdk/v4/schema"
1313 "github.com/getsentry/sentry-go"
14- "golang.org/x/sync/semaphore"
1514)
1615
17- func (s * Scheduler ) syncDfs (ctx context.Context , resolvedResources chan <- * schema.Resource , syncOpts * SyncOptions ) {
18- // This is very similar to the concurrent web crawler problem with some minor changes.
19- // We are using DFS to make sure memory usage is capped at O(h) where h is the height of the tree.
20- tableConcurrency := max (s .concurrency / minResourceConcurrency , minTableConcurrency )
21- resourceConcurrency := tableConcurrency * minResourceConcurrency
22-
23- s .tableSems = make ([]* semaphore.Weighted , s .maxDepth )
24- for i := uint64 (0 ); i < s .maxDepth ; i ++ {
25- s .tableSems [i ] = semaphore .NewWeighted (int64 (tableConcurrency ))
26- // reduce table concurrency logarithmically for every depth level
27- tableConcurrency = max (tableConcurrency / 2 , minTableConcurrency )
28- }
29- s .resourceSem = semaphore .NewWeighted (int64 (resourceConcurrency ))
30-
16+ func (s * syncClient ) syncDfs (ctx context.Context , resolvedResources chan <- * schema.Resource ) {
3117 // we have this because plugins can return sometimes clients in a random way which will cause
3218 // differences between this run and the next one.
3319 preInitialisedClients := make ([][]schema.ClientMeta , len (s .tables ))
@@ -61,18 +47,18 @@ func (s *Scheduler) syncDfs(ctx context.Context, resolvedResources chan<- *schem
6147 clients := preInitialisedClients [i ]
6248 for _ , client := range clients {
6349 client := client
64- if err := s .tableSems [0 ].Acquire (ctx , 1 ); err != nil {
50+ if err := s .scheduler . tableSems [0 ].Acquire (ctx , 1 ); err != nil {
6551 // This means context was cancelled
6652 wg .Wait ()
6753 return
6854 }
6955 wg .Add (1 )
7056 go func () {
7157 defer wg .Done ()
72- defer s .tableSems [0 ].Release (1 )
58+ defer s .scheduler . tableSems [0 ].Release (1 )
7359 // not checking for error here as nothing much todo.
7460 // the error is logged and this happens when context is cancelled
75- s .resolveTableDfs (ctx , table , client , nil , resolvedResources , 1 , syncOpts )
61+ s .resolveTableDfs (ctx , table , client , nil , resolvedResources , 1 )
7662 }()
7763 }
7864 }
@@ -81,7 +67,7 @@ func (s *Scheduler) syncDfs(ctx context.Context, resolvedResources chan<- *schem
8167 wg .Wait ()
8268}
8369
84- func (s * Scheduler ) resolveTableDfs (ctx context.Context , table * schema.Table , client schema.ClientMeta , parent * schema.Resource , resolvedResources chan <- * schema.Resource , depth int , syncOpts * SyncOptions ) {
70+ func (s * syncClient ) resolveTableDfs (ctx context.Context , table * schema.Table , client schema.ClientMeta , parent * schema.Resource , resolvedResources chan <- * schema.Resource , depth int ) {
8571 var validationErr * schema.ValidationError
8672 clientName := client .ID ()
8773 logger := s .logger .With ().Str ("table" , table .Name ).Str ("client" , clientName ).Logger ()
@@ -119,7 +105,7 @@ func (s *Scheduler) resolveTableDfs(ctx context.Context, table *schema.Table, cl
119105 }()
120106
121107 for r := range res {
122- s .resolveResourcesDfs (ctx , table , client , parent , r , resolvedResources , depth , syncOpts )
108+ s .resolveResourcesDfs (ctx , table , client , parent , r , resolvedResources , depth )
123109 }
124110
125111 // we don't need any waitgroups here because we are waiting for the channel to close
@@ -129,7 +115,7 @@ func (s *Scheduler) resolveTableDfs(ctx context.Context, table *schema.Table, cl
129115 }
130116}
131117
132- func (s * Scheduler ) resolveResourcesDfs (ctx context.Context , table * schema.Table , client schema.ClientMeta , parent * schema.Resource , resources any , resolvedResources chan <- * schema.Resource , depth int , syncOpts * SyncOptions ) {
118+ func (s * syncClient ) resolveResourcesDfs (ctx context.Context , table * schema.Table , client schema.ClientMeta , parent * schema.Resource , resources any , resolvedResources chan <- * schema.Resource , depth int ) {
133119 resourcesSlice := helpers .InterfaceSlice (resources )
134120 if len (resourcesSlice ) == 0 {
135121 return
@@ -141,23 +127,23 @@ func (s *Scheduler) resolveResourcesDfs(ctx context.Context, table *schema.Table
141127 sentValidationErrors := sync.Map {}
142128 for i := range resourcesSlice {
143129 i := i
144- if err := s .resourceSem .Acquire (ctx , 1 ); err != nil {
130+ if err := s .scheduler . resourceSem .Acquire (ctx , 1 ); err != nil {
145131 s .logger .Warn ().Err (err ).Msg ("failed to acquire semaphore. context cancelled" )
146132 wg .Wait ()
147133 // we have to continue emptying the channel to exit gracefully
148134 return
149135 }
150136 wg .Add (1 )
151137 go func () {
152- defer s .resourceSem .Release (1 )
138+ defer s .scheduler . resourceSem .Release (1 )
153139 defer wg .Done ()
154140 //nolint:all
155141 resolvedResource := s .resolveResource (ctx , table , client , parent , resourcesSlice [i ])
156142 if resolvedResource == nil {
157143 return
158144 }
159145
160- if err := resolvedResource .CalculateCQID (syncOpts . DeterministicCQID ); err != nil {
146+ if err := resolvedResource .CalculateCQID (s . deterministicCQID ); err != nil {
161147 tableMetrics := s .metrics .TableClient [table .Name ][client .ID ()]
162148 s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with primary key calculation error" )
163149 if _ , found := sentValidationErrors .LoadOrStore (table .Name , struct {}{}); ! found {
@@ -197,16 +183,16 @@ func (s *Scheduler) resolveResourcesDfs(ctx context.Context, table *schema.Table
197183 resolvedResources <- resource
198184 for _ , relation := range resource .Table .Relations {
199185 relation := relation
200- if err := s .tableSems [depth ].Acquire (ctx , 1 ); err != nil {
186+ if err := s .scheduler . tableSems [depth ].Acquire (ctx , 1 ); err != nil {
201187 // This means context was cancelled
202188 wg .Wait ()
203189 return
204190 }
205191 wg .Add (1 )
206192 go func () {
207193 defer wg .Done ()
208- defer s .tableSems [depth ].Release (1 )
209- s .resolveTableDfs (ctx , relation , client , resource , resolvedResources , depth + 1 , syncOpts )
194+ defer s .scheduler . tableSems [depth ].Release (1 )
195+ s .resolveTableDfs (ctx , relation , client , resource , resolvedResources , depth + 1 )
210196 }()
211197 }
212198 }
0 commit comments