Skip to content

Commit 9e5787d

Browse files
authored
Merge pull request #836 remove a lot of expirements comments.
2 parents 90a231a + 3eae236 commit 9e5787d

File tree

26 files changed

+233
-867
lines changed

26 files changed

+233
-867
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
* Removed almost all experimental marks from topic api.
2+
* Rename some topic APIs (old names was deprecated and will be removed in one of next versions).
3+
* Deprecated topic options (the option will be removed): min size of read messages batch
4+
* Deprecated WithOnWriterFirstConnected callback, use Writer.WaitInitInfo instead.
5+
* Changed topic Codec base type from int to int32 (was experimental code)
16
* Added `WaitInit` and `WaitInitInfo` method to the topic reader and writer
27

38
## v3.52.2

examples/topic/topicreader/topicreader_advanced.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@ import (
1212

1313
// ReadMessagesWithCustomBatching example of custom of readed message batch
1414
func ReadMessagesWithCustomBatching(ctx context.Context, db *ydb.Driver) {
15-
reader, _ := db.Topic().StartReader("consumer", nil,
16-
topicoptions.WithBatchReadMinCount(1000),
17-
)
15+
reader, _ := db.Topic().StartReader("consumer", nil)
1816

1917
for {
20-
batch, _ := reader.ReadMessageBatch(ctx)
18+
batch, _ := reader.ReadMessagesBatch(ctx)
2119
processBatch(batch.Context(), batch)
2220
_ = reader.Commit(batch.Context(), batch)
2321
}

examples/topic/topicreader/topicreader_show.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@ import (
44
"context"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
87
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
98
)
109

1110
// PartitionStopHandled is example of sdk handle server signal about stop partition
1211
func PartitionStopHandled(ctx context.Context, reader *topicreader.Reader) {
13-
batch, _ := reader.ReadMessageBatch(ctx)
12+
batch, _ := reader.ReadMessagesBatch(ctx)
1413
if len(batch.Messages) == 0 {
1514
return
1615
}
@@ -21,12 +20,10 @@ func PartitionStopHandled(ctx context.Context, reader *topicreader.Reader) {
2120

2221
// PartitionGracefulStopHandled is example of sdk handle server signal about graceful stop partition
2322
func PartitionGracefulStopHandled(ctx context.Context, db *ydb.Driver) {
24-
reader, _ := db.Topic().StartReader("consumer", nil,
25-
topicoptions.WithBatchReadMinCount(1000),
26-
)
23+
reader, _ := db.Topic().StartReader("consumer", nil)
2724

2825
for {
29-
batch, _ := reader.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000
26+
batch, _ := reader.ReadMessagesBatch(ctx) // <- if partition soft stop batch can be less, then 1000
3027
processBatch(batch.Context(), batch)
3128
_ = reader.Commit(batch.Context(), batch)
3229
}

internal/grpcwrapper/rawtopic/rawtopiccommon/codec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
// Codec any int value, for example for custom codec
8-
type Codec int
8+
type Codec int32
99

1010
const (
1111
CodecUNSPECIFIED Codec = iota

internal/topic/topicclientinternal/client.go

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,12 @@ func newTopicConfig(opts ...topicoptions.TopicOption) topic.Config {
5555
return c
5656
}
5757

58-
// Close
59-
//
60-
// # Experimental
61-
//
62-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
58+
// Close the client
6359
func (c *Client) Close(_ context.Context) error {
6460
return nil
6561
}
6662

6763
// Alter topic options
68-
//
69-
// # Experimental
70-
//
71-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
7264
func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.AlterOption) error {
7365
req := &rawtopic.AlterTopicRequest{}
7466
req.OperationParams = c.defaultOperationParams
@@ -91,10 +83,6 @@ func (c *Client) Alter(ctx context.Context, path string, opts ...topicoptions.Al
9183
}
9284

9385
// Create new topic
94-
//
95-
// # Experimental
96-
//
97-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
9886
func (c *Client) Create(
9987
ctx context.Context,
10088
path string,
@@ -122,10 +110,6 @@ func (c *Client) Create(
122110
}
123111

124112
// Describe topic
125-
//
126-
// # Experimental
127-
//
128-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
129113
func (c *Client) Describe(
130114
ctx context.Context,
131115
path string,
@@ -166,10 +150,6 @@ func (c *Client) Describe(
166150
}
167151

168152
// Drop topic
169-
//
170-
// # Experimental
171-
//
172-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
173153
func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error {
174154
req := rawtopic.DropTopicRequest{}
175155
req.OperationParams = c.defaultOperationParams
@@ -195,10 +175,6 @@ func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.Dro
195175

196176
// StartReader create new topic reader and start pull messages from server
197177
// it is fast non block call, connection will start in background
198-
//
199-
// # Experimental
200-
//
201-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
202178
func (c *Client) StartReader(
203179
consumer string,
204180
readSelectors topicoptions.ReadSelectors,
@@ -224,10 +200,6 @@ func (c *Client) StartReader(
224200
}
225201

226202
// StartWriter create new topic writer wrapper
227-
//
228-
// # Experimental
229-
//
230-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
231203
func (c *Client) StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error) {
232204
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context) (
233205
topicwriterinternal.RawTopicWriterStream,

internal/topic/topicreaderinternal/batch.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ var (
1515
)
1616

1717
// PublicBatch is ordered group of message from one partition
18-
//
19-
// # Experimental
20-
//
21-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
2218
type PublicBatch struct {
2319
empty.DoNotCopy
2420

@@ -100,28 +96,16 @@ func newBatchFromStream(
10096

10197
// Context is cancelled when code should stop to process messages batch
10298
// for example - lost connection to server or receive stop partition signal without graceful flag
103-
//
104-
// # Experimental
105-
//
106-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
10799
func (m *PublicBatch) Context() context.Context {
108100
return m.commitRange.partitionSession.Context()
109101
}
110102

111103
// Topic is path of source topic of the messages in the batch
112-
//
113-
// # Experimental
114-
//
115-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
116104
func (m *PublicBatch) Topic() string {
117105
return m.partitionSession().Topic
118106
}
119107

120108
// PartitionID of messages in the batch
121-
//
122-
// # Experimental
123-
//
124-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
125109
func (m *PublicBatch) PartitionID() int64 {
126110
return m.partitionSession().PartitionID
127111
}

internal/topic/topicreaderinternal/commit_range.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ import (
88
)
99

1010
// PublicCommitRangeGetter return data piece for commit messages range
11-
//
12-
// # Experimental
13-
//
14-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
1511
type PublicCommitRangeGetter interface {
1612
getCommitRange() PublicCommitRange
1713
}

internal/topic/topicreaderinternal/committer.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ var (
2323

2424
type sendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error
2525

26-
// PublicCommitMode
27-
//
28-
// # Experimental
29-
//
30-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
3126
type PublicCommitMode int
3227

3328
const (

internal/topic/topicreaderinternal/decoders.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,4 @@ func (m *decoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Rea
3939
))
4040
}
4141

42-
// PublicCreateDecoderFunc
43-
//
44-
// # Experimental
45-
//
46-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
4742
type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error)

internal/topic/topicreaderinternal/message.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,9 @@ import (
1515
var errMessageWasReadEarly = xerrors.Wrap(errors.New("ydb: message was read early"))
1616

1717
// PublicErrUnexpectedCodec return when try to read message content with unknown codec
18-
//
19-
// # Experimental
20-
//
21-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
2218
var PublicErrUnexpectedCodec = errors.New("unexpected codec") //nolint:revive,stylecheck
2319

2420
// PublicMessage is representation of topic message
25-
//
26-
// # Experimental
27-
//
28-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
2921
type PublicMessage struct {
3022
empty.DoNotCopy
3123

@@ -81,19 +73,10 @@ func (m *PublicMessage) Read(p []byte) (n int, err error) {
8173
}
8274

8375
// PublicMessageContentUnmarshaler is interface for unmarshal message content
84-
//
85-
// # Experimental
86-
//
87-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
8876
type PublicMessageContentUnmarshaler interface {
8977
// UnmarshalYDBTopicMessage MUST NOT use data after return.
9078
// If you need content after return from Consume - copy data content to
9179
// own slice with copy(dst, data)
92-
//
93-
// Experimental
94-
//
95-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
96-
// later release.
9780
UnmarshalYDBTopicMessage(data []byte) error
9881
}
9982

0 commit comments

Comments
 (0)