Skip to content

Commit 5558999

Browse files
committed
feat(bigquery): add maximumBytesBilled source config
1 parent 81699a3 commit 5558999

File tree

8 files changed

+96
-5
lines changed

8 files changed

+96
-5
lines changed

docs/en/resources/sources/bigquery.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ project: "my-project-id"
135135
# - "https://www.googleapis.com/auth/bigquery"
136136
# - "https://www.googleapis.com/auth/drive.readonly"
137137
# maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50.
138+
# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes).
138139
```
139140

140141
Initialize a BigQuery source that uses the client's access token:
@@ -155,6 +156,7 @@ useClientOAuth: true
155156
# - "https://www.googleapis.com/auth/bigquery"
156157
# - "https://www.googleapis.com/auth/drive.readonly"
157158
# maxQueryResultRows: 50 # Optional: Limits the number of rows returned by queries. Defaults to 50.
159+
# maximumBytesBilled: 10737418240 # Optional: Per-query bytes scanned cap (in bytes).
158160
```
159161

160162
## Reference
@@ -170,3 +172,4 @@ useClientOAuth: true
170172
| scopes | []string | false | A list of OAuth 2.0 scopes to use for the credentials. If not provided, default scopes are used. |
171173
| impersonateServiceAccount | string | false | Service account email to impersonate when making BigQuery and Dataplex API calls. The authenticated principal must have the `roles/iam.serviceAccountTokenCreator` role on the target service account. [Learn More](https://cloud.google.com/iam/docs/service-account-impersonation) |
172174
| maxQueryResultRows | int | false | The maximum number of rows to return from a query. Defaults to 50. |
175+
| maximumBytesBilled | int64 | false | The maximum bytes billed per query. When set, queries that exceed this limit fail before executing. |

internal/sources/bigquery/bigquery.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type Config struct {
9090
ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
9191
Scopes StringOrStringSlice `yaml:"scopes"`
9292
MaxQueryResultRows int `yaml:"maxQueryResultRows"`
93+
MaximumBytesBilled int64 `yaml:"maximumBytesBilled"`
9394
}
9495

9596
// StringOrStringSlice is a custom type that can unmarshal both a single string
@@ -156,6 +157,7 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
156157
RestService: restService,
157158
TokenSource: tokenSource,
158159
MaxQueryResultRows: r.MaxQueryResultRows,
160+
MaximumBytesBilled: r.MaximumBytesBilled,
159161
ClientCreator: clientCreator,
160162
}
161163

@@ -281,6 +283,7 @@ type Source struct {
281283
RestService *bigqueryrestapi.Service
282284
TokenSource oauth2.TokenSource
283285
MaxQueryResultRows int
286+
MaximumBytesBilled int64
284287
ClientCreator BigqueryClientCreator
285288
AllowedDatasets map[string]struct{}
286289
sessionMutex sync.Mutex
@@ -458,6 +461,10 @@ func (s *Source) GetMaxQueryResultRows() int {
458461
return s.MaxQueryResultRows
459462
}
460463

464+
func (s *Source) GetMaximumBytesBilled() int64 {
465+
return s.MaximumBytesBilled
466+
}
467+
461468
func (s *Source) BigQueryClientCreator() BigqueryClientCreator {
462469
return s.ClientCreator
463470
}
@@ -558,6 +565,9 @@ func (s *Source) RunSQL(ctx context.Context, bqClient *bigqueryapi.Client, state
558565
if connProps != nil {
559566
query.ConnectionProperties = connProps
560567
}
568+
if s.MaximumBytesBilled > 0 {
569+
query.MaxBytesBilled = s.MaximumBytesBilled
570+
}
561571

562572
// This block handles SELECT statements, which return a row set.
563573
// We iterate through the results, convert each row into a map of

internal/sources/bigquery/bigquery_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,26 @@ func TestParseFromYamlBigQuery(t *testing.T) {
177177
},
178178
},
179179
},
180+
{
181+
desc: "with maximum bytes billed example",
182+
in: `
183+
kind: sources
184+
name: my-instance
185+
type: bigquery
186+
project: my-project
187+
location: us
188+
maximumBytesBilled: 10737418240
189+
`,
190+
want: map[string]sources.SourceConfig{
191+
"my-instance": bigquery.Config{
192+
Name: "my-instance",
193+
Type: bigquery.SourceType,
194+
Project: "my-project",
195+
Location: "us",
196+
MaximumBytesBilled: 10737418240,
197+
},
198+
},
199+
},
180200
}
181201
for _, tc := range tcs {
182202
t.Run(tc.desc, func(t *testing.T) {
@@ -287,6 +307,59 @@ func TestInitialize_MaxQueryResultRows(t *testing.T) {
287307
}
288308
}
289309

310+
func TestInitialize_MaximumBytesBilled(t *testing.T) {
311+
ctx, err := testutils.ContextWithNewLogger()
312+
if err != nil {
313+
t.Fatalf("unexpected error: %s", err)
314+
}
315+
ctx = util.WithUserAgent(ctx, "test-agent")
316+
tracer := noop.NewTracerProvider().Tracer("")
317+
318+
tcs := []struct {
319+
desc string
320+
cfg bigquery.Config
321+
want int64
322+
}{
323+
{
324+
desc: "default value",
325+
cfg: bigquery.Config{
326+
Name: "test-default",
327+
Type: bigquery.SourceType,
328+
Project: "test-project",
329+
UseClientOAuth: true,
330+
},
331+
want: 0,
332+
},
333+
{
334+
desc: "configured value",
335+
cfg: bigquery.Config{
336+
Name: "test-configured",
337+
Type: bigquery.SourceType,
338+
Project: "test-project",
339+
UseClientOAuth: true,
340+
MaximumBytesBilled: 10737418240,
341+
},
342+
want: 10737418240,
343+
},
344+
}
345+
346+
for _, tc := range tcs {
347+
t.Run(tc.desc, func(t *testing.T) {
348+
src, err := tc.cfg.Initialize(ctx, tracer)
349+
if err != nil {
350+
t.Fatalf("Initialize failed: %v", err)
351+
}
352+
bqSrc, ok := src.(*bigquery.Source)
353+
if !ok {
354+
t.Fatalf("Expected *bigquery.Source, got %T", src)
355+
}
356+
if bqSrc.MaximumBytesBilled != tc.want {
357+
t.Errorf("MaximumBytesBilled = %d, want %d", bqSrc.MaximumBytesBilled, tc.want)
358+
}
359+
})
360+
}
361+
}
362+
290363
func TestNormalizeValue(t *testing.T) {
291364
tests := []struct {
292365
name string

internal/tools/bigquery/bigqueryanalyzecontribution/bigqueryanalyzecontribution.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
5252
type compatibleSource interface {
5353
BigQueryClient() *bigqueryapi.Client
5454
UseClientAuthorization() bool
55+
GetMaximumBytesBilled() int64
5556
IsDatasetAllowed(projectID, datasetID string) bool
5657
BigQueryAllowedDatasets() []string
5758
BigQuerySession() bigqueryds.BigQuerySessionProvider
@@ -216,7 +217,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
216217
{Key: "session_id", Value: session.ID},
217218
}
218219
}
219-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps)
220+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, inputData, nil, connProps, source.GetMaximumBytesBilled())
220221
if err != nil {
221222
return nil, util.ProcessGcpError(err)
222223
}

internal/tools/bigquery/bigquerycommon/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
// DryRunQuery performs a dry run of the SQL query to validate it and get metadata.
29-
func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty) (*bigqueryrestapi.Job, error) {
29+
func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, projectID string, location string, sql string, params []*bigqueryrestapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty, maximumBytesBilled int64) (*bigqueryrestapi.Job, error) {
3030
useLegacySql := false
3131

3232
restConnProps := make([]*bigqueryrestapi.ConnectionProperty, len(connProps))
@@ -46,6 +46,7 @@ func DryRunQuery(ctx context.Context, restService *bigqueryrestapi.Service, proj
4646
UseLegacySql: &useLegacySql,
4747
ConnectionProperties: restConnProps,
4848
QueryParameters: params,
49+
MaximumBytesBilled: maximumBytesBilled,
4950
},
5051
},
5152
}

