Skip to content

Commit 98f75f0

Browse files
authored
Merge pull request #1479 Write message in transaction
2 parents daf61dd + 1c9e3b5 commit 98f75f0

29 files changed

+702
-184
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added write to topics within transactions
2+
13
## v3.80.10
24
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
35
* Refactored experimental topic iterators in `topicsugar` package
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package topicwriter
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/query"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
12+
)
13+
14+
func CopyMessagesBetweenTopicsTxWriter(
15+
ctx context.Context,
16+
db *ydb.Driver,
17+
reader *topicreader.Reader,
18+
topic string,
19+
) error {
20+
return db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
21+
writer, err := db.Topic().StartTransactionalWriter(tx, topic)
22+
if err != nil {
23+
return err
24+
}
25+
26+
batch, err := reader.PopMessagesBatchTx(ctx, tx)
27+
if err != nil {
28+
return err
29+
}
30+
for _, mess := range batch.Messages {
31+
if err = writer.Write(ctx, topicwriter.Message{Data: mess}); err != nil {
32+
return err
33+
}
34+
}
35+
36+
return nil
37+
}, query.WithIdempotent())
38+
}
39+
40+
func TableAndTopicWithinTransaction(
41+
ctx context.Context,
42+
db *ydb.Driver,
43+
topicPath string,
44+
id int64,
45+
) error {
46+
return db.Query().DoTx(ctx, func(ctx context.Context, t query.TxActor) error {
47+
row, err := t.QueryRow(ctx, "SELECT val FROM table WHERE id=$id", query.WithParameters(
48+
ydb.ParamsBuilder().
49+
Param("$id").Int64(id).
50+
Build()))
51+
if err != nil {
52+
return err
53+
}
54+
55+
var val int64
56+
if err = row.Scan(&val); err != nil {
57+
return err
58+
}
59+
60+
// the writer is dedicated for the transaction, it can't be used outside the transaction
61+
// it is no needs to close or flush the messages - it happened internally on transaction commit
62+
writer, err := db.Topic().StartTransactionalWriter(t, topicPath)
63+
if err != nil {
64+
return err
65+
}
66+
67+
err = writer.Write(ctx, topicwriter.Message{
68+
Data: strings.NewReader(fmt.Sprintf("val: %v processed", val)),
69+
})
70+
if err != nil {
71+
return err
72+
}
73+
74+
return nil
75+
})
76+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package rawtopiccommon
2+
3+
import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
4+
5+
type TransactionIdentity struct {
6+
ID string
7+
Session string
8+
}
9+
10+
func (t TransactionIdentity) ToProto() *Ydb_Topic.TransactionIdentity {
11+
if t.ID == "" && t.Session == "" {
12+
return nil
13+
}
14+
15+
return &Ydb_Topic.TransactionIdentity{
16+
Id: t.ID,
17+
Session: t.Session,
18+
}
19+
}

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ type WriteRequest struct {
145145

146146
Messages []MessageData
147147
Codec rawtopiccommon.Codec
148+
Tx rawtopiccommon.TransactionIdentity
148149
}
149150

150151
func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest, err error) {
@@ -161,6 +162,7 @@ func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_Wri
161162
WriteRequest: &Ydb_Topic.StreamWriteMessage_WriteRequest{
162163
Messages: messages,
163164
Codec: int32(r.Codec.ToProto()),
165+
Tx: r.Tx.ToProto(),
164166
},
165167
}
166168

