Skip to content

Commit 7717d6f

Browse files
authored
feat!: Support table_concurrency and resource_concurrency (#268)
#### Summary Closes cloudquery/cloudquery#2313 cloudquery/cloudquery#159 #264 This renames `concurrency` to `table_concurrency` (this has the same behaviour as previously but the name was too generic). Also this introduce a new option `resource_concurrency` - this limits the number of go routines that resolve a specific resource (useful when a lot of column resolvers that have api calls and `PreResourceResolver`). Importe Note: both options are only for top-level concurrency control i.e we only spawn go-routines for parent tables and for resources of parent table. The reason for that is that there is no concurrency model that can work with one variable for recursive calls otherwise it will get deadlock. So right now the SDK will support only for one level - in the future if we want to support additional level we can do `table_concurrency_1` and `resource_concurrency_1` but I think for now we can skip this.
1 parent f8ca238 commit 7717d6f

File tree

9 files changed

+75
-43
lines changed

9 files changed

+75
-43
lines changed

plugins/source.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@ type SourcePlugin struct {
3131
tables schema.Tables
3232
}
3333

34-
const (
35-
defaultConcurrency = 500000
36-
)
37-
3834
// Add internal columns
3935
func addInternalColumns(tables []*schema.Table) {
4036
for _, table := range tables {
@@ -106,25 +102,25 @@ func (p *SourcePlugin) Version() string {
106102

107103
// Sync is syncing data from the requested tables in spec to the given channel
108104
func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec specs.Source, res chan<- *schema.Resource) (*schema.SyncSummary, error) {
109-
c, err := p.newExecutionClient(ctx, logger, spec)
105+
spec.SetDefaults()
106+
if err := spec.Validate(); err != nil {
107+
return nil, fmt.Errorf("invalid spec: %w", err)
108+
}
109+
tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables)
110110
if err != nil {
111-
return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err)
111+
return nil, err
112112
}
113113

114-
// limiter used to limit the amount of resources fetched concurrently
115-
concurrency := spec.Concurrency
116-
if concurrency == 0 {
117-
concurrency = defaultConcurrency
114+
c, err := p.newExecutionClient(ctx, logger, spec)
115+
if err != nil {
116+
return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err)
118117
}
119-
logger.Info().Uint64("concurrency", concurrency).Msg("starting source plugin sync")
120-
goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(concurrency))
118+
logger.Info().Interface("spec", spec).Msg("starting sync")
119+
tableSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.TableConcurrency))
121120
wg := sync.WaitGroup{}
121+
resourceSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.ResourceConcurrency))
122122
summary := schema.SyncSummary{}
123123
startTime := time.Now()
124-
tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables)
125-
if err != nil {
126-
return nil, err
127-
}
128124

129125
logger.Debug().Interface("tables", tableNames).Msg("got table names")
130126

@@ -141,16 +137,16 @@ func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec spe
141137
for _, client := range clients {
142138
client := client
143139
wg.Add(1)
144-
if err := goroutinesSem.Acquire(ctx, 1); err != nil {
140+
if err := tableSem.Acquire(ctx, 1); err != nil {
145141
// This means context was cancelled
146142
return nil, err
147143
}
148144
go func() {
149145
defer wg.Done()
150-
defer goroutinesSem.Release(1)
146+
defer tableSem.Release(1)
151147
// TODO: prob introduce client.Identify() to be used in logs
152148

153-
tableSummary := table.Resolve(ctx, client, nil, res)
149+
tableSummary := table.Resolve(ctx, client, nil, resourceSem, res)
154150
atomic.AddUint64(&summary.Resources, tableSummary.Resources)
155151
atomic.AddUint64(&summary.Errors, tableSummary.Errors)
156152
atomic.AddUint64(&summary.Panics, tableSummary.Panics)

plugins/source_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ func TestSync(t *testing.T) {
5353
)
5454

5555
spec := specs.Source{
56-
Name: "testSource",
57-
Tables: []string{"*"},
56+
Name: "testSource",
57+
Tables: []string{"*"},
58+
Version: "v1.0.0",
59+
Destinations: []string{"test"},
5860
}
5961

6062
resources := make(chan *schema.Resource)

schema/table.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cloudquery/plugin-sdk/helpers"
1313
"github.com/getsentry/sentry-go"
1414
"github.com/thoas/go-funk"
15+
"golang.org/x/sync/semaphore"
1516
)
1617

1718
// TableResolver is the main entry point when a table is sync is called.
@@ -64,9 +65,6 @@ type Table struct {
6465
// Parent is the parent table in case this table is called via parent table (i.e. relation)
6566
Parent *Table `json:"-"`
6667

67-
// Serial is used to force a signature change, which forces new table creation and cascading removal of old table and relations
68-
Serial string `json:"-"`
69-
7068
columnsMap map[string]int
7169
}
7270

@@ -165,13 +163,16 @@ func (t Table) TableNames() []string {
165163
}
166164

167165
// Call the table resolver with with all of it's relation for every reolved resource
168-
func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resolvedResources chan<- *Resource) (summary SyncSummary) {
166+
func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) {
169167
tableStartTime := time.Now()
170168
meta.Logger().Info().Str("table", t.Name).Msg("table resolver started")
171169

172170
res := make(chan interface{})
173171
startTime := time.Now()
172+
wg := sync.WaitGroup{}
173+
wg.Add(1)
174174
go func(sum *SyncSummary) {
175+
defer wg.Done()
175176
defer func() {
176177
if err := recover(); err != nil {
177178
stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack()))
@@ -199,9 +200,26 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r
199200
continue
200201
}
201202
for i := range objects {
202-
summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources))
203+
i := i
204+
205+
// right now we support concurrency only for objects/resources of parent tables
206+
if resourcesSem == nil {
207+
summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources))
208+
} else {
209+
if err := resourcesSem.Acquire(ctx, 1); err != nil {
210+
meta.Logger().Error().Err(err).Msg("failed to acquire semaphore")
211+
return summary
212+
}
213+
wg.Add(1)
214+
go func() {
215+
defer wg.Done()
216+
//nolint:all
217+
summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources))
218+
}()
219+
}
203220
}
204221
}
222+
wg.Wait()
205223
meta.Logger().Info().Str("table", t.Name).Int("total_resources", tableResources).TimeDiff("duration", time.Now(), tableStartTime).Msg("fetch table finished")
206224