internal/tools/bigquery/bigqueryexecutesql/bigqueryexecutesql.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type compatibleSource interface {
5454
BigQuerySession() bigqueryds.BigQuerySessionProvider
5555
BigQueryWriteMode() string
5656
UseClientAuthorization() bool
57+
GetMaximumBytesBilled() int64
5758
IsDatasetAllowed(projectID, datasetID string) bool
5859
BigQueryAllowedDatasets() []string
5960
RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)
@@ -186,7 +187,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
186187
}
187188
}
188189

189-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps)
190+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, sql, nil, connProps, source.GetMaximumBytesBilled())
190191
if err != nil {
191192
return nil, util.NewClientServerError("query validation failed", http.StatusInternalServerError, err)
192193
}

internal/tools/bigquery/bigqueryforecast/bigqueryforecast.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
5151
type compatibleSource interface {
5252
BigQueryClient() *bigqueryapi.Client
5353
UseClientAuthorization() bool
54+
GetMaximumBytesBilled() int64
5455
IsDatasetAllowed(projectID, datasetID string) bool
5556
BigQueryAllowedDatasets() []string
5657
BigQuerySession() bigqueryds.BigQuerySessionProvider
@@ -193,7 +194,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
193194
{Key: "session_id", Value: session.ID},
194195
}
195196
}
196-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps)
197+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, source.BigQueryClient().Project(), source.BigQueryClient().Location, historyData, nil, connProps, source.GetMaximumBytesBilled())
197198
if err != nil {
198199
return nil, util.ProcessGcpError(err)
199200
}

internal/tools/bigquery/bigquerysql/bigquerysql.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
5252
type compatibleSource interface {
5353
BigQuerySession() bigqueryds.BigQuerySessionProvider
5454
UseClientAuthorization() bool
55+
GetMaximumBytesBilled() int64
5556
RetrieveClientAndService(tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)
5657
RunSQL(context.Context, *bigqueryapi.Client, string, string, []bigqueryapi.QueryParameter, []*bigqueryapi.ConnectionProperty) (any, error)
5758
}
@@ -204,7 +205,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
204205
return nil, util.NewClientServerError("failed to retrieve BigQuery client", http.StatusInternalServerError, err)
205206
}
206207

207-
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps)
208+
dryRunJob, err := bqutil.DryRunQuery(ctx, restService, bqClient.Project(), bqClient.Location, newStatement, lowLevelParams, connProps, source.GetMaximumBytesBilled())
208209
if err != nil {
209210
return nil, util.ProcessGcpError(err)
210211
}

0 commit comments

Comments
 (0)