Skip to content

Commit 50bb93f

Browse files
authored
Merge pull request #1354 Read messages within transaction
2 parents 84c5c58 + 722969f commit 50bb93f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1475
-78
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ jobs:
116116
YDB_LOCAL_SURVIVE_RESTART: true
117117
YDB_USE_IN_MEMORY_PDISKS: true
118118
YDB_TABLE_ENABLE_PREPARED_DDL: true
119+
YDB_FEATURE_FLAGS: enable_topic_service_tx
119120
options: '-h localhost'
120121
env:
121122
OS: ubuntu-latest

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added PopMessages from topic within transaction
2+
13
## v3.76.3
24
* Changed interface `table.TransactionIdentifier` (added private method) for prohibition of any implementations outside ydb-go-sdk
35

examples/topic/topicreader/stubs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func externalSystemUnlock(ctx context.Context, topic string, partition int64) er
2222
panic("example stub")
2323
}
2424

25-
func processBatch(ctx context.Context, batch *topicreader.Batch) {
25+
func processBatch(ctx context.Context, batch *topicreader.Batch) (result int, err error) {
2626
// recommend derive ctx from batch.Context() for handle signal about stop message processing
2727
panic("example stub")
2828
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package topicreaderexamples
2+
3+
import (
4+
"context"
5+
6+
"github.com/ydb-platform/ydb-go-sdk/v3"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/query"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
9+
)
10+
11+
func CommitMessagesToTransaction(ctx context.Context, db *ydb.Driver, reader *topicreader.Reader) error {
12+
for { // loop
13+
if ctx.Err() != nil {
14+
return ctx.Err()
15+
}
16+
17+
err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
18+
batch, err := reader.PopBatchTx(ctx, tx) // the batch will be committed with commit the tx
19+
if err != nil {
20+
return err
21+
}
22+
id := batch.Messages[0].MessageGroupID //nolint:staticcheck
23+
24+
batchResult, err := processBatch(batch.Context(), batch)
25+
if err != nil {
26+
return err
27+
}
28+
29+
_, err = tx.Execute(ctx, `
30+
$last = SELECT MAX(val) FROM table WHERE id=$id;
31+
UPSERT INTO t (id, val) VALUES($id, COALESCE($last, 0) + $value)
32+
`, query.WithParameters(
33+
ydb.ParamsBuilder().
34+
Param("$id").Text(id).
35+
Param("$value").Int64(int64(batchResult)).
36+
Build(),
37+
))
38+
39+
return err
40+
})
41+
if err != nil {
42+
return err
43+
}
44+
}
45+
}
46+
47+
func PopWithTransaction(ctx context.Context, db *ydb.Driver, reader *topicreader.Reader) error {
48+
for { // loop
49+
if ctx.Err() != nil {
50+
return ctx.Err()
51+
}
52+
53+
err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
54+
batch, err := reader.PopBatchTx(ctx, tx)
55+
if err != nil {
56+
return err
57+
}
58+
id := batch.Messages[0].MessageGroupID //nolint:staticcheck
59+
60+
batchResult, err := processBatch(batch.Context(), batch)
61+
if err != nil {
62+
return err
63+
}
64+
65+
_, err = tx.Execute(ctx, `
66+
$last = SELECT MAX(val) FROM table WHERE id=$id;
67+
UPSERT INTO t (id, val) VALUES($id, COALESCE($last, 0) + $value)
68+
`, query.WithParameters(
69+
ydb.ParamsBuilder().
70+
Param("$id").Text(id).
71+
Param("$value").Int64(int64(batchResult)).
72+
Build(),
73+
))
74+
if err != nil {
75+
return err
76+
}
77+
78+
return nil
79+
})
80+
if err != nil {
81+
return err
82+
}
83+
}
84+
}
85+
86+
func PopWithTransactionRecreateReader(
87+
ctx context.Context,
88+
db *ydb.Driver,
89+
readerFabric func(ctx context.Context, db *ydb.Driver) (reader *topicreader.Reader, err error),
90+
) error {
91+
// second loop - for retries
92+
err := db.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
93+
reader, err := readerFabric(ctx, db)
94+
if err != nil {
95+
return err
96+
}
97+
defer reader.Close(ctx)
98+
99+
for { // loop
100+
tx, err := s.Begin(ctx, nil)
101+
if err != nil {
102+
return err
103+
}
104+
105+
batch, err := reader.PopBatchTx(ctx, tx)
106+
if err != nil {
107+
return err
108+
}
109+
id := batch.Messages[0].MessageGroupID //nolint:staticcheck
110+
111+
batchResult, err := processBatch(batch.Context(), batch)
112+
if err != nil {
113+
return err
114+
}
115+
116+
_, err = tx.Execute(ctx, `
117+
$last = SELECT MAX(val) FROM table WHERE id=$id;
118+
UPSERT INTO t (id, val) VALUES($id, COALESCE($last, 0) + $value)
119+
`,
120+
query.WithParameters(
121+
ydb.ParamsBuilder().
122+
Param("$id").Text(id).
123+
Param("$value").Int64(int64(batchResult)).
124+
Build(),
125+
))
126+
if err != nil {
127+
return err
128+
}
129+
130+
if err = tx.CommitTx(ctx); err != nil {
131+
return err
132+
}
133+
}
134+
})
135+
136+
return err
137+
}

