From d43f412a5d050489b8376cb5f98421dd81166d8f Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 30 Oct 2025 12:53:15 +0300 Subject: [PATCH 1/6] connectTimeout include initStream --- .../topicwriterinternal/writer_reconnector.go | 50 +++++-------------- .../context_with_stoppable_timeout.go | 21 ++++++++ .../context_with_temporary_timeout_test.go | 35 +++++++++++++ 3 files changed, 68 insertions(+), 38 deletions(-) create mode 100644 internal/xcontext/context_with_stoppable_timeout.go create mode 100644 internal/xcontext/context_with_temporary_timeout_test.go diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 443024db0..6ec13e174 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -489,7 +489,10 @@ func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context) ( writer *SingleStreamWriter, err error, ) { - stream, err := w.connectWithTimeout(streamCtx) + connectCtx, stopConnectCtx := xcontext.WithStoppableTimeout(streamCtx, w.cfg.connectTimeout) + defer stopConnectCtx() + + stream, err := w.connectWithTimeout(connectCtx) if err != nil { return nil, err } @@ -505,45 +508,16 @@ func (w *WriterReconnector) needReceiveLastSeqNo() bool { return res } -func (w *WriterReconnector) connectWithTimeout(streamLifetimeContext context.Context) (RawTopicWriterStream, error) { - connectCtx, connectCancel := xcontext.WithCancel(streamLifetimeContext) - - type resT struct { - stream RawTopicWriterStream - err error - } - resCh := make(chan resT, 1) - - go func() { - defer func() { - p := recover() - if p != nil { - resCh <- resT{ - stream: nil, - err: xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: panic while connect to topic writer: %+v", p))), - } - } - }() - - stream, err := w.cfg.Connect(connectCtx, w.cfg.Tracer) - resCh <- resT{stream: stream, err: err} +func (w *WriterReconnector) connectWithTimeout(ctx context.Context) (stream RawTopicWriterStream, err error) { + defer func() { + p := recover() + if p != nil { + stream = nil + err = xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: panic while connect to topic writer: %+v", p))) + } }() - timer := time.NewTimer(w.cfg.connectTimeout) - defer timer.Stop() - - select { - case <-timer.C: - connectCancel() - - return nil, xerrors.WithStackTrace(errConnTimeout) - case res := <-resCh: - // force no cancel connect context - because it will break stream - // context will cancel by cancel streamLifetimeContext while reconnect or stop connection - _ = connectCancel - - return res.stream, res.err - } + return w.cfg.Connect(ctx, w.cfg.Tracer) } func (w *WriterReconnector) onAckReceived(count int) { diff --git a/internal/xcontext/context_with_stoppable_timeout.go b/internal/xcontext/context_with_stoppable_timeout.go new file mode 100644 index 000000000..9bdeb5a3d --- /dev/null +++ b/internal/xcontext/context_with_stoppable_timeout.go @@ -0,0 +1,21 @@ +package xcontext + +import ( + "context" + "time" +) + +// WithStoppableTimeout returns a copy of the parent context that is cancelled after +// timeout elapses, and a stop function. Calling the stop function prevents the +// timeout from cancelling the context and releases resources associated with it. +func WithStoppableTimeout(ctx context.Context, timeout time.Duration) (context.Context, func()) { + ctxWithCancel, cancel := WithCancel(ctx) + timeoutCtx, cancelTimeout := WithTimeout(ctx, timeout) + + stop := context.AfterFunc(timeoutCtx, cancel) + + return ctxWithCancel, func() { + stop() + cancelTimeout() + } +} diff --git a/internal/xcontext/context_with_temporary_timeout_test.go b/internal/xcontext/context_with_temporary_timeout_test.go new file mode 100644 index 000000000..f997faefc --- /dev/null +++ b/internal/xcontext/context_with_temporary_timeout_test.go @@ -0,0 +1,35 @@ +//go:build go1.25 + +package xcontext_test + +import ( + "context" + "testing" + "testing/synctest" + "time" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" +) + +func TestWithStoppableTimeout(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx, _ := xcontext.WithStoppableTimeout(context.Background(), 10*time.Second) + select { + case <-time.After(100500 * time.Second): + t.Fatal("context should be done") + case <-ctx.Done(): + } + }) + + synctest.Test(t, func(t *testing.T) { + ctx, stop := xcontext.WithStoppableTimeout(context.Background(), 10*time.Second) + + stop() + + select { + case <-time.After(100500 * time.Second): + case <-ctx.Done(): + t.Fatal("context shouldn't be canceled") + } + }) +} From 7c9e1e1625bd08e9b89cadbd2df444dfa896df48 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 30 Oct 2025 19:20:44 +0300 Subject: [PATCH 2/6] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f1175363..3e8712f2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ * Added method `query.WithIssuesHandler` to get query issues +* Fixed connection timeout issue in topics writer ## v3.117.2 * Added support for `Result.RowsAffected()` for YDB `database/sql` driver From 6374144c927205238209731f81aac8f9b63f80d0 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 30 Oct 2025 19:49:40 +0300 Subject: [PATCH 3/6] fix lint --- .../topicwriterinternal/writer_reconnector.go | 2 +- .../xcontext/context_with_stoppable_timeout.go | 13 +++++++------ ...t.go => context_with_stoppable_timeout_test.go} | 14 +++++++++----- 3 files changed, 17 insertions(+), 12 deletions(-) rename internal/xcontext/{context_with_temporary_timeout_test.go => context_with_stoppable_timeout_test.go} (56%) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 6ec13e174..fad264a43 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -489,7 +489,7 @@ func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context) ( writer *SingleStreamWriter, err error, ) { - connectCtx, stopConnectCtx := xcontext.WithStoppableTimeout(streamCtx, w.cfg.connectTimeout) + connectCtx, stopConnectCtx := xcontext.WithStoppableTimeoutCause(streamCtx, w.cfg.connectTimeout, errConnTimeout) defer stopConnectCtx() stream, err := w.connectWithTimeout(connectCtx) diff --git a/internal/xcontext/context_with_stoppable_timeout.go b/internal/xcontext/context_with_stoppable_timeout.go index 9bdeb5a3d..a06cf1680 100644 --- a/internal/xcontext/context_with_stoppable_timeout.go +++ b/internal/xcontext/context_with_stoppable_timeout.go @@ -5,14 +5,15 @@ import ( "time" ) -// WithStoppableTimeout returns a copy of the parent context that is cancelled after -// timeout elapses, and a stop function. Calling the stop function prevents the -// timeout from cancelling the context and releases resources associated with it. -func WithStoppableTimeout(ctx context.Context, timeout time.Duration) (context.Context, func()) { - ctxWithCancel, cancel := WithCancel(ctx) +// WithStoppableTimeoutCause returns a copy of the parent context that is cancelled with +// the specified cause after timeout elapses, and a stop function. Calling the stop function +// prevents the timeout from canceling the context and releases resources associated with it. +// The cause error will be used when the timeout triggers cancellation. +func WithStoppableTimeoutCause(ctx context.Context, timeout time.Duration, cause error) (context.Context, func()) { + ctxWithCancel, cancel := context.WithCancelCause(ctx) timeoutCtx, cancelTimeout := WithTimeout(ctx, timeout) - stop := context.AfterFunc(timeoutCtx, cancel) + stop := context.AfterFunc(timeoutCtx, func() { cancel(cause) }) return ctxWithCancel, func() { stop() diff --git a/internal/xcontext/context_with_temporary_timeout_test.go b/internal/xcontext/context_with_stoppable_timeout_test.go similarity index 56% rename from internal/xcontext/context_with_temporary_timeout_test.go rename to internal/xcontext/context_with_stoppable_timeout_test.go index f997faefc..c02cd7ce5 100644 --- a/internal/xcontext/context_with_temporary_timeout_test.go +++ b/internal/xcontext/context_with_stoppable_timeout_test.go @@ -1,28 +1,32 @@ -//go:build go1.25 - package xcontext_test import ( "context" + "errors" "testing" "testing/synctest" "time" + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" ) -func TestWithStoppableTimeout(t *testing.T) { +func TestWithStoppableTimeoutCause(t *testing.T) { + wantErr := errors.New("some error") + synctest.Test(t, func(t *testing.T) { - ctx, _ := xcontext.WithStoppableTimeout(context.Background(), 10*time.Second) + ctx, _ := xcontext.WithStoppableTimeoutCause(context.Background(), 10*time.Second, wantErr) select { case <-time.After(100500 * time.Second): t.Fatal("context should be done") case <-ctx.Done(): + assert.ErrorIs(t, context.Cause(ctx), wantErr) } }) synctest.Test(t, func(t *testing.T) { - ctx, stop := xcontext.WithStoppableTimeout(context.Background(), 10*time.Second) + ctx, stop := xcontext.WithStoppableTimeoutCause(context.Background(), 10*time.Second, wantErr) stop() From 1c1be1cb19ebfc5fdc665d05d0dc8bc320272044 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Thu, 30 Oct 2025 23:03:50 +0300 Subject: [PATCH 4/6] fix --- internal/xcontext/context_with_stoppable_timeout_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/xcontext/context_with_stoppable_timeout_test.go b/internal/xcontext/context_with_stoppable_timeout_test.go index c02cd7ce5..6c7dd3374 100644 --- a/internal/xcontext/context_with_stoppable_timeout_test.go +++ b/internal/xcontext/context_with_stoppable_timeout_test.go @@ -1,3 +1,5 @@ +//go:build go1.25 + package xcontext_test import ( From a7140ed213ade0fe21a09168483f0f060cf415b8 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Fri, 31 Oct 2025 17:20:06 +0300 Subject: [PATCH 5/6] fix from review --- .../topicwriterinternal/writer_reconnector.go | 21 +++++++++------ .../context_with_stoppable_timeout.go | 13 +++++++--- .../context_with_stoppable_timeout_test.go | 26 +++++++++++++++++++ 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index fad264a43..701f759af 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -440,7 +440,7 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) { attempt, ) - writer, err := w.startWriteStream(ctx, streamCtx) + writer, err := w.startWriteStream(streamCtx) w.onWriterChange(writer) onStreamError := onWriterStarted(err) if err == nil { @@ -485,12 +485,17 @@ func (w *WriterReconnector) handleReconnectRetry( return false } -func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context) ( - writer *SingleStreamWriter, - err error, -) { - connectCtx, stopConnectCtx := xcontext.WithStoppableTimeoutCause(streamCtx, w.cfg.connectTimeout, errConnTimeout) - defer stopConnectCtx() +func (w *WriterReconnector) startWriteStream(ctx context.Context) (writer *SingleStreamWriter, err error) { + // connectCtx with timeout applies only to the connection phase, + // allowing the main stream context to remain active after exiting this method + connectCtx, stopConnectCtx := xcontext.WithStoppableTimeoutCause(ctx, w.cfg.connectTimeout, errConnTimeout) + defer func() { + // If the context was cancelled during connection (the stream was cancelled), + // we should return a timeout error + if err == nil && !stopConnectCtx() { + err = context.Cause(connectCtx) + } + }() stream, err := w.connectWithTimeout(connectCtx) if err != nil { @@ -499,7 +504,7 @@ func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context) ( w.queue.ResetSentProgress() - return NewSingleStreamWriter(ctx, w.createWriterStreamConfig(stream)) + return NewSingleStreamWriter(connectCtx, w.createWriterStreamConfig(stream)) } func (w *WriterReconnector) needReceiveLastSeqNo() bool { diff --git a/internal/xcontext/context_with_stoppable_timeout.go b/internal/xcontext/context_with_stoppable_timeout.go index a06cf1680..d3b9a940f 100644 --- a/internal/xcontext/context_with_stoppable_timeout.go +++ b/internal/xcontext/context_with_stoppable_timeout.go @@ -9,14 +9,19 @@ import ( // the specified cause after timeout elapses, and a stop function. Calling the stop function // prevents the timeout from canceling the context and releases resources associated with it. // The cause error will be used when the timeout triggers cancellation. -func WithStoppableTimeoutCause(ctx context.Context, timeout time.Duration, cause error) (context.Context, func()) { +// +// The returned stop function returns a boolean value: +// - true if the timeout was successfully stopped before it fired (context was not cancelled by timeout) +// - false if the timeout already fired and the context was cancelled with the specified cause +func WithStoppableTimeoutCause(ctx context.Context, timeout time.Duration, cause error) (context.Context, func() bool) { ctxWithCancel, cancel := context.WithCancelCause(ctx) timeoutCtx, cancelTimeout := WithTimeout(ctx, timeout) stop := context.AfterFunc(timeoutCtx, func() { cancel(cause) }) - return ctxWithCancel, func() { - stop() - cancelTimeout() + return ctxWithCancel, func() bool { + defer cancelTimeout() + + return stop() } } diff --git a/internal/xcontext/context_with_stoppable_timeout_test.go b/internal/xcontext/context_with_stoppable_timeout_test.go index 6c7dd3374..84ba9de8f 100644 --- a/internal/xcontext/context_with_stoppable_timeout_test.go +++ b/internal/xcontext/context_with_stoppable_timeout_test.go @@ -38,4 +38,30 @@ func TestWithStoppableTimeoutCause(t *testing.T) { t.Fatal("context shouldn't be canceled") } }) + + synctest.Test(t, func(t *testing.T) { + _, stop := xcontext.WithStoppableTimeoutCause(context.Background(), 10*time.Second, wantErr) + + time.Sleep(1 * time.Second) + + assert.True(t, stop()) + }) + + synctest.Test(t, func(t *testing.T) { + _, stop := xcontext.WithStoppableTimeoutCause(context.Background(), 10*time.Second, wantErr) + + time.Sleep(1 * time.Second) + + stop() + + assert.False(t, stop()) + }) + + synctest.Test(t, func(t *testing.T) { + _, stop := xcontext.WithStoppableTimeoutCause(context.Background(), 10*time.Second, wantErr) + + time.Sleep(15 * time.Second) + + assert.False(t, stop()) + }) } From 28476894d9555e2f60f637012fcd2710f81f5212 Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Sat, 1 Nov 2025 15:56:08 +0300 Subject: [PATCH 6/6] fix from review --- internal/topic/topicwriterinternal/writer_reconnector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 701f759af..266c12e74 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -492,7 +492,7 @@ func (w *WriterReconnector) startWriteStream(ctx context.Context) (writer *Singl defer func() { // If the context was cancelled during connection (the stream was cancelled), // we should return a timeout error - if err == nil && !stopConnectCtx() { + if !stopConnectCtx() && err == nil { err = context.Cause(connectCtx) } }()