Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions go/bigquery_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package bigquery
import (
"context"
"fmt"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -53,6 +55,7 @@ type databaseImpl struct {
tableID string
location string
quotaProject string
endpoint string
}

func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
Expand All @@ -71,6 +74,7 @@ func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
catalog: d.projectID,
dbSchema: d.datasetID,
location: d.location,
endpoint: d.endpoint,
resultRecordBufferSize: defaultQueryResultBufferSize,
prefetchConcurrency: defaultQueryPrefetchConcurrency,
quotaProject: d.quotaProject,
Expand Down Expand Up @@ -113,6 +117,8 @@ func (d *databaseImpl) GetOption(key string) (string, error) {
return d.datasetID, nil
case OptionStringTableID:
return d.tableID, nil
case OptionStringEndpoint:
return d.endpoint, nil
case OptionStringImpersonateLifetime:
if d.impersonateLifetime == 0 {
// If no lifetime is set but impersonation is enabled, return the default
Expand Down Expand Up @@ -145,6 +151,18 @@ func (d *databaseImpl) hasImpersonationOptions() bool {

func (d *databaseImpl) SetOption(key string, value string) error {
switch key {
case "uri":
params, err := ParseBigQueryURIToParams(value)
if err != nil {
return err
}

for paramKey, paramValue := range params {
if err := d.SetOption(paramKey, paramValue); err != nil {
return err
}
}
return nil
case OptionStringAuthType:
switch value {
case OptionValueAuthTypeDefault,
Expand Down Expand Up @@ -190,10 +208,185 @@ func (d *databaseImpl) SetOption(key string, value string) error {
d.datasetID = value
case OptionStringTableID:
d.tableID = value
case OptionStringEndpoint:
d.endpoint = value
case OptionStringLocation:
d.location = value
default:
return d.DatabaseImplBase.SetOption(key, value)
}
return nil
}

// ParseBigQueryURIToParams parses a BigQuery URI and returns the extracted parameters
func ParseBigQueryURIToParams(uri string) (map[string]string, error) {
if uri == "" {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "[bq] URI cannot be empty",
}
}

parsedURI, err := url.Parse(uri)
if err != nil {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] invalid BigQuery URI format: %v", err),
}
}

if parsedURI.Scheme != "bigquery" {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] invalid BigQuery URI scheme: expected 'bigquery', got '%s'", parsedURI.Scheme),
}
}

projectID := strings.TrimPrefix(parsedURI.Path, "/")
if projectID == "" {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "[bq] project ID is required in URI path",
}
}

params := make(map[string]string)
params[OptionStringProjectID] = projectID

// Handle host and port
var endpoint string
if parsedURI.Host != "" && parsedURI.Hostname() != "" {
// Custom endpoint specified with valid hostname
if parsedURI.Port() != "" {
endpoint = parsedURI.Host
} else {
endpoint = fmt.Sprintf("%s:443", parsedURI.Hostname())
}
} else if parsedURI.Host != "" && parsedURI.Hostname() == "" && parsedURI.Port() != "" {
// Port without hostname. use default host with custom port
endpoint = fmt.Sprintf("bigquery.googleapis.com:%s", parsedURI.Port())
} else {
// No host specified, use default BigQuery endpoint
endpoint = "bigquery.googleapis.com:443"
}

// Store endpoint as hostname:port (Google client library handles https:// internally)
params[OptionStringEndpoint] = endpoint

queryParams := parsedURI.Query()

oauthTypeStr := queryParams.Get("OAuthType")
if oauthTypeStr != "" {
oauthType, err := strconv.Atoi(oauthTypeStr)
if err != nil {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] invalid OAuthType value: %s", oauthTypeStr),
}
}

switch oauthType {
case 0:
params[OptionStringAuthType] = OptionValueAuthTypeAppDefaultCredentials
case 1:
params[OptionStringAuthType] = OptionValueAuthTypeJSONCredentialFile

authCredentials := queryParams.Get("AuthCredentials")
if authCredentials == "" {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "[bq] AuthCredentials required for service account authentication",
}
}

decodedCreds, err := url.QueryUnescape(authCredentials)
if err != nil {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] invalid AuthCredentials format: %v", err),
}
}
params[OptionStringAuthCredentials] = decodedCreds
queryParams.Del("AuthCredentials")
case 2:
params[OptionStringAuthType] = OptionValueAuthTypeJSONCredentialString

