Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions scheduler/queue/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -140,37 +141,38 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
go func() {
defer close(resourcesChan)
var wg sync.WaitGroup
for i := range resourcesSlice {
i := i
chunks := [][]any{resourcesSlice}
if table.PreResourceChunkResolver != nil {
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
wg.Add(1)
go func() {
defer wg.Done()
resolvedResource := resolvers.ResolveSingleResource(ctx, w.logger, w.metrics, table, client, parent, resourcesSlice[i], w.caser)
if resolvedResource == nil {
return
}

if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
w.metrics.AddErrors(ctx, 1, selector)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser)
for _, resolvedResource := range resolvedResources {
if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
w.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
w.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
}
}
}()
}
Expand Down
41 changes: 29 additions & 12 deletions scheduler/resolvers/resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric
}
}

func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, chunk []any, c *caser.Caser) []*schema.Resource {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

resource := schema.NewResourceData(table, parent, item)
resources := make([]*schema.Resource, len(chunk))
for i, item := range chunk {
resources[i] = schema.NewResourceData(table, parent, item)
}
objectStartTime := time.Now()

clientID := client.ID()
Expand All @@ -60,25 +63,39 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric
m.AddPanics(ctx, 1, selector)
}
}()
if table.PreResourceResolver != nil {
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Err(err).Msg("pre resource resolver failed")

if table.PreResourceChunkResolver != nil {
if err := table.PreResourceChunkResolver.RowsResolver(ctx, client, resources); err != nil {
tableLogger.Error().Stack().Err(err).Msg("pre resource chunk resolver finished with error")
m.AddErrors(ctx, 1, selector)
return nil
}
}

for _, column := range table.Columns {
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
if table.PreResourceResolver != nil {
for _, resource := range resources {
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
m.AddErrors(ctx, 1, selector)
return nil
}
}
}
for _, resource := range resources {
for _, column := range table.Columns {
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
}
}

if table.PostResourceResolver != nil {
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
m.AddErrors(ctx, 1, selector)
for _, resource := range resources {
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
m.AddErrors(ctx, 1, selector)
}
}
}

m.AddResources(ctx, 1, selector)
return resource
m.AddResources(ctx, int64(len(resources)), selector)
return resources
}
51 changes: 28 additions & 23 deletions scheduler/scheduler_dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -157,8 +158,11 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
go func() {
defer close(resourcesChan)
var wg sync.WaitGroup
for i := range resourcesSlice {
i := i
chunks := [][]any{resourcesSlice}
if table.PreResourceChunkResolver != nil {
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
resourceSem := resourceSemVal.(*semaphore.Weighted)
Expand All @@ -183,33 +187,34 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
defer resourceSem.Release(1)
defer s.scheduler.resourceSem.Release(1)
defer wg.Done()
//nolint:all
resolvedResource := resolvers.ResolveSingleResource(ctx, s.logger, s.metrics, table, client, parent, resourcesSlice[i], s.scheduler.caser)
if resolvedResource == nil {
resolvedResources := resolvers.ResolveResourcesChunk(ctx, s.logger, s.metrics, table, client, parent, chunks[i], s.scheduler.caser)
if len(resolvedResources) == 0 {
return
}

if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
s.metrics.AddErrors(ctx, 1, selector)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
for _, resolvedResource := range resolvedResources {
if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
s.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
s.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
}
}
}()
}
Expand Down
8 changes: 8 additions & 0 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type TableResolver func(ctx context.Context, meta ClientMeta, parent *Resource,

// RowResolver is the signature of a per-resource hook (it is the type of
// Table.PreResourceResolver). It receives the context, the client meta and a
// single Resource, and reports failure through the returned error.
type RowResolver func(ctx context.Context, meta ClientMeta, resource *Resource) error

type RowsChunkResolver struct {
ChunkSize int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a way of ensuring ChunkSize is always less than 100 as that is what BatchSender uses

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or make BatchSender batch limit configurable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter:

  1. If you pass a slice on the channel BatchSender sends the slice as is
  2. If you pass individual items and whatever batch BatchSender sends is smaller than ChunkSize, you'll have a single chunk with all the data

RowsResolver func(ctx context.Context, meta ClientMeta, resourcesChunk []*Resource) error
}

// Multiplexer takes one ClientMeta and returns a slice of ClientMeta.
// NOTE(review): presumably used to fan a single client out into several; the
// call sites are not visible here — confirm.
type Multiplexer func(meta ClientMeta) []ClientMeta

// Transform is a function that is given a *Table and may return an error.
// NOTE(review): presumably it mutates the table definition in place; the
// call sites are not visible here — confirm.
type Transform func(table *Table) error
Expand Down Expand Up @@ -86,6 +91,9 @@ type Table struct {
// PreResourceResolver is called before all columns are resolved but after Resource is created. The ordering of resolvers is:
// (Table) Resolver → PreResourceResolver → ColumnResolvers → PostResourceResolver
PreResourceResolver RowResolver `json:"-"`

PreResourceChunkResolver *RowsChunkResolver `json:"-"`

// IsIncremental is a flag that indicates if the table is incremental or not. This flag mainly affects how the table is
// documented.
IsIncremental bool `json:"is_incremental"`
Expand Down