207225
return summary
@@ -270,7 +288,7 @@ func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resou
270288
resolvedResources <- resource
271289

272290
for _, rel := range t.Relations {
273-
summary.Merge(rel.Resolve(ctx, meta, resource, resolvedResources))
291+
summary.Merge(rel.Resolve(ctx, meta, resource, nil, resolvedResources))
274292
}
275293

276294
return summary

schema/table_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func TestTableExecution(t *testing.T) {
217217
var summary SyncSummary
218218
go func() {
219219
defer close(resources)
220-
summary = tc.Table.Resolve(ctx, m, nil, resources)
220+
summary = tc.Table.Resolve(ctx, m, nil, nil, resources)
221221
}()
222222
var i = uint64(0)
223223
for resource := range resources {

serve/source_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,12 @@ func TestServeSource(t *testing.T) {
142142
resources := make(chan []byte, 1)
143143
if err := c.Sync(ctx,
144144
specs.Source{
145-
Name: "testSourcePlugin",
146-
Version: "v1.0.0",
147-
Registry: specs.RegistryGithub,
148-
Tables: []string{"*"},
149-
Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}},
145+
Name: "testSourcePlugin",
146+
Version: "v1.0.0",
147+
Registry: specs.RegistryGithub,
148+
Tables: []string{"*"},
149+
Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}},
150+
Destinations: []string{"test"},
150151
},
151152
resources); err != nil {
152153
t.Fatal(err)

specs/source.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import (
77
"strings"
88
)
99

10+
const (
11+
defaultTableConcurrency = 500000
12+
defaultResourceConcurrency = 500000
13+
)
14+
1015
// Source is the spec for a source plugin
1116
type Source struct {
1217
// Name of the source plugin to use
@@ -20,8 +25,9 @@ type Source struct {
2025
// For the gRPC registry the path will be the address of the gRPC server: host:port
2126
Path string `json:"path,omitempty"`
2227
// Registry can be github,local,grpc.
23-
Registry Registry `json:"registry,omitempty"`
24-
Concurrency uint64 `json:"concurrency,omitempty"`
28+
Registry Registry `json:"registry,omitempty"`
29+
TableConcurrency uint64 `json:"table_concurrency,omitempty"`
30+
ResourceConcurrency uint64 `json:"resource_concurrency,omitempty"`
2531
// Tables to sync from the source plugin
2632
Tables []string `json:"tables,omitempty"`
2733
// SkipTables defines tables to skip when syncing data. Useful if a glob pattern is used in Tables
@@ -46,6 +52,13 @@ func (s *Source) SetDefaults() {
4652
if s.Tables == nil {
4753
s.Tables = []string{"*"}
4854
}
55+
56+
if s.TableConcurrency == 0 {
57+
s.TableConcurrency = defaultTableConcurrency
58+
}
59+
if s.ResourceConcurrency == 0 {
60+
s.ResourceConcurrency = defaultResourceConcurrency
61+
}
4962
}
5063

5164
// UnmarshalSpec unmarshals the internal spec into the given interface

specs/source_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,14 @@ spec:
117117
`,
118118
"",
119119
&Source{
120-
Name: "test",
121-
Registry: RegistryGithub,
122-
Path: "cloudquery/test",
123-
Version: "v1.1.0",
124-
Tables: []string{"*"},
125-
Destinations: []string{"test"},
120+
Name: "test",
121+
Registry: RegistryGithub,
122+
Path: "cloudquery/test",
123+
TableConcurrency: defaultTableConcurrency,
124+
ResourceConcurrency: defaultResourceConcurrency,
125+
Version: "v1.1.0",
126+
Tables: []string{"*"},
127+
Destinations: []string{"test"},
126128
},
127129
},
128130
}

specs/testdata/dir/aws.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ kind: source
22
spec:
33
name: aws
44
version: v1.0.0
5-
concurrency: 10
5+
table_concurrency: 10
66
registry: local
77
destinations: [postgresql]

specs/testdata/gcp.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ kind: source
22
spec:
33
name: gcp
44
version: v1.0.0
5-
concurrency: 10
5+
table_concurrency: 10
66
registry: local
77
destinations: [postgresqlv2]
88
---

0 commit comments

Comments
 (0)