@@ -23,7 +23,6 @@ import (
2323var (
2424 errReconnectRequestOutdated = xerrors .Wrap (errors .New ("ydb: reconnect request outdated" ))
2525 errReconnect = xerrors .Wrap (errors .New ("ydb: reconnect to topic grpc stream" ))
26- errStreamClosed = xerrors .Wrap (errors .New ("ydb: topic reader stream closed" ))
2726 errConnectionTimeout = xerrors .Wrap (errors .New ("ydb: topic reader connection timeout for stream" ))
2827)
2928
@@ -34,6 +33,7 @@ type readerReconnector struct {
3433 clock clockwork.Clock
3534 retrySettings topic.RetrySettings
3635 streamVal batchedStreamReader
36+ streamContextCancel context.CancelCauseFunc
3737 streamErr error
3838 closedErr error
3939 initErr error
@@ -149,6 +149,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error
149149
150150 if r .streamVal != nil {
151151 streamCloseErr := r .streamVal .CloseWithError (ctx , xerrors .WithStackTrace (errReaderClosed ))
152+ r .streamContextCancel (errReaderClosed )
152153 if closeErr == nil {
153154 closeErr = streamCloseErr
154155 }
@@ -268,7 +269,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
268269 _ = oldReader .CloseWithError (ctx , xerrors .WithStackTrace (errReconnect ))
269270 }
270271
271- newStream , err := r .connectWithTimeout ()
272+ newStream , newStreamClose , err := r .connectWithTimeout ()
272273
273274 if r .isRetriableError (err ) {
274275 go func (reason error ) {
@@ -282,6 +283,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
282283 r .streamErr = err
283284 if err == nil {
284285 r .streamVal = newStream
286+ r .streamContextCancel = newStreamClose
285287 if ! r .initDone {
286288 r .initDone = true
287289 close (r .initDoneCh )
@@ -305,11 +307,11 @@ func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Du
305307 return topic .CheckRetryMode (err , r .retrySettings , retriesDuration )
306308}
307309
308- func (r * readerReconnector ) connectWithTimeout () (_ batchedStreamReader , err error ) {
310+ func (r * readerReconnector ) connectWithTimeout () (_ batchedStreamReader , _ context. CancelCauseFunc , err error ) {
309311 bgContext := r .background .Context ()
310312
311313 if err = bgContext .Err (); err != nil {
312- return nil , err
314+ return nil , nil , err
313315 }
314316
315317 connectionContext , cancel := context .WithCancelCause (context .WithoutCancel (bgContext ))
@@ -340,14 +342,10 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
340342 }
341343
342344 if res .err == nil {
343- stream := batchedStreamReaderHook {
344- batchedStreamReader : res .stream ,
345- contextCancel : cancel ,
346- }
347- return stream , nil
345+ return res .stream , cancel , nil
348346 }
349347
350- return nil , res .err
348+ return nil , nil , res .err
351349}
352350
353351func (r * readerReconnector ) WaitInit (ctx context.Context ) error {
@@ -422,17 +420,6 @@ func (r *readerReconnector) handlePanic() {
422420 }
423421}
424422
425- type batchedStreamReaderHook struct {
426- batchedStreamReader
427- contextCancel context.CancelCauseFunc
428- }
429-
430- func (b batchedStreamReaderHook ) CloseWithError (ctx context.Context , err error ) error {
431- defer b .contextCancel (xerrors .WithStackTrace (errStreamClosed ))
432-
433- return b .batchedStreamReader .CloseWithError (ctx , err )
434- }
435-
436423type reconnectRequest struct {
437424 oldReader batchedStreamReader
438425 reason error
0 commit comments