Skip to content

Commit 4b40c7b

Browse files
authored
Merge branch 'master' into explicit_ip_support_rc1
2 parents 2f8cd43 + 14ec2f2 commit 4b40c7b

36 files changed

+757
-211
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
* Support for ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.
1+
* Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.
2+
3+
## v3.81.0
4+
* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer
5+
* Added write to topics within transactions
26

37
## v3.80.10
48
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ require (
5151
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
5252
github.com/syndtr/goleveldb v1.0.0 // indirect
5353
github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect
54-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 // indirect
54+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136 // indirect
5555
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1 // indirect
5656
golang.org/x/crypto v0.24.0 // indirect
5757
golang.org/x/mod v0.17.0 // indirect

examples/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1842,6 +1842,8 @@ github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKV
18421842
github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc=
18431843
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 h1:nL8XwD6fSst7xFUirkaWJmE7kM0CdWRYgu6+YQer1d4=
18441844
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
1845+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136 h1:MO32/Cba3XpNYWcoz3y13eHZG+RzDHmFPry3ren6BmE=
1846+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
18451847
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0 h1:JxSvw+Moont8qCmibP2MjSEIHfkWJLkw0fHZemAk+d0=
18461848
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0/go.mod h1:YzCPoNrTbrXZg9bO2YkbjI6eQLkaRIE9Bq8ponu0g8A=
18471849
github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.0.1 h1:Lsir3AC2VQOTlp8UjZY9zQdCVfWvBNHT3hZn+jSGoo0=
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package topicwriter
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/query"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
12+
)
13+
14+
func CopyMessagesBetweenTopicsTxWriter(
15+
ctx context.Context,
16+
db *ydb.Driver,
17+
reader *topicreader.Reader,
18+
topic string,
19+
) error {
20+
return db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
21+
writer, err := db.Topic().StartTransactionalWriter(tx, topic)
22+
if err != nil {
23+
return err
24+
}
25+
26+
batch, err := reader.PopMessagesBatchTx(ctx, tx)
27+
if err != nil {
28+
return err
29+
}
30+
for _, mess := range batch.Messages {
31+
if err = writer.Write(ctx, topicwriter.Message{Data: mess}); err != nil {
32+
return err
33+
}
34+
}
35+
36+
return nil
37+
}, query.WithIdempotent())
38+
}
39+
40+
func TableAndTopicWithinTransaction(
41+
ctx context.Context,
42+
db *ydb.Driver,
43+
topicPath string,
44+
id int64,
45+
) error {
46+
return db.Query().DoTx(ctx, func(ctx context.Context, t query.TxActor) error {
47+
row, err := t.QueryRow(ctx, "SELECT val FROM table WHERE id=$id", query.WithParameters(
48+
ydb.ParamsBuilder().
49+
Param("$id").Int64(id).
50+
Build()))
51+
if err != nil {
52+
return err
53+
}
54+
55+
var val int64
56+
if err = row.Scan(&val); err != nil {
57+
return err
58+
}
59+
60+
// the writer is dedicated for the transaction, it can't be used outside the transaction
61+
// it is no needs to close or flush the messages - it happened internally on transaction commit
62+
writer, err := db.Topic().StartTransactionalWriter(t, topicPath)
63+
if err != nil {
64+
return err
65+
}
66+
67+
err = writer.Write(ctx, topicwriter.Message{
68+
Data: strings.NewReader(fmt.Sprintf("val: %v processed", val)),
69+
})
70+
if err != nil {
71+
return err
72+
}
73+
74+
return nil
75+
})
76+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/golang-jwt/jwt/v4 v4.4.1
77
github.com/google/uuid v1.6.0
88
github.com/jonboulle/clockwork v0.3.0
9-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7
9+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136
1010
golang.org/x/net v0.23.0
1111
golang.org/x/sync v0.6.0
1212
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
6565
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6666
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
6767
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
68-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 h1:nL8XwD6fSst7xFUirkaWJmE7kM0CdWRYgu6+YQer1d4=
69-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
68+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136 h1:MO32/Cba3XpNYWcoz3y13eHZG+RzDHmFPry3ren6BmE=
69+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
7070
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
7171
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
7272
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package rawtopiccommon
2+
3+
import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
4+
5+
type TransactionIdentity struct {
6+
ID string
7+
Session string
8+
}
9+
10+
func (t TransactionIdentity) ToProto() *Ydb_Topic.TransactionIdentity {
11+
if t.ID == "" && t.Session == "" {
12+
return nil
13+
}
14+
15+
return &Ydb_Topic.TransactionIdentity{
16+
Id: t.ID,
17+
Session: t.Session,
18+
}
19+
}

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ type WriteRequest struct {
145145

146146
Messages []MessageData
147147
Codec rawtopiccommon.Codec
148+
Tx rawtopiccommon.TransactionIdentity
148149
}
149150

150151
func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest, err error) {
@@ -161,6 +162,7 @@ func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_Wri
161162
WriteRequest: &Ydb_Topic.StreamWriteMessage_WriteRequest{
162163
Messages: messages,
163164
Codec: int32(r.Codec.ToProto()),
165+
Tx: r.Tx.ToProto(),
164166
},
165167
}
166168