authCredentials := queryParams.Get("AuthCredentials")
if authCredentials == "" {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "[bq] AuthCredentials required for service account authentication",
}
}

decodedCreds, err := url.QueryUnescape(authCredentials)
if err != nil {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] invalid AuthCredentials format: %v", err),
}
}
params[OptionStringAuthCredentials] = decodedCreds
queryParams.Del("AuthCredentials")
case 3:
params[OptionStringAuthType] = OptionValueAuthTypeUserAuthentication

clientID := queryParams.Get("AuthClientId")
clientSecret := queryParams.Get("AuthClientSecret")
refreshToken := queryParams.Get("AuthRefreshToken")
if clientID == "" || clientSecret == "" || refreshToken == "" {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "[bq] AuthClientId, AuthClientSecret and AuthRefreshToken required for OAuth authentication",
}
}
params[OptionStringAuthClientID] = clientID
params[OptionStringAuthClientSecret] = clientSecret
params[OptionStringAuthRefreshToken] = refreshToken
queryParams.Del("AuthClientId")
queryParams.Del("AuthClientSecret")
queryParams.Del("AuthRefreshToken")
default:
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] invalid OAuthType value: %d", oauthType),
}
}
queryParams.Del("OAuthType")
} else {
// if not provided default to ADC
params[OptionStringAuthType] = OptionValueAuthTypeAppDefaultCredentials
}

parameterMap := map[string]string{
"DatasetId": OptionStringDatasetID,
"Location": OptionStringLocation,
"TableId": OptionStringTableID,
"QuotaProject": OptionStringAuthQuotaProject,

// Auth parameters - processed in OAuthType switch above, here for consistency
"AuthCredentials": OptionStringAuthCredentials,
"AuthClientId": OptionStringAuthClientID,
"AuthClientSecret": OptionStringAuthClientSecret,
"AuthRefreshToken": OptionStringAuthRefreshToken,

"ImpersonateTargetPrincipal": OptionStringImpersonateTargetPrincipal,
"ImpersonateDelegates": OptionStringImpersonateDelegates,
"ImpersonateScopes": OptionStringImpersonateScopes,
"ImpersonateLifetime": OptionStringImpersonateLifetime,
}

// Process all query parameters to convert URI params to option constants
for paramName, paramValues := range queryParams {
if optionName, exists := parameterMap[paramName]; exists {
params[optionName] = paramValues[0]
} else {
return nil, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("[bq] unknown parameter '%s' in URI", paramName),
}
}
}

return params, nil
}
11 changes: 10 additions & 1 deletion go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type connectionImpl struct {
dbSchema string
// tableID is the default table for statement
tableID string
// endpoint is the custom BigQuery API endpoint
endpoint string

sessionID *string

Expand Down Expand Up @@ -730,7 +732,13 @@ func (c *connectionImpl) newClient(ctx context.Context) error {
authOptions = []option.ClientOption{option.WithTokenSource(tokenSource)}
}

client, err := bigquery.NewClient(ctx, c.catalog, authOptions...)
// Add custom endpoint if specified for BigQuery API client
bigQueryAuthOptions := authOptions
if c.endpoint != "" {
bigQueryAuthOptions = append(bigQueryAuthOptions, option.WithEndpoint(c.endpoint))
}

client, err := bigquery.NewClient(ctx, c.catalog, bigQueryAuthOptions...)
if err != nil {
return errToAdbcErr(adbc.StatusIO, err, "create client")
}
Expand All @@ -739,6 +747,7 @@ func (c *connectionImpl) newClient(ctx context.Context) error {
client.Location = c.location
}

// Use original authOptions without custom endpoint for Storage Read API
err = client.EnableStorageReadClient(ctx, authOptions...)
if err != nil {
return errToAdbcErr(adbc.StatusIO, err, "enable storage read client")
Expand Down
1 change: 1 addition & 0 deletions go/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
OptionStringProjectID = "adbc.bigquery.sql.project_id"
OptionStringDatasetID = "adbc.bigquery.sql.dataset_id"
OptionStringTableID = "adbc.bigquery.sql.table_id"
OptionStringEndpoint = "adbc.bigquery.sql.endpoint"

OptionValueAuthTypeDefault = "adbc.bigquery.sql.auth_type.auth_bigquery"

Expand Down
Loading
Loading