Skip to content

Commit 2eecb9d

Browse files
authored
fixed topic.UpdateOffsetsInTransaction (#1864)
1 parent ef44775 commit 2eecb9d

File tree

6 files changed

+41
-0
lines changed

6 files changed

+41
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed an issue where `topic.UpdateOffsetsInTransaction` was executed on a node different from the one where the transaction was running, which could lead to the error "Database coordinators are unavailable"
2+
13
## v3.116.0
24
* Added experimental support for query results in `Apache Arrow`
35
* Fix flaky unit test TestUnboundedChanContextTimeout

internal/query/transaction.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ func (tx *Transaction) SessionID() string {
179179
return tx.s.ID()
180180
}
181181

182+
func (tx *Transaction) NodeID() uint32 {
183+
return tx.s.NodeID()
184+
}
185+
182186
func (tx *Transaction) txControl() *baseTx.Control {
183187
if tx.ID() != baseTx.LazyTxID {
184188
return baseTx.NewControl(baseTx.WithTxID(tx.ID()))

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
@@ -245,6 +246,10 @@ func (r *topicStreamReaderImpl) commitWithTransaction(
245246
onDone(err)
246247
}()
247248

249+
// UpdateOffsetsInTransaction operation must be executed on the same Node where the transaction was initiated.
250+
// Otherwise, errors such as `Database coordinators are unavailable` may occur.
251+
ctx = endpoint.WithNodeID(ctx, tx.NodeID())
252+
248253
err = r.topicClient.UpdateOffsetsInTransaction(ctx, req)
249254

250255
return err

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.uber.org/mock/gomock"
1515

1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
@@ -1420,4 +1421,27 @@ func TestUpdateCommitInTransaction(t *testing.T) {
14201421
require.True(t, txMock.RolledBack)
14211422
require.True(t, txMock.materialized)
14221423
})
1424+
t.Run("UpdateOffsetsInTransaction must be executed on the tx Node", func(t *testing.T) {
1425+
e := newTopicReaderTestEnv(t)
1426+
e.Start()
1427+
1428+
txMock := newMockTransactionWrapper("test-session-id", "test-tx-id")
1429+
txMock.nodeID = 123
1430+
1431+
e.TopicClient.EXPECT().UpdateOffsetsInTransaction(gomock.Any(), gomock.Any()).DoAndReturn(
1432+
func(ctx context.Context, _ *rawtopic.UpdateOffsetsInTransactionRequest) error {
1433+
nodeID, ok := endpoint.ContextNodeID(ctx)
1434+
1435+
require.True(t, ok)
1436+
require.Equal(t, uint32(123), nodeID)
1437+
1438+
return nil
1439+
})
1440+
1441+
batch, err := topicreadercommon.NewBatch(e.partitionSession, nil)
1442+
require.NoError(t, err)
1443+
1444+
err = e.reader.commitWithTransaction(e.ctx, txMock, batch)
1445+
require.NoError(t, err)
1446+
})
14231447
}

internal/topic/topicreaderinternal/transaction_mock_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type mockTransaction struct {
2121
materializedID tx.Identifier
2222
materialized bool
2323
sessionID string
24+
nodeID uint32
2425
onCompleted []tx.OnTransactionCompletedFunc
2526
RolledBack bool
2627
}
@@ -36,6 +37,10 @@ func (m *mockTransaction) SessionID() string {
3637
return m.sessionID
3738
}
3839

40+
func (m *mockTransaction) NodeID() uint32 {
41+
return m.nodeID
42+
}
43+
3944
func (m *mockTransaction) OnBeforeCommit(f tx.OnTransactionBeforeCommit) {
4045
}
4146

internal/tx/transaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Transaction interface {
1111
Identifier
1212
UnLazy(ctx context.Context) error
1313
SessionID() string
14+
NodeID() uint32
1415

1516
// OnBeforeCommit add callback, which will be called before commit transaction
1617
// the method will be not call the method if some error happen and transaction will not be committed

0 commit comments

Comments
 (0)