@@ -8,24 +8,21 @@ import (
88 "sync/atomic"
99 "time"
1010
11+ "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1112 "google.golang.org/grpc"
12- "google.golang.org/grpc/codes"
1313 "google.golang.org/grpc/connectivity"
14- grpcStatus "google.golang.org/grpc/status"
15-
16- "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1714
1815 "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1916 "github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
2017 "github.com/ydb-platform/ydb-go-sdk/v3/internal/response"
2118 "github.com/ydb-platform/ydb-go-sdk/v3/trace"
2219)
2320
24- // nolint:gofumpt
25- // nolint:nolintlint
2621var (
27- // ErrOperationNotReady specified error when operation is not ready
28- ErrOperationNotReady = fmt .Errorf ("operation is not ready yet" )
22+ // errOperationNotReady specified error when operation is not ready
23+ errOperationNotReady = fmt .Errorf ("operation is not ready yet" )
24+ // errClosedConnection specified error when connection are closed early
25+ errClosedConnection = fmt .Errorf ("connection closed early" )
2926)
3027
3128type Conn interface {
@@ -189,10 +186,7 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
189186 }()
190187
191188 if c .isClosed () {
192- return nil , errors .NewGrpcError (
193- errors .WithStatus (grpcStatus .New (codes .Unavailable , "ydb driver conn closed early" )),
194- errors .WithErr (err ),
195- )
189+ return nil , errors .WithStackTrace (errClosedConnection )
196190 }
197191
198192 c .mtx .Lock ()
@@ -283,7 +277,7 @@ func (c *conn) close() (err error) {
283277 err = c .cc .Close ()
284278 c .cc = nil
285279 c .setState (Offline )
286- return err
280+ return errors . WithStackTrace ( err )
287281}
288282
289283func (c * conn ) isClosed () bool {
@@ -319,42 +313,7 @@ func (c *conn) Close(ctx context.Context) (err error) {
319313 f (c )
320314 }
321315
322- return err
323- }
324-
325- // invoke have behavior like grpc call
326- func (c * conn ) invoke (
327- ctx context.Context ,
328- method string ,
329- req interface {},
330- res interface {},
331- opts ... grpc.CallOption ,
332- ) (err error ) {
333- var cc * grpc.ClientConn
334- cc , err = c .take (ctx )
335- if err != nil {
336- return errors .NewGrpcError (
337- errors .WithStatus (grpcStatus .New (codes .Unavailable , "ydb driver conn take failed" )),
338- errors .WithErr (err ),
339- )
340- }
341-
342- c .changeUsages (1 )
343- defer c .changeUsages (- 1 )
344-
345- err = cc .Invoke (ctx , method , req , res , opts ... )
346-
347- if err != nil {
348- if s , ok := grpcStatus .FromError (err ); ok {
349- return errors .NewGrpcError (
350- errors .WithStatus (s ),
351- errors .WithErr (errors .WithStackTrace (err )),
352- )
353- }
354- return errors .WithStackTrace (err )
355- }
356-
357- return nil
316+ return errors .WithStackTrace (err )
358317}
359318
360319func (c * conn ) Invoke (
@@ -374,13 +333,22 @@ func (c *conn) Invoke(
374333 c .endpoint ,
375334 trace .Method (method ),
376335 )
336+ cc * grpc.ClientConn
377337 )
378338
379339 defer func () {
380340 onDone (err , issues , opID , c .GetState ())
381341 }()
382342
383- err = c .invoke (ctx , method , req , res , opts ... )
343+ cc , err = c .take (ctx )
344+ if err != nil {
345+ return errors .WithStackTrace (err )
346+ }
347+
348+ c .changeUsages (1 )
349+ defer c .changeUsages (- 1 )
350+
351+ err = cc .Invoke (ctx , method , req , res , opts ... )
384352
385353 if err != nil {
386354 if wrapping {
@@ -391,7 +359,7 @@ func (c *conn) Invoke(
391359 ),
392360 )
393361 }
394- return err
362+ return errors . WithStackTrace ( err )
395363 }
396364
397365 if o , ok := res .(response.Response ); ok {
@@ -402,52 +370,23 @@ func (c *conn) Invoke(
402370 if wrapping {
403371 switch {
404372 case ! o .GetOperation ().GetReady ():
405- return errors .WithStackTrace (ErrOperationNotReady )
373+ return errors .WithStackTrace (errOperationNotReady )
406374
407375 case o .GetOperation ().GetStatus () != Ydb .StatusIds_SUCCESS :
408- return errors .WithStackTrace (errors .NewOpError (errors .WithOEOperation (o .GetOperation ())))
376+ return errors .WithStackTrace (
377+ errors .NewOpError (
378+ errors .WithOEOperation (
379+ o .GetOperation (),
380+ ),
381+ ),
382+ )
409383 }
410384 }
411385 }
412386
413387 return err
414388}
415389
416- // newStream have behavior like grpc call
417- func (c * conn ) newStream (
418- ctx context.Context ,
419- desc * grpc.StreamDesc ,
420- method string ,
421- opts ... grpc.CallOption ,
422- ) (_ grpc.ClientStream , err error ) {
423- var cc * grpc.ClientConn
424- cc , err = c .take (ctx )
425- if err != nil {
426- return nil , errors .NewGrpcError (
427- errors .WithStatus (grpcStatus .New (codes .Unavailable , "ydb driver conn take failed" )),
428- errors .WithErr (err ),
429- )
430- }
431-
432- c .changeStreamUsages (1 )
433- defer c .changeStreamUsages (- 1 )
434-
435- var client grpc.ClientStream
436- client , err = cc .NewStream (ctx , desc , method , opts ... )
437-
438- if err != nil {
439- if s , ok := grpcStatus .FromError (err ); ok {
440- return nil , errors .NewGrpcError (
441- errors .WithStatus (s ),
442- errors .WithErr (errors .WithStackTrace (err )),
443- )
444- }
445- return nil , errors .WithStackTrace (err )
446- }
447-
448- return client , nil
449- }
450-
451390func (c * conn ) NewStream (
452391 ctx context.Context ,
453392 desc * grpc.StreamDesc ,
@@ -462,6 +401,8 @@ func (c *conn) NewStream(
462401 trace .Method (method ),
463402 )
464403 wrapping = needWrapping (ctx )
404+ cc * grpc.ClientConn
405+ s grpc.ClientStream
465406 )
466407
467408 defer func () {
@@ -479,13 +420,15 @@ func (c *conn) NewStream(
479420 }
480421 }()
481422
482- var s grpc.ClientStream
483- s , err = c .newStream (
484- ctx ,
485- desc ,
486- method ,
487- opts ... ,
488- )
423+ cc , err = c .take (ctx )
424+ if err != nil {
425+ return nil , errors .WithStackTrace (err )
426+ }
427+
428+ c .changeStreamUsages (1 )
429+ defer c .changeStreamUsages (- 1 )
430+
431+ s , err = cc .NewStream (ctx , desc , method , opts ... )
489432
490433 if err != nil {
491434 if wrapping {
@@ -496,13 +439,13 @@ func (c *conn) NewStream(
496439 ),
497440 )
498441 }
499- return s , err
442+ return s , errors . WithStackTrace ( err )
500443 }
501444
502445 return & grpcClientStream {
503- c : c ,
504- s : s ,
505- wrapping : wrapping ,
446+ ClientStream : s ,
447+ c : c ,
448+ wrapping : wrapping ,
506449 onDone : func (ctx context.Context ) {
507450 cancel ()
508451 },
0 commit comments