@@ -231,11 +233,13 @@ func (r *WriteResult) GetAcks() (res traceAck) {
231233
}
232234
for i := range r.Acks {
233235
ack := &r.Acks[i]
234-
if ack.MessageWriteStatus.Type == WriteStatusTypeWritten {
236+
switch ack.MessageWriteStatus.Type {
237+
case WriteStatusTypeWritten:
235238
res.WrittenCount++
236-
}
237-
if ack.MessageWriteStatus.Type == WriteStatusTypeSkipped {
239+
case WriteStatusTypeSkipped:
238240
res.SkipCount++
241+
case WriteStatusTypeWrittenInTx:
242+
res.WrittenInTxCount++
239243
}
240244

241245
if ack.SeqNo < res.SeqNoMin {
@@ -261,6 +265,7 @@ type traceAck = struct {
261265
WrittenOffsetMin int64
262266
WrittenOffsetMax int64
263267
WrittenCount int
268+
WrittenInTxCount int
264269
SkipCount int
265270
}
266271

@@ -299,6 +304,12 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
299304
s.SkippedReason = WriteStatusSkipReason(v.Skipped.GetReason())
300305

301306
return nil
307+
308+
case *Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_WrittenInTx_:
309+
s.Type = WriteStatusTypeWrittenInTx
310+
311+
return nil
312+
302313
default:
303314
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected write status type: %v", reflect.TypeOf(v))))
304315
}
@@ -307,19 +318,19 @@ func (s *MessageWriteStatus) fromProto(status interface{}) error {
307318
type WriteStatusType int
308319

309320
const (
310-
WriteStatusTypeUnknown WriteStatusType = iota
311-
WriteStatusTypeWritten
321+
WriteStatusTypeWritten WriteStatusType = iota + 1
312322
WriteStatusTypeSkipped
323+
WriteStatusTypeWrittenInTx
313324
)
314325

315326
func (t WriteStatusType) String() string {
316327
switch t {
317-
case WriteStatusTypeUnknown:
318-
return "Unknown"
319328
case WriteStatusTypeSkipped:
320329
return "Skipped"
321330
case WriteStatusTypeWritten:
322331
return "Written"
332+
case WriteStatusTypeWrittenInTx:
333+
return "WrittenInTx"
323334
default:
324335
return strconv.Itoa(int(t))
325336
}

internal/grpcwrapper/rawtopic/update_offset_in_transaction.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,16 @@ import (
99

1010
type UpdateOffsetsInTransactionRequest struct {
1111
OperationParams rawydb.OperationParams
12-
Tx UpdateOffsetsInTransactionRequest_TransactionIdentity
12+
Tx rawtopiccommon.TransactionIdentity
1313
Topics []UpdateOffsetsInTransactionRequest_TopicOffsets
1414
Consumer string
1515
}
1616

1717
func (r *UpdateOffsetsInTransactionRequest) ToProto() *Ydb_Topic.UpdateOffsetsInTransactionRequest {
1818
req := &Ydb_Topic.UpdateOffsetsInTransactionRequest{
1919
OperationParams: r.OperationParams.ToProto(),
20-
Tx: &Ydb_Topic.TransactionIdentity{
21-
Id: r.Tx.ID,
22-
Session: r.Tx.Session,
23-
},
24-
Consumer: r.Consumer,
20+
Tx: r.Tx.ToProto(),
21+
Consumer: r.Consumer,
2522
}
2623

2724
req.Topics = make([]*Ydb_Topic.UpdateOffsetsInTransactionRequest_TopicOffsets, len(r.Topics))
@@ -56,11 +53,6 @@ func (r *UpdateOffsetsInTransactionRequest) ToProto() *Ydb_Topic.UpdateOffsetsIn
5653
return req
5754
}
5855

59-
type UpdateOffsetsInTransactionRequest_TransactionIdentity struct { //nolint:revive,stylecheck
60-
ID string
61-
Session string
62-
}
63-
6456
type UpdateOffsetsInTransactionRequest_TopicOffsets struct { //nolint:revive,stylecheck
6557
Path string // Topic path
6658
Partitions []UpdateOffsetsInTransactionRequest_PartitionOffsets

internal/grpcwrapper/rawtopic/update_offset_in_transaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestUpdateOffsetsInTransactionRequestToProto(t *testing.T) {
2727
HasValue: true,
2828
},
2929
},
30-
Tx: UpdateOffsetsInTransactionRequest_TransactionIdentity{
30+
Tx: rawtopiccommon.TransactionIdentity{
3131
ID: "tx-id",
3232
Session: "session-id",
3333
},

0 commit comments

Comments
 (0)