examples/topic/topicreader/topicreader_advanced.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func ReadMessagesWithCustomBatching(ctx context.Context, db *ydb.Driver) {
1616

1717
for {
1818
batch, _ := reader.ReadMessagesBatch(ctx)
19-
processBatch(batch.Context(), batch)
19+
_, _ = processBatch(batch.Context(), batch)
2020
_ = reader.Commit(batch.Context(), batch)
2121
}
2222
}
@@ -58,7 +58,7 @@ func ProcessMessagesWithSyncCommit(ctx context.Context, db *ydb.Driver) {
5858

5959
for {
6060
batch, _ := reader.ReadMessagesBatch(ctx)
61-
processBatch(batch.Context(), batch)
61+
_, _ = processBatch(batch.Context(), batch)
6262
_ = reader.Commit(ctx, batch) // will wait response about commit from server
6363
}
6464
}
@@ -86,7 +86,7 @@ func OwnReadProgressStorage(ctx context.Context, db *ydb.Driver) {
8686
for {
8787
batch, _ := reader.ReadMessagesBatch(ctx)
8888

89-
processBatch(batch.Context(), batch)
89+
_, _ = processBatch(batch.Context(), batch)
9090
_ = externalSystemCommit(
9191
batch.Context(),
9292
batch.Topic(),

examples/topic/topicreader/topicreader_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (h *TopicEventsHandler) OnReadMessages(
4848
log.Printf("Receive message: %v/%v/%v", mess.Topic(), mess.PartitionID(), mess.SeqNo)
4949
}
5050

51-
processBatch(ctx, event.Batch)
51+
_, _ = processBatch(ctx, event.Batch)
5252

5353
return nil
5454
}

examples/topic/topicreader/topicreader_show.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func PartitionStopHandled(ctx context.Context, reader *topicreader.Reader) {
1515
}
1616

1717
batchContext := batch.Context() // batch.Context() will cancel when partition revoke by server or connection broke
18-
processBatch(batchContext, batch)
18+
_, _ = processBatch(batchContext, batch)
1919
}
2020

2121
// PartitionGracefulStopHandled is example of sdk handle server signal about graceful stop partition
@@ -24,7 +24,7 @@ func PartitionGracefulStopHandled(ctx context.Context, db *ydb.Driver) {
2424

2525
for {
2626
batch, _ := reader.ReadMessagesBatch(ctx) // <- if partition soft stop batch can be less, then 1000
27-
processBatch(batch.Context(), batch)
27+
_, _ = processBatch(batch.Context(), batch)
2828
_ = reader.Commit(batch.Context(), batch)
2929
}
3030
}

examples/topic/topicreader/topicreader_simple.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func PrintMessageContent(ctx context.Context, reader *topicreader.Reader) {
2525
func ReadMessagesByBatch(ctx context.Context, reader *topicreader.Reader) {
2626
for {
2727
batch, _ := reader.ReadMessagesBatch(ctx)
28-
processBatch(batch.Context(), batch)
28+
_, _ = processBatch(batch.Context(), batch)
2929
_ = reader.Commit(batch.Context(), batch)
3030
}
3131
}

examples/topic/topicreader/topicreader_trace.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func ExplicitPartitionStartStopHandler(ctx context.Context, db *ydb.Driver) {
7373
for {
7474
batch, _ := reader.ReadMessagesBatch(readContext)
7575

76-
processBatch(batch.Context(), batch)
76+
_, _ = processBatch(batch.Context(), batch)
7777
_ = externalSystemCommit(
7878
batch.Context(),
7979
batch.Topic(),
@@ -145,7 +145,7 @@ func PartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db
145145
for {
146146
batch, _ := r.ReadMessagesBatch(readContext)
147147

148-
processBatch(batch.Context(), batch)
148+
_, _ = processBatch(batch.Context(), batch)
149149
_ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), getEndOffset(batch))
150150
}
151151
}

internal/grpcwrapper/rawtopic/client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1213
)
1314

@@ -92,3 +93,17 @@ func (c *Client) StreamWrite(ctxStreamLifeTime context.Context) (*rawtopicwriter
9293

9394
return &rawtopicwriter.StreamWriter{Stream: protoResp}, nil
9495
}
96+
97+
func (c *Client) UpdateOffsetsInTransaction(
98+
ctx context.Context,
99+
req *UpdateOffsetsInTransactionRequest,
100+
) error {
101+
protoResp, err := c.service.UpdateOffsetsInTransaction(ctx, req.ToProto())
102+
if err != nil {
103+
return xerrors.WithStackTrace(fmt.Errorf("ydb: update offsets in transaction failed: %w", err))
104+
}
105+
106+
var operation rawydb.Operation
107+
108+
return operation.FromProtoWithStatusCheck(protoResp.GetOperation())
109+
}

0 commit comments

Comments
 (0)