@@ -3,16 +3,19 @@ package dbsql
33import (
44 "context"
55 "database/sql/driver"
6- "fmt"
76 "time"
87
8+ "github.com/databricks/databricks-sql-go/driverctx"
99 "github.com/databricks/databricks-sql-go/internal/cli_service"
10+ "github.com/databricks/databricks-sql-go/internal/client"
1011 "github.com/databricks/databricks-sql-go/internal/config"
1112 "github.com/databricks/databricks-sql-go/internal/sentinel"
1213 "github.com/databricks/databricks-sql-go/logger"
14+ "github.com/pkg/errors"
1315)
1416
1517type conn struct {
18+ id string
1619 cfg * config.Config
1720 client cli_service.TCLIService
1821 session * cli_service.TOpenSessionResp
@@ -29,32 +32,41 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
2932}
3033
3134func (c * conn ) Close () error {
35+ log := logger .WithContext (c .id , "" , "" )
36+ ctx := driverctx .NewContextWithConnId (context .Background (), c .id )
3237 sentinel := sentinel.Sentinel {
3338 OnDoneFn : func (statusResp any ) (any , error ) {
34- return c .client .CloseSession (context . Background () , & cli_service.TCloseSessionReq {
39+ return c .client .CloseSession (ctx , & cli_service.TCloseSessionReq {
3540 SessionHandle : c .session .SessionHandle ,
3641 })
3742 },
3843 }
39- _ , _ , err := sentinel .Watch (context .Background (), c .cfg .PollInterval , 15 * time .Second )
40-
41- return err
44+ _ , _ , err := sentinel .Watch (ctx , c .cfg .PollInterval , 15 * time .Second )
45+ if err != nil {
46+ log .Err (err ).Msg ("databricks: failed to close connection" )
47+ return wrapErr (err , "failed to close connection" )
48+ }
49+ return nil
4250}
4351
4452// Not supported in Databricks
4553func (c * conn ) Begin () (driver.Tx , error ) {
46- return nil , fmt . Errorf ( "databricks: transactions are not supported" )
54+ return nil , errors . New ( ErrTransactionsNotSupported )
4755}
4856
4957// Not supported in Databricks
5058func (c * conn ) BeginTx (ctx context.Context , opts driver.TxOptions ) (driver.Tx , error ) {
51- return nil , fmt . Errorf ( "databricks: transactions are not supported" )
59+ return nil , errors . New ( ErrTransactionsNotSupported )
5260}
5361
5462func (c * conn ) Ping (ctx context.Context ) error {
55- _ , err := c .QueryContext (ctx , "select 1" , nil )
63+ log := logger .WithContext (c .id , driverctx .CorrelationIdFromContext (ctx ), "" )
64+ ctx = driverctx .NewContextWithConnId (ctx , c .id )
65+ ctx1 , cancel := context .WithTimeout (ctx , 15 * time .Second )
66+ defer cancel ()
67+ _ , err := c .QueryContext (ctx1 , "select 1" , nil )
5668 if err != nil {
57- logger .Err (err ).Msg ("ping error " )
69+ log .Err (err ).Msg ("databricks: failed to ping " )
5870 return driver .ErrBadConn
5971 }
6072 return nil
@@ -76,13 +88,21 @@ func (c *conn) IsValid() bool {
7688// ExecContext honors the context timeout and return when it is canceled.
7789// Statement ExecContext is the same as connection ExecContext
7890func (c * conn ) ExecContext (ctx context.Context , query string , args []driver.NamedValue ) (driver.Result , error ) {
91+ log := logger .WithContext (c .id , driverctx .CorrelationIdFromContext (ctx ), "" )
92+ defer log .Duration (logger .Track ("ExecContext" ))
93+ ctx = driverctx .NewContextWithConnId (ctx , c .id )
7994 if len (args ) > 0 {
80- return nil , fmt .Errorf ("databricks: query parameters are not supported" )
95+ return nil , errors .New (ErrParametersNotSupported )
96+ }
97+ exStmtResp , opStatusResp , err := c .runQuery (ctx , query , args )
98+
99+ if exStmtResp != nil && exStmtResp .OperationHandle != nil {
100+ log = logger .WithContext (c .id , driverctx .CorrelationIdFromContext (ctx ), client .SprintGuid (exStmtResp .OperationHandle .OperationId .GUID ))
81101 }
82- _ , opStatusResp , err := c .runQuery (ctx , query , args )
83102
84103 if err != nil {
85- return nil , err
104+ log .Err (err ).Msgf ("databricks: failed to execute query: query %s" , query )
105+ return nil , wrapErrf (err , "failed to execute query" )
86106 }
87107 res := result {AffectedRows : opStatusResp .GetNumModifiedRows ()}
88108
@@ -95,24 +115,37 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
95115// QueryContext honors the context timeout and return when it is canceled.
96116// Statement QueryContext is the same as connection QueryContext
97117func (c * conn ) QueryContext (ctx context.Context , query string , args []driver.NamedValue ) (driver.Rows , error ) {
118+ corrId := driverctx .CorrelationIdFromContext (ctx )
119+ log := logger .WithContext (c .id , corrId , "" )
120+ msg , start := log .Track ("QueryContext" )
121+
122+ ctx = driverctx .NewContextWithConnId (ctx , c .id )
98123 if len (args ) > 0 {
99- return nil , fmt . Errorf ( "databricks: query parameters are not supported" )
124+ return nil , errors . New ( ErrParametersNotSupported )
100125 }
101126 // first we try to get the results synchronously.
102127 // at any point in time that the context is done we must cancel and return
103128 exStmtResp , _ , err := c .runQuery (ctx , query , args )
104129
130+ if exStmtResp != nil && exStmtResp .OperationHandle != nil {
131+ log = logger .WithContext (c .id , driverctx .CorrelationIdFromContext (ctx ), client .SprintGuid (exStmtResp .OperationHandle .OperationId .GUID ))
132+ }
133+ defer log .Duration (msg , start )
134+
105135 if err != nil {
106- return nil , err
136+ log .Err (err ).Msgf ("databricks: failed to run query: query %s" , query )
137+ return nil , wrapErrf (err , "failed to run query" )
107138 }
108139 // hold on to the operation handle
109140 opHandle := exStmtResp .OperationHandle
110141
111142 rows := rows {
112- client : c .client ,
113- opHandle : opHandle ,
114- pageSize : int64 (c .cfg .MaxRows ),
115- location : c .cfg .Location ,
143+ connId : c .id ,
144+ correlationId : corrId ,
145+ client : c .client ,
146+ opHandle : opHandle ,
147+ pageSize : int64 (c .cfg .MaxRows ),
148+ location : c .cfg .Location ,
116149 }
117150
118151 if exStmtResp .DirectResults != nil {
@@ -126,15 +159,19 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
126159}
127160
128161func (c * conn ) runQuery (ctx context.Context , query string , args []driver.NamedValue ) (* cli_service.TExecuteStatementResp , * cli_service.TGetOperationStatusResp , error ) {
162+ log := logger .WithContext (c .id , driverctx .CorrelationIdFromContext (ctx ), "" )
129163 // first we try to get the results synchronously.
130164 // at any point in time that the context is done we must cancel and return
131165 exStmtResp , err := c .executeStatement (ctx , query , args )
132166
133167 if err != nil {
134- return nil , nil , err
168+ return exStmtResp , nil , err
135169 }
136170 // hold on to the operation handle
137171 opHandle := exStmtResp .OperationHandle
172+ if opHandle != nil && opHandle .OperationId != nil {
173+ log = logger .WithContext (c .id , driverctx .CorrelationIdFromContext (ctx ), client .SprintGuid (opHandle .OperationId .GUID ))
174+ }
138175
139176 if exStmtResp .DirectResults != nil {
140177 opStatus := exStmtResp .DirectResults .GetOperationStatus ()
@@ -148,15 +185,13 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
148185 // bad
149186 case cli_service .TOperationState_CANCELED_STATE , cli_service .TOperationState_CLOSED_STATE , cli_service .TOperationState_ERROR_STATE , cli_service .TOperationState_TIMEDOUT_STATE :
150187 // do we need to close the operation in these cases?
151- logger .Error ().Msg (opStatus .GetErrorMessage ())
152- logger .Debug ().Msgf ("bad state: %s" , opStatus .GetOperationState ())
153-
154- return exStmtResp , opStatus , fmt .Errorf (opStatus .GetDisplayMessage ())
188+ logBadQueryState (log , opStatus )
189+ return exStmtResp , opStatus , errors .New (opStatus .GetDisplayMessage ())
155190 // live states
156191 case cli_service .TOperationState_INITIALIZED_STATE , cli_service .TOperationState_PENDING_STATE , cli_service .TOperationState_RUNNING_STATE :
157192 statusResp , err := c .pollOperation (ctx , opHandle )
158193 if err != nil {
159- return nil , statusResp , err
194+ return exStmtResp , statusResp , err
160195 }
161196 switch statusResp .GetOperationState () {
162197 // terminal states
@@ -166,20 +201,17 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
166201 return exStmtResp , opStatus , nil
167202 // bad
168203 case cli_service .TOperationState_CANCELED_STATE , cli_service .TOperationState_CLOSED_STATE , cli_service .TOperationState_ERROR_STATE , cli_service .TOperationState_TIMEDOUT_STATE :
169- logger .Debug ().Msgf ("bad state: %s" , statusResp .GetOperationState ())
170- logger .Error ().Msg (statusResp .GetErrorMessage ())
171- return exStmtResp , opStatus , fmt .Errorf (statusResp .GetDisplayMessage ())
204+ logBadQueryState (log , statusResp )
205+ return exStmtResp , opStatus , errors .New (statusResp .GetDisplayMessage ())
172206 // live states
173207 default :
174- logger .Debug ().Msgf ("bad state: %s" , statusResp .GetOperationState ())
175- logger .Error ().Msg (statusResp .GetErrorMessage ())
176- return exStmtResp , opStatus , fmt .Errorf ("invalid operation state. This should not have happened" )
208+ logBadQueryState (log , statusResp )
209+ return exStmtResp , opStatus , errors .New ("invalid operation state. This should not have happened" )
177210 }
178211 // weird states
179212 default :
180- logger .Debug ().Msgf ("bad state: %s" , opStatus .GetOperationState ())
181- logger .Error ().Msg (opStatus .GetErrorMessage ())
182- return exStmtResp , opStatus , fmt .Errorf ("invalid operation state. This should not have happened" )
213+ logBadQueryState (log , opStatus )
214+ return exStmtResp , opStatus , errors .New ("invalid operation state. This should not have happened" )
183215 }
184216
185217 } else {
@@ -195,18 +227,21 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
195227 return exStmtResp , statusResp , nil
196228 // bad
197229 case cli_service .TOperationState_CANCELED_STATE , cli_service .TOperationState_CLOSED_STATE , cli_service .TOperationState_ERROR_STATE , cli_service .TOperationState_TIMEDOUT_STATE :
198- logger .Debug ().Msgf ("bad state: %s" , statusResp .GetOperationState ())
199- logger .Error ().Msg (statusResp .GetErrorMessage ())
200- return exStmtResp , statusResp , fmt .Errorf (statusResp .GetDisplayMessage ())
230+ logBadQueryState (log , statusResp )
231+ return exStmtResp , statusResp , errors .New (statusResp .GetDisplayMessage ())
201232 // live states
202233 default :
203- logger .Debug ().Msgf ("bad state: %s" , statusResp .GetOperationState ())
204- logger .Error ().Msg (statusResp .GetErrorMessage ())
205- return exStmtResp , statusResp , fmt .Errorf ("invalid operation state. This should not have happened" )
234+ logBadQueryState (log , statusResp )
235+ return exStmtResp , statusResp , errors .New ("invalid operation state. This should not have happened" )
206236 }
207237 }
208238}
209239
240+ func logBadQueryState (log * logger.DBSQLLogger , opStatus * cli_service.TGetOperationStatusResp ) {
241+ log .Error ().Msgf ("databricks: query state: %s" , opStatus .GetOperationState ())
242+ log .Error ().Msg (opStatus .GetErrorMessage ())
243+ }
244+
210245func (c * conn ) executeStatement (ctx context.Context , query string , args []driver.NamedValue ) (* cli_service.TExecuteStatementResp , error ) {
211246 sentinel := sentinel.Sentinel {
212247 OnDoneFn : func (statusResp any ) (any , error ) {
@@ -223,8 +258,9 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
223258 // CanDecompressLZ4Result_: &f,
224259 // CanDownloadResult_: &t,
225260 }
261+ ctx = driverctx .NewContextWithConnId (ctx , c .id )
226262 resp , err := c .client .ExecuteStatement (ctx , & req )
227- return resp , err
263+ return resp , wrapErr ( err , "failed to execute statement" )
228264 },
229265 }
230266 _ , res , err := sentinel .Watch (ctx , c .cfg .PollInterval , 0 )
@@ -233,51 +269,61 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
233269 }
234270 exStmtResp , ok := res .(* cli_service.TExecuteStatementResp )
235271 if ! ok {
236- return nil , fmt . Errorf ("databricks: invalid execute statement response" )
272+ return exStmtResp , errors . New ("databricks: invalid execute statement response" )
237273 }
238274 return exStmtResp , err
239275}
240276
241277func (c * conn ) pollOperation (ctx context.Context , opHandle * cli_service.TOperationHandle ) (* cli_service.TGetOperationStatusResp , error ) {
278+ corrId := driverctx .CorrelationIdFromContext (ctx )
279+ log := logger .WithContext (c .id , corrId , client .SprintGuid (opHandle .OperationId .GUID ))
242280 var statusResp * cli_service.TGetOperationStatusResp
281+ ctx = driverctx .NewContextWithConnId (ctx , c .id )
282+ newCtx := driverctx .NewContextWithCorrelationId (driverctx .NewContextWithConnId (context .Background (), c .id ), corrId )
243283 pollSentinel := sentinel.Sentinel {
244284 OnDoneFn : func (statusResp any ) (any , error ) {
245285 return statusResp , nil
246286 },
247287 StatusFn : func () (sentinel.Done , any , error ) {
248288 var err error
249- logger .Debug ().Msg ("databricks: polling status" )
250- statusResp , err = c .client .GetOperationStatus (context . Background () , & cli_service.TGetOperationStatusReq {
289+ log .Debug ().Msg ("databricks: polling status" )
290+ statusResp , err = c .client .GetOperationStatus (newCtx , & cli_service.TGetOperationStatusReq {
251291 OperationHandle : opHandle ,
252292 })
293+ if statusResp != nil && statusResp .OperationState != nil {
294+ log .Debug ().Msgf ("databricks: status %s" , statusResp .GetOperationState ().String ())
295+ }
253296 return func () bool {
254297 // which other states?
298+ if err != nil {
299+ return true
300+ }
255301 switch statusResp .GetOperationState () {
256302 case cli_service .TOperationState_INITIALIZED_STATE , cli_service .TOperationState_PENDING_STATE , cli_service .TOperationState_RUNNING_STATE :
257303 return false
258304 default :
259- logger .Debug ().Msg ("databricks: polling done" )
305+ log .Debug ().Msg ("databricks: polling done" )
260306 return true
261307 }
262308 }, statusResp , err
263309 },
264310 OnCancelFn : func () (any , error ) {
265- logger .Debug ().Msgf ("databricks: canceling operation %s" , opHandle . OperationId )
266- ret , err := c .client .CancelOperation (context . Background () , & cli_service.TCancelOperationReq {
311+ log .Debug ().Msg ("databricks: canceling query" )
312+ ret , err := c .client .CancelOperation (newCtx , & cli_service.TCancelOperationReq {
267313 OperationHandle : opHandle ,
268314 })
269315 return ret , err
270316 },
271317 }
272318 _ , resp , err := pollSentinel .Watch (ctx , c .cfg .PollInterval , 0 )
273319 if err != nil {
274- return nil , err
320+ return nil , wrapErr ( err , "failed to poll query state" )
275321 }
276322 statusResp , ok := resp .(* cli_service.TGetOperationStatusResp )
277323 if ! ok {
278- return nil , fmt . Errorf ("could not read operation status" )
324+ return nil , errors . New ("could not read query status" )
279325 }
280- return statusResp , err
326+ return statusResp , nil
281327}
282328
283329var _ driver.Conn = (* conn )(nil )
0 commit comments