@@ -15,7 +15,6 @@ import (
1515 "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1616 "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
1717 "github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
18- "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1918 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2019 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2120 "github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -24,6 +23,7 @@ import (
2423var (
2524 errReconnectRequestOutdated = xerrors .Wrap (errors .New ("ydb: reconnect request outdated" ))
2625 errReconnect = xerrors .Wrap (errors .New ("ydb: reconnect to topic grpc stream" ))
26+ errConnectionTimeout = xerrors .Wrap (errors .New ("ydb: topic reader connection timeout for stream" ))
2727)
2828
2929type readerConnectFunc func (ctx context.Context ) (batchedStreamReader , error )
@@ -33,6 +33,7 @@ type readerReconnector struct {
3333 clock clockwork.Clock
3434 retrySettings topic.RetrySettings
3535 streamVal batchedStreamReader
36+ streamContextCancel context.CancelCauseFunc
3637 streamErr error
3738 closedErr error
3839 initErr error
@@ -148,6 +149,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error
148149
149150 if r .streamVal != nil {
150151 streamCloseErr := r .streamVal .CloseWithError (ctx , xerrors .WithStackTrace (errReaderClosed ))
152+ r .streamContextCancel (errReaderClosed )
151153 if closeErr == nil {
152154 closeErr = streamCloseErr
153155 }
@@ -267,7 +269,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
267269 _ = oldReader .CloseWithError (ctx , xerrors .WithStackTrace (errReconnect ))
268270 }
269271
270- newStream , err := r .connectWithTimeout ()
272+ newStream , newStreamClose , err := r .connectWithTimeout ()
271273
272274 if r .isRetriableError (err ) {
273275 go func (reason error ) {
@@ -281,6 +283,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
281283 r .streamErr = err
282284 if err == nil {
283285 r .streamVal = newStream
286+ r .streamContextCancel = newStreamClose
284287 if ! r .initDone {
285288 r .initDone = true
286289 close (r .initDoneCh )
@@ -304,14 +307,14 @@ func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Du
304307 return topic .CheckRetryMode (err , r .retrySettings , retriesDuration )
305308}
306309
307- func (r * readerReconnector ) connectWithTimeout () (_ batchedStreamReader , err error ) {
310+ func (r * readerReconnector ) connectWithTimeout () (_ batchedStreamReader , _ context. CancelCauseFunc , err error ) {
308311 bgContext := r .background .Context ()
309312
310313 if err = bgContext .Err (); err != nil {
311- return nil , err
314+ return nil , nil , err
312315 }
313316
314- connectionContext , cancel := xcontext . WithCancel (context .Background ( ))
317+ connectionContext , cancel := context . WithCancelCause (context .WithoutCancel ( bgContext ))
315318
316319 type connectResult struct {
317320 stream batchedStreamReader
@@ -332,17 +335,17 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
332335 case <- connectionTimoutTimer .Chan ():
333336 // cancel connection context only if timeout exceed while connection
334337 // because if cancel context after connect - it will break
335- cancel ()
338+ cancel (xerrors . WithStackTrace ( errConnectionTimeout ) )
336339 res = <- result
337340 case res = <- result :
338341 // pass
339342 }
340343
341344 if res .err == nil {
342- return res .stream , nil
345+ return res .stream , cancel , nil
343346 }
344347
345- return nil , res .err
348+ return nil , nil , res .err
346349}
347350
348351func (r * readerReconnector ) WaitInit (ctx context.Context ) error {
0 commit comments