diff --git a/.gitignore b/.gitignore index be0c7399..fb9f3b28 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,5 @@ _tmp* .vscode/ __debug_bin .DS_Store + +.claude/ diff --git a/connector_test.go b/connector_test.go index 57554b98..8b61e8bb 100644 --- a/connector_test.go +++ b/connector_test.go @@ -50,23 +50,24 @@ func TestNewConnector(t *testing.T) { CloudFetchSpeedThresholdMbps: 0.1, } expectedUserConfig := config.UserConfig{ - Host: host, - Port: port, - Protocol: "https", - AccessToken: accessToken, - Authenticator: &pat.PATAuth{AccessToken: accessToken}, - HTTPPath: "/" + httpPath, - MaxRows: maxRows, - QueryTimeout: timeout, - Catalog: catalog, - Schema: schema, - UserAgentEntry: userAgentEntry, - SessionParams: sessionParams, - RetryMax: 10, - RetryWaitMin: 3 * time.Second, - RetryWaitMax: 60 * time.Second, - Transport: roundTripper, - CloudFetchConfig: expectedCloudFetchConfig, + Host: host, + Port: port, + Protocol: "https", + AccessToken: accessToken, + Authenticator: &pat.PATAuth{AccessToken: accessToken}, + HTTPPath: "/" + httpPath, + MaxRows: maxRows, + QueryTimeout: timeout, + Catalog: catalog, + Schema: schema, + UserAgentEntry: userAgentEntry, + SessionParams: sessionParams, + RetryMax: 10, + RetryWaitMin: 3 * time.Second, + RetryWaitMax: 60 * time.Second, + Transport: roundTripper, + ExecutionProtocol: "thrift", + CloudFetchConfig: expectedCloudFetchConfig, } expectedCfg := config.WithDefaults() expectedCfg.DriverVersion = DriverVersion @@ -97,18 +98,19 @@ func TestNewConnector(t *testing.T) { CloudFetchSpeedThresholdMbps: 0.1, } expectedUserConfig := config.UserConfig{ - Host: host, - Port: port, - Protocol: "https", - AccessToken: accessToken, - Authenticator: &pat.PATAuth{AccessToken: accessToken}, - HTTPPath: "/" + httpPath, - MaxRows: maxRows, - SessionParams: sessionParams, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: expectedCloudFetchConfig, + Host: host, + Port: port, + Protocol: "https", + AccessToken: accessToken, + Authenticator: &pat.PATAuth{AccessToken: accessToken}, + HTTPPath: "/" + httpPath, + MaxRows: maxRows, + SessionParams: sessionParams, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: expectedCloudFetchConfig, } expectedCfg := config.WithDefaults() expectedCfg.UserConfig = expectedUserConfig @@ -139,18 +141,19 @@ func TestNewConnector(t *testing.T) { CloudFetchSpeedThresholdMbps: 0.1, } expectedUserConfig := config.UserConfig{ - Host: host, - Port: port, - Protocol: "https", - AccessToken: accessToken, - Authenticator: &pat.PATAuth{AccessToken: accessToken}, - HTTPPath: "/" + httpPath, - MaxRows: maxRows, - SessionParams: sessionParams, - RetryMax: -1, - RetryWaitMin: 0, - RetryWaitMax: 0, - CloudFetchConfig: expectedCloudFetchConfig, + Host: host, + Port: port, + Protocol: "https", + AccessToken: accessToken, + Authenticator: &pat.PATAuth{AccessToken: accessToken}, + HTTPPath: "/" + httpPath, + MaxRows: maxRows, + SessionParams: sessionParams, + RetryMax: -1, + RetryWaitMin: 0, + RetryWaitMax: 0, + ExecutionProtocol: "thrift", + CloudFetchConfig: expectedCloudFetchConfig, } expectedCfg := config.WithDefaults() expectedCfg.DriverVersion = DriverVersion diff --git a/internal/config/config.go b/internal/config/config.go index 67437a9c..3414395a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -30,6 +30,8 @@ type Config struct { TLSConfig *tls.Config // nil disables TLS ArrowConfig PollInterval time.Duration + MaxPollInterval time.Duration // maximum polling interval for exponential backoff + PollBackoffMultiplier float64 // multiplier for exponential backoff ClientTimeout time.Duration // max time the http request can last PingTimeout time.Duration // max time allowed for ping CanUseMultipleCatalogs bool @@ -68,6 +70,8 @@ func (c *Config) DeepCopy() *Config { TLSConfig: c.TLSConfig.Clone(), ArrowConfig: c.ArrowConfig.DeepCopy(), PollInterval: c.PollInterval, + MaxPollInterval: c.MaxPollInterval, + PollBackoffMultiplier: c.PollBackoffMultiplier, ClientTimeout: c.ClientTimeout, PingTimeout: c.PingTimeout, CanUseMultipleCatalogs: c.CanUseMultipleCatalogs, @@ -101,6 +105,8 @@ type UserConfig struct { Transport http.RoundTripper UseLz4Compression bool EnableMetricViewMetadata bool + ExecutionProtocol string // "thrift" (default) or "rest" + WarehouseID string // required when ExecutionProtocol is "rest" CloudFetchConfig } @@ -143,6 +149,8 @@ func (ucfg UserConfig) DeepCopy() UserConfig { Transport: ucfg.Transport, UseLz4Compression: ucfg.UseLz4Compression, EnableMetricViewMetadata: ucfg.EnableMetricViewMetadata, + ExecutionProtocol: ucfg.ExecutionProtocol, + WarehouseID: ucfg.WarehouseID, CloudFetchConfig: ucfg.CloudFetchConfig, } } @@ -176,6 +184,9 @@ func (ucfg UserConfig) WithDefaults() UserConfig { if ucfg.RetryWaitMax == 0 { ucfg.RetryWaitMax = 30 * time.Second } + if ucfg.ExecutionProtocol == "" { + ucfg.ExecutionProtocol = "thrift" + } ucfg.UseLz4Compression = false ucfg.CloudFetchConfig = CloudFetchConfig{}.WithDefaults() @@ -189,6 +200,8 @@ func WithDefaults() *Config { TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12}, ArrowConfig: ArrowConfig{}.WithDefaults(), PollInterval: 1 * time.Second, + MaxPollInterval: 60 * time.Second, + PollBackoffMultiplier: 2.0, ClientTimeout: 900 * time.Second, PingTimeout: 60 * time.Second, CanUseMultipleCatalogs: true, @@ -282,6 +295,20 @@ func ParseDSN(dsn string) (UserConfig, error) { ucfg.EnableMetricViewMetadata = enableMetricViewMetadata } + // Execution protocol parameter (thrift or rest) + if protocol, ok := params.extract("protocol"); ok { + ucfg.ExecutionProtocol = protocol + } else if protocol, ok := params.extract("executionProtocol"); ok { + ucfg.ExecutionProtocol = protocol + } + + // Warehouse ID parameter (required for REST protocol) + if warehouseID, ok := params.extract("warehouse_id"); ok { + ucfg.WarehouseID = warehouseID + } else if warehouseID, ok := params.extract("warehouseId"); ok { + ucfg.WarehouseID = warehouseID + } + // for timezone we do a case insensitive key match. // We use getNoCase because we want to leave timezone in the params so that it will also // be used as a session param. @@ -298,6 +325,12 @@ func ParseDSN(dsn string) (UserConfig, error) { ucfg.SessionParams = sessionParams } + // Validate that warehouse_id is provided when using REST protocol + if ucfg.ExecutionProtocol == "rest" && ucfg.WarehouseID == "" { + return UserConfig{}, dbsqlerrint.NewRequestError(context.TODO(), dbsqlerr.ErrInvalidDSNFormat, + errors.New("warehouse_id is required when using REST protocol (protocol=rest)")) + } + return ucfg, err } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index c2821c78..b74db623 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -33,18 +33,19 @@ func TestParseConfig(t *testing.T) { name: "base case", args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - AccessToken: "supersecret", - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -53,18 +54,19 @@ func TestParseConfig(t *testing.T) { name: "with https scheme", args: args{dsn: "https://token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -73,17 +75,18 @@ func TestParseConfig(t *testing.T) { name: "with http scheme", args: args{dsn: "http://localhost:8080/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "http", - Host: "localhost", - Port: 8080, - MaxRows: defaultMaxRows, - Authenticator: &noop.NoopAuth{}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "http", + Host: "localhost", + Port: 8080, + MaxRows: defaultMaxRows, + Authenticator: &noop.NoopAuth{}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantErr: false, wantURL: "http://localhost:8080/sql/1.0/endpoints/12346a5b5b0e123a", @@ -92,16 +95,17 @@ func TestParseConfig(t *testing.T) { name: "with localhost", args: args{dsn: "http://localhost:8080"}, wantCfg: UserConfig{ - Protocol: "http", - Host: "localhost", - Port: 8080, - Authenticator: &noop.NoopAuth{}, - MaxRows: defaultMaxRows, - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "http", + Host: "localhost", + Port: 8080, + Authenticator: &noop.NoopAuth{}, + MaxRows: defaultMaxRows, + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantErr: false, wantURL: "http://localhost:8080", @@ -110,19 +114,20 @@ func TestParseConfig(t *testing.T) { name: "with query params", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?timeout=100&maxRows=1000"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -131,20 +136,21 @@ func TestParseConfig(t *testing.T) { name: "with query params and session params", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?timeout=100&maxRows=1000&timezone=America/Vancouver&QUERY_TAGS=team:testing,driver:go"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - Location: tz, - SessionParams: map[string]string{"timezone": "America/Vancouver", "QUERY_TAGS": "team:testing,driver:go"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + Location: tz, + SessionParams: map[string]string{"timezone": "America/Vancouver", "QUERY_TAGS": "team:testing,driver:go"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -153,17 +159,18 @@ func TestParseConfig(t *testing.T) { name: "bare", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Authenticator: &noop.NoopAuth{}, - Port: 8000, - MaxRows: defaultMaxRows, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Authenticator: &noop.NoopAuth{}, + Port: 8000, + MaxRows: defaultMaxRows, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -172,19 +179,20 @@ func TestParseConfig(t *testing.T) { name: "with catalog", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?catalog=default"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - Catalog: "default", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + Catalog: "default", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b", wantErr: false, @@ -193,19 +201,20 @@ func TestParseConfig(t *testing.T) { name: "with user agent entry", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?userAgentEntry=partner-name"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - UserAgentEntry: "partner-name", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + UserAgentEntry: "partner-name", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b", wantErr: false, @@ -214,19 +223,20 @@ func TestParseConfig(t *testing.T) { name: "with schema", args: args{dsn: "token:supersecret2@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?schema=system"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - Schema: "system", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + Schema: "system", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -235,17 +245,18 @@ func TestParseConfig(t *testing.T) { name: "with useCloudFetch", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?useCloudFetch=true"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 10, @@ -260,17 +271,18 @@ func TestParseConfig(t *testing.T) { name: "with useCloudFetch and maxDownloadThreads", args: args{dsn: "token:supersecret@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b?useCloudFetch=true&maxDownloadThreads=15"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123b", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 15, @@ -285,21 +297,22 @@ func TestParseConfig(t *testing.T) { name: "with everything", args: args{dsn: "token:supersecret2@example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&useCloudFetch=true&maxDownloadThreads=15"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", CloudFetchConfig: CloudFetchConfig{ UseCloudFetch: true, MaxDownloadThreads: 15, @@ -314,17 +327,18 @@ func TestParseConfig(t *testing.T) { name: "missing http path", args: args{dsn: "token:supersecret@example.cloud.databricks.com:443"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - MaxRows: defaultMaxRows, - AccessToken: "supersecret", - Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + AccessToken: "supersecret", + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443", wantErr: false, @@ -334,20 +348,21 @@ func TestParseConfig(t *testing.T) { name: "missing http path 2", args: args{dsn: "token:supersecret2@example.cloud.databricks.com:443?catalog=default&schema=system&timeout=100&maxRows=1000"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 443, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - Catalog: "default", - Schema: "system", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + Catalog: "default", + Schema: "system", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:443", wantErr: false, @@ -393,19 +408,20 @@ func TestParseConfig(t *testing.T) { name: "missing host", args: args{dsn: "token:supersecret2@:443?catalog=default&schema=system&timeout=100&maxRows=1000"}, wantCfg: UserConfig{ - Port: 443, - Protocol: "https", - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - MaxRows: 1000, - QueryTimeout: 100 * time.Second, - Catalog: "default", - Schema: "system", - SessionParams: make(map[string]string), - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Port: 443, + Protocol: "https", + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + MaxRows: 1000, + QueryTimeout: 100 * time.Second, + Catalog: "default", + Schema: "system", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://:443", wantErr: false, @@ -415,22 +431,23 @@ func TestParseConfig(t *testing.T) { name: "with accessToken param", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&accessToken=supersecret2"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -439,22 +456,23 @@ func TestParseConfig(t *testing.T) { name: "with accessToken param and client id/secret params", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&accessToken=supersecret2&clientId=client_id&clientSecret=client_secret"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -463,22 +481,23 @@ func TestParseConfig(t *testing.T) { name: "authType unknown with accessTokenParam", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?authType=unknown&catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true&accessToken=supersecret2&clientId=client_id&clientSecret=client_secret"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - AccessToken: "supersecret2", - Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + AccessToken: "supersecret2", + Authenticator: &pat.PATAuth{AccessToken: "supersecret2"}, + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -517,21 +536,22 @@ func TestParseConfig(t *testing.T) { name: "authType unknown with client id/secret", args: args{dsn: "example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a?authType=unknown&clientId=client_id&clientSecret=client_secret&catalog=default&schema=system&userAgentEntry=partner-name&timeout=100&maxRows=1000&ANSI_MODE=true"}, wantCfg: UserConfig{ - Protocol: "https", - Host: "example.cloud.databricks.com", - Port: 8000, - Authenticator: m2m.NewAuthenticator("client_id", "client_secret", "example.cloud.databricks.com"), - HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", - QueryTimeout: 100 * time.Second, - MaxRows: 1000, - UserAgentEntry: "partner-name", - Catalog: "default", - Schema: "system", - SessionParams: map[string]string{"ANSI_MODE": "true"}, - RetryMax: 4, - RetryWaitMin: 1 * time.Second, - RetryWaitMax: 30 * time.Second, - CloudFetchConfig: defCloudConfig, + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 8000, + Authenticator: m2m.NewAuthenticator("client_id", "client_secret", "example.cloud.databricks.com"), + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + QueryTimeout: 100 * time.Second, + MaxRows: 1000, + UserAgentEntry: "partner-name", + Catalog: "default", + Schema: "system", + SessionParams: map[string]string{"ANSI_MODE": "true"}, + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, }, wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a", wantErr: false, @@ -542,6 +562,142 @@ func TestParseConfig(t *testing.T) { wantCfg: UserConfig{}, wantErr: true, }, + { + name: "with protocol=thrift", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?protocol=thrift"}, + wantCfg: UserConfig{ + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, + }, + wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", + wantErr: false, + }, + { + name: "with protocol=rest and warehouse_id", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?protocol=rest&warehouse_id=abc123def456"}, + wantCfg: UserConfig{ + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "rest", + WarehouseID: "abc123def456", + CloudFetchConfig: defCloudConfig, + }, + wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", + wantErr: false, + }, + { + name: "with protocol=rest and warehouseId (camelCase)", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?protocol=rest&warehouseId=xyz789abc"}, + wantCfg: UserConfig{ + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "rest", + WarehouseID: "xyz789abc", + CloudFetchConfig: defCloudConfig, + }, + wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", + wantErr: false, + }, + { + name: "with executionProtocol (alternative param name)", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?executionProtocol=rest&warehouse_id=test123"}, + wantCfg: UserConfig{ + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "rest", + WarehouseID: "test123", + CloudFetchConfig: defCloudConfig, + }, + wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", + wantErr: false, + }, + { + name: "with protocol=rest but no warehouse_id (should fail)", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?protocol=rest"}, + wantCfg: UserConfig{}, + wantErr: true, + }, + { + name: "with warehouse_id but thrift protocol (should succeed)", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a?protocol=thrift&warehouse_id=abc123"}, + wantCfg: UserConfig{ + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + WarehouseID: "abc123", + CloudFetchConfig: defCloudConfig, + }, + wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", + wantErr: false, + }, + { + name: "default protocol is thrift (backward compatibility)", + args: args{dsn: "token:supersecret@example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a"}, + wantCfg: UserConfig{ + Protocol: "https", + Host: "example.cloud.databricks.com", + Port: 443, + MaxRows: defaultMaxRows, + Authenticator: &pat.PATAuth{AccessToken: "supersecret"}, + AccessToken: "supersecret", + HTTPPath: "/sql/1.0/endpoints/12346a5b5b0e123a", + SessionParams: make(map[string]string), + RetryMax: 4, + RetryWaitMin: 1 * time.Second, + RetryWaitMax: 30 * time.Second, + ExecutionProtocol: "thrift", + CloudFetchConfig: defCloudConfig, + }, + wantURL: "https://example.cloud.databricks.com:443/sql/1.0/endpoints/12346a5b5b0e123a", + wantErr: false, + }, } for i, tt := range tests { fmt.Println(i) @@ -644,6 +800,8 @@ func TestConfig_DeepCopy(t *testing.T) { UserConfig: UserConfig{}.WithDefaults(), TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12}, PollInterval: 1 * time.Second, + MaxPollInterval: 60 * time.Second, + PollBackoffMultiplier: 2.0, ClientTimeout: 900 * time.Second, PingTimeout: 15 * time.Second, CanUseMultipleCatalogs: true, @@ -661,3 +819,27 @@ func TestConfig_DeepCopy(t *testing.T) { } }) } + +func TestConfig_WithDefaults(t *testing.T) { + t.Run("default values for polling configuration", func(t *testing.T) { + cfg := WithDefaults() + + if cfg.PollInterval != 1*time.Second { + t.Errorf("PollInterval = %v, want %v", cfg.PollInterval, 1*time.Second) + } + if cfg.MaxPollInterval != 60*time.Second { + t.Errorf("MaxPollInterval = %v, want %v", cfg.MaxPollInterval, 60*time.Second) + } + if cfg.PollBackoffMultiplier != 2.0 { + t.Errorf("PollBackoffMultiplier = %v, want %v", cfg.PollBackoffMultiplier, 2.0) + } + }) + + t.Run("default execution protocol is thrift", func(t *testing.T) { + cfg := WithDefaults() + + if cfg.ExecutionProtocol != "thrift" { + t.Errorf("ExecutionProtocol = %v, want %v", cfg.ExecutionProtocol, "thrift") + } + }) +}