@@ -231,11 +233,13 @@ func (r *WriteResult) GetAcks() (res traceAck) {
231233
}
232234
for i := range r.Acks {
233235
ack := &r.Acks[i]
234-
if ack.MessageWriteStatus.Type == WriteStatusTypeWritten {
236+
switch ack.MessageWriteStatus.Type {
237+
case WriteStatusTypeWritten:
235238
res.WrittenCount++
236-
}
237-
if ack.MessageWriteStatus.Type == WriteStatusTypeSkipped {
239+
case WriteStatusTypeSkipped:
238240
res.SkipCount++
241+
case WriteStatusTypeWrittenInTx:
242+
res.WrittenInTxCount++
239243
}
240244

241245
if ack.SeqNo < res.SeqNoMin {
@@ -261,6 +265,7 @@ type traceAck = struct {
261265
WrittenOffsetMin int64
262266
WrittenOffsetMax int64
263267
WrittenCount int
268+
WrittenInTxCount int
264269
SkipCount int
265270
}
266271

@@ -299,6 +304,12 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
299304
s.SkippedReason = WriteStatusSkipReason(v.Skipped.GetReason())
300305

301306
return nil
307+
308+
case *Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_WrittenInTx_:
309+
s.Type = WriteStatusTypeWrittenInTx
310+
311+
return nil
312+
302313
default:
303314
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected write status type: %v", reflect.TypeOf(v))))
304315
}
@@ -307,19 +318,19 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
307318
type WriteStatusType int
308319

309320
const (
310-
WriteStatusTypeUnknown WriteStatusType = iota
311-
WriteStatusTypeWritten
321+
WriteStatusTypeWritten WriteStatusType = iota + 1
312322
WriteStatusTypeSkipped
323+
WriteStatusTypeWrittenInTx
313324
)
314325

315326
func (t WriteStatusType) String() string {
316327
switch t {
317-
case WriteStatusTypeUnknown:
318-
return "Unknown"
319328
case WriteStatusTypeSkipped:
320329
return "Skipped"
321330
case WriteStatusTypeWritten:
322331
return "Written"
332+
case WriteStatusTypeWrittenInTx:
333+
return "WrittenInTx"
323334
default:
324335
return strconv.Itoa(int(t))
325336
}

internal/grpcwrapper/rawtopic/update_offset_in_transaction.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,16 @@ import (
99

1010
type UpdateOffsetsInTransactionRequest struct {
1111
OperationParams rawydb.OperationParams
12-
Tx UpdateOffsetsInTransactionRequest_TransactionIdentity
12+
Tx rawtopiccommon.TransactionIdentity
1313
Topics []UpdateOffsetsInTransactionRequest_TopicOffsets
1414
Consumer string
1515
}
1616

1717
func (r *UpdateOffsetsInTransactionRequest) ToProto() *Ydb_Topic.UpdateOffsetsInTransactionRequest {
1818
req := &Ydb_Topic.UpdateOffsetsInTransactionRequest{
1919
OperationParams: r.OperationParams.ToProto(),
20-
Tx: &Ydb_Topic.TransactionIdentity{
21-
Id: r.Tx.ID,
22-
Session: r.Tx.Session,
23-
},
24-
Consumer: r.Consumer,
20+
Tx: r.Tx.ToProto(),
21+
Consumer: r.Consumer,
2522
}
2623

2724
req.Topics = make([]*Ydb_Topic.UpdateOffsetsInTransactionRequest_TopicOffsets, len(r.Topics))
@@ -56,11 +53,6 @@ func (r *UpdateOffsetsInTransactionRequest) ToProto() *Ydb_Topic.UpdateOffsetsIn
5653
return req
5754
}
5855

59-
type UpdateOffsetsInTransactionRequest_TransactionIdentity struct { //nolint:revive,stylecheck
60-
ID string
61-
Session string
62-
}
63-
6456
type UpdateOffsetsInTransactionRequest_TopicOffsets struct { //nolint:revive,stylecheck
6557
Path string // Topic path
6658
Partitions []UpdateOffsetsInTransactionRequest_PartitionOffsets

internal/grpcwrapper/rawtopic/update_offset_in_transaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestUpdateOffsetsInTransactionRequestToProto(t *testing.T) {
2727
HasValue: true,
2828
},
2929
},
30-
Tx: UpdateOffsetsInTransactionRequest_TransactionIdentity{
30+
Tx: rawtopiccommon.TransactionIdentity{
3131
ID: "tx-id",
3232
Session: "session-id",
3333
},

internal/query/transaction.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type (
3535

3636
completed bool
3737

38-
onCompleted xsync.Set[*baseTx.OnTransactionCompletedFunc]
38+
onBeforeCommit xsync.Set[*baseTx.OnTransactionBeforeCommit]
39+
onCompleted xsync.Set[*baseTx.OnTransactionCompletedFunc]
3940
}
4041
)
4142

@@ -100,6 +101,11 @@ func (tx *Transaction) QueryResultSet(
100101
}),
101102
}
102103
if settings.TxControl().Commit {
104+
err = tx.waitOnBeforeCommit(ctx)
105+
if err != nil {
106+
return nil, err
107+
}
108+
103109
// notification about complete transaction must be sended for any error or for successfully read all result if
104110
// it was execution with commit flag
105111
resultOpts = append(resultOpts,
@@ -144,6 +150,11 @@ func (tx *Transaction) QueryRow(
144150
}),
145151
}
146152
if settings.TxControl().Commit {
153+
err := tx.waitOnBeforeCommit(ctx)
154+
if err != nil {
155+
return nil, err
156+
}
157+
147158
// notification about complete transaction must be sended for any error or for successfully read all result if
148159
// it was execution with commit flag
149160
resultOpts = append(resultOpts,
@@ -204,6 +215,11 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
204215
}),
205216
}
206217
if settings.TxControl().Commit {
218+
err = tx.waitOnBeforeCommit(ctx)
219+
if err != nil {
220+
return err
221+
}
222+
207223
// notification about complete transaction must be sended for any error or for successfully read all result if
208224
// it was execution with commit flag
209225
resultOpts = append(resultOpts,
@@ -268,6 +284,11 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
268284
}),
269285
}
270286
if settings.TxControl().Commit {
287+
err = tx.waitOnBeforeCommit(ctx)
288+
if err != nil {
289+
return nil, err
290+
}
291+
271292
// notification about complete transaction must be sended for any error or for successfully read all result if
272293
// it was execution with commit flag
273294
resultOpts = append(resultOpts,
@@ -310,7 +331,12 @@ func (tx *Transaction) CommitTx(ctx context.Context) (finalErr error) {
310331
tx.completed = true
311332
}()
312333

313-
err := commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID())
334+
err := tx.waitOnBeforeCommit(ctx)
335+
if err != nil {
336+
return err
337+
}
338+
339+
err = commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID())
314340
if err != nil {
315341
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
316342
tx.s.SetStatus(session.StatusClosed)
@@ -360,10 +386,24 @@ func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) {
360386
return nil
361387
}
362388

389+
func (tx *Transaction) OnBeforeCommit(f baseTx.OnTransactionBeforeCommit) {
390+
tx.onBeforeCommit.Add(&f)
391+
}
392+
363393
func (tx *Transaction) OnCompleted(f baseTx.OnTransactionCompletedFunc) {
364394
tx.onCompleted.Add(&f)
365395
}
366396

397+
func (tx *Transaction) waitOnBeforeCommit(ctx context.Context) (resErr error) {
398+
tx.onBeforeCommit.Range(func(f *baseTx.OnTransactionBeforeCommit) bool {
399+
resErr = (*f)(ctx)
400+
401+
return resErr == nil
402+
})
403+
404+
return resErr
405+
}
406+
367407
func (tx *Transaction) notifyOnCompleted(err error) {
368408
tx.completed = true
369409

0 commit comments

Comments
 (0)