Skip to content

Commit 7ba94b0

Browse files
committed
Fix close connection lifetime context
1 parent 9d49511 commit 7ba94b0

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
18-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2019
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2120
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -24,6 +23,8 @@ import (
2423
var (
2524
errReconnectRequestOutdated = xerrors.Wrap(errors.New("ydb: reconnect request outdated"))
2625
errReconnect = xerrors.Wrap(errors.New("ydb: reconnect to topic grpc stream"))
26+
errStreamClosed = xerrors.Wrap(errors.New("ydb: topic reader stream closed"))
27+
errConnectionTimeout = xerrors.Wrap(errors.New("ydb: topic reader connection timeout for stream"))
2728
)
2829

2930
type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error)
@@ -311,7 +312,7 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
311312
return nil, err
312313
}
313314

314-
connectionContext, cancel := xcontext.WithCancel(context.Background())
315+
connectionContext, cancel := context.WithCancelCause(context.WithoutCancel(bgContext))
315316

316317
type connectResult struct {
317318
stream batchedStreamReader
@@ -332,14 +333,18 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
332333
case <-connectionTimoutTimer.Chan():
333334
// cancel connection context only if timeout exceed while connection
334335
// because if cancel context after connect - it will break
335-
cancel()
336+
cancel(xerrors.WithStackTrace(errConnectionTimeout))
336337
res = <-result
337338
case res = <-result:
338339
// pass
339340
}
340341

341342
if res.err == nil {
342-
return res.stream, nil
343+
stream := batchedStreamReaderHook{
344+
batchedStreamReader: res.stream,
345+
contextCancel: cancel,
346+
}
347+
return stream, nil
343348
}
344349

345350
return nil, res.err
@@ -417,6 +422,17 @@ func (r *readerReconnector) handlePanic() {
417422
}
418423
}
419424

425+
type batchedStreamReaderHook struct {
426+
batchedStreamReader
427+
contextCancel context.CancelCauseFunc
428+
}
429+
430+
func (b batchedStreamReaderHook) CloseWithError(ctx context.Context, err error) error {
431+
defer b.contextCancel(xerrors.WithStackTrace(errStreamClosed))
432+
433+
return b.batchedStreamReader.CloseWithError(ctx, err)
434+
}
435+
420436
type reconnectRequest struct {
421437
oldReader batchedStreamReader
422438
reason error

0 commit comments

Comments
 (0)