Skip to content

Commit 73934fc

Browse files
End2end (#53)
- Added some good end 2 end tests with way to read server responses from json files. One can now just create an example, set the `recordResults` flag in client, and all server responses will be saved to JSON. - Fix a few bugs regarding maxRows (can't be zero) and timezone. - Added DSN parsing of userAgentEntry - Adds some docs - Improved a few logs - Added a very comprehensive example under example/workflow Signed-off-by: Andre Furlan <[email protected]>
1 parent 2292c9f commit 73934fc

32 files changed

+2461
-513
lines changed

connection.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (c *conn) IsValid() bool {
8989
// Statement ExecContext is the same as connection ExecContext
9090
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
9191
log := logger.WithContext(c.id, driverctx.CorrelationIdFromContext(ctx), "")
92-
defer log.Duration(logger.Track("ExecContext"))
92+
msg, start := logger.Track("ExecContext")
9393
ctx = driverctx.NewContextWithConnId(ctx, c.id)
9494
if len(args) > 0 {
9595
return nil, errors.New(ErrParametersNotSupported)
@@ -99,6 +99,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
9999
if exStmtResp != nil && exStmtResp.OperationHandle != nil {
100100
log = logger.WithContext(c.id, driverctx.CorrelationIdFromContext(ctx), client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID))
101101
}
102+
defer log.Duration(msg, start)
102103

103104
if err != nil {
104105
log.Err(err).Msgf("databricks: failed to execute query: query %s", query)
@@ -243,13 +244,15 @@ func logBadQueryState(log *logger.DBSQLLogger, opStatus *cli_service.TGetOperati
243244
}
244245

245246
func (c *conn) executeStatement(ctx context.Context, query string, args []driver.NamedValue) (*cli_service.TExecuteStatementResp, error) {
247+
corrId := driverctx.CorrelationIdFromContext(ctx)
248+
log := logger.WithContext(c.id, corrId, "")
246249
sentinel := sentinel.Sentinel{
247250
OnDoneFn: func(statusResp any) (any, error) {
248251
req := cli_service.TExecuteStatementReq{
249252
SessionHandle: c.session.SessionHandle,
250253
Statement: query,
251254
RunAsync: c.cfg.RunAsync,
252-
QueryTimeout: int64(c.cfg.QueryTimeoutSeconds),
255+
QueryTimeout: int64(c.cfg.QueryTimeout / time.Second),
253256
// this is specific for databricks. It shortcuts server roundtrips
254257
GetDirectResults: &cli_service.TSparkGetDirectResults{
255258
MaxRows: int64(c.cfg.MaxRows),
@@ -262,8 +265,12 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
262265
resp, err := c.client.ExecuteStatement(ctx, &req)
263266
return resp, wrapErr(err, "failed to execute statement")
264267
},
268+
OnCancelFn: func() (any, error) {
269+
log.Warn().Msg("databricks: execute statement canceled while creation operation")
270+
return nil, nil
271+
},
265272
}
266-
_, res, err := sentinel.Watch(ctx, c.cfg.PollInterval, 0)
273+
_, res, err := sentinel.Watch(ctx, c.cfg.PollInterval, c.cfg.QueryTimeout)
267274
if err != nil {
268275
return nil, err
269276
}

connector.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"database/sql/driver"
66
"fmt"
7+
"strings"
8+
"time"
79

810
"github.com/databricks/databricks-sql-go/driverctx"
911
"github.com/databricks/databricks-sql-go/internal/cli_service"
@@ -48,7 +50,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
4850
},
4951
}
5052
// default timeout in here in addition to potential context timeout
51-
_, res, err := sentinel.Watch(ctx, c.cfg.PollInterval, c.cfg.DefaultTimeout)
53+
_, res, err := sentinel.Watch(ctx, c.cfg.PollInterval, c.cfg.ConnectTimeout)
5254
if err != nil {
5355
return nil, wrapErrf(err, "error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath)
5456
}
@@ -86,6 +88,8 @@ var _ driver.Connector = (*connector)(nil)
8688

8789
type connOption func(*config.Config)
8890

91+
// NewConnector creates a connection that can be used with sql.OpenDB().
92+
// This is an easier way to set up the DB instead of having to construct a DSN string.
8993
func NewConnector(options ...connOption) (driver.Connector, error) {
9094
// config with default options
9195
cfg := config.WithDefaults()
@@ -98,30 +102,38 @@ func NewConnector(options ...connOption) (driver.Connector, error) {
98102
return &connector{cfg}, nil
99103
}
100104

105+
// WithServerHostname sets up the server hostname. Mandatory.
101106
func WithServerHostname(host string) connOption {
102107
return func(c *config.Config) {
108+
if host == "localhost" {
109+
c.Protocol = "http"
110+
}
103111
c.Host = host
104112
}
105113
}
106114

115+
// WithPort sets up the server port. Mandatory.
107116
func WithPort(port int) connOption {
108117
return func(c *config.Config) {
109118
c.Port = port
110119
}
111120
}
112121

122+
// WithAccessToken sets up the Personal Access Token. Mandatory for now.
113123
func WithAccessToken(token string) connOption {
114124
return func(c *config.Config) {
115125
c.AccessToken = token
116126
}
117127
}
118128

129+
// WithHTTPPath sets up the endpoint to the warehouse. Mandatory.
119130
func WithHTTPPath(path string) connOption {
120131
return func(c *config.Config) {
121132
c.HTTPPath = path
122133
}
123134
}
124135

136+
// WithMaxRows sets up the max rows fetched per request. Default is 10000
125137
func WithMaxRows(n int) connOption {
126138
return func(c *config.Config) {
127139
if n != 0 {
@@ -130,32 +142,43 @@ func WithMaxRows(n int) connOption {
130142
}
131143
}
132144

133-
// This will add a timeout for the server execution.
134-
// In seconds.
135-
func WithTimeout(n int) connOption {
145+
// WithTimeout adds timeout for the server query execution. Default is no timeout.
146+
func WithTimeout(n time.Duration) connOption {
136147
return func(c *config.Config) {
137-
c.QueryTimeoutSeconds = n
148+
c.QueryTimeout = n
138149
}
139150
}
140151

152+
// Sets the initial catalog name and schema name in the session.
153+
// Use <select * from foo> instead of <select * from catalog.schema.foo>
141154
func WithInitialNamespace(catalog, schema string) connOption {
142155
return func(c *config.Config) {
143156
c.Catalog = catalog
144157
c.Schema = schema
145158
}
146159
}
147160

161+
// Used to identify partners. Set as a string with format <isv-name+product-name>.
148162
func WithUserAgentEntry(entry string) connOption {
149163
return func(c *config.Config) {
150164
c.UserAgentEntry = entry
151165
}
152-
153166
}
154167

155-
// Sessions params will be set upon opening the session
168+
// Sessions params will be set upon opening the session by calling SET function.
156169
// If using connection pool, session params can avoid successive calls of "SET ..."
157170
func WithSessionParams(params map[string]string) connOption {
158171
return func(c *config.Config) {
172+
for k, v := range params {
173+
if strings.ToLower(k) == "timezone" {
174+
if loc, err := time.LoadLocation(v); err != nil {
175+
logger.Error().Msgf("timezone %s is not valid", v)
176+
} else {
177+
c.Location = loc
178+
}
179+
180+
}
181+
}
159182
c.SessionParams = params
160183
}
161184
}

doc.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/*
2+
Package dbsql implements the go driver to Databricks SQL
3+
*/
4+
package dbsql

0 commit comments

Comments
 (0)