Skip to content

Commit d93481d

Browse files
authored
Merge pull request #1723 add flag for manage split-merge support
2 parents 4ea3d2c + 1e61c08 commit d93481d

File tree

5 files changed

+27
-12
lines changed

5 files changed

+27
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added option WithReaderSupportSplitMergePartitions for topic manage support of split-merge partitions on client side (enabled by default).
12
* Allowed overflow queue limit for one goroutine at time for topic writer
23
* Removed delay before send commit in sync mode of a topic reader
34

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
174174
}
175175
l.stream = topicreadercommon.NewSyncedStream(stream)
176176

177-
initMessage := topicreadercommon.CreateInitMessage(l.cfg.Consumer, l.cfg.Selectors)
177+
initMessage := topicreadercommon.CreateInitMessage(l.cfg.Consumer, false, l.cfg.Selectors)
178178
err = stream.Send(initMessage)
179179
if err != nil {
180180
return xerrors.WithStackTrace(fmt.Errorf("ydb: failed to send init request for read stream in the listener: %w", err))

internal/topic/topicreadercommon/init_message.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ package topicreadercommon
22

33
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
44

5-
func CreateInitMessage(consumer string, selectors []*PublicReadSelector) *rawtopicreader.InitRequest {
5+
func CreateInitMessage(
6+
consumer string,
7+
supportAutoPartition bool,
8+
selectors []*PublicReadSelector,
9+
) *rawtopicreader.InitRequest {
610
res := &rawtopicreader.InitRequest{
711
Consumer: consumer,
8-
AutoPartitioningSupport: true,
12+
AutoPartitioningSupport: supportAutoPartition,
913
}
1014

1115
res.TopicsReadSettings = make([]rawtopicreader.TopicReadSettings, len(selectors))

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,20 @@ type topicStreamReaderConfig struct {
8080
GetPartitionStartOffsetCallback PublicGetPartitionStartOffsetFunc
8181
CommitMode topicreadercommon.PublicCommitMode
8282
Decoders topicreadercommon.DecoderMap
83+
EnableSplitMergeSupport bool
8384
}
8485

8586
func newTopicStreamReaderConfig() topicStreamReaderConfig {
8687
return topicStreamReaderConfig{
87-
BaseContext: context.Background(),
88-
BufferSizeProtoBytes: topicreadercommon.DefaultBufferSize,
89-
Cred: credentials.NewAnonymousCredentials(),
90-
CredUpdateInterval: time.Hour,
91-
CommitMode: topicreadercommon.CommitModeAsync,
92-
CommitterBatchTimeLag: time.Second,
93-
Decoders: topicreadercommon.NewDecoderMap(),
94-
Trace: &trace.Topic{},
88+
BaseContext: context.Background(),
89+
BufferSizeProtoBytes: topicreadercommon.DefaultBufferSize,
90+
Cred: credentials.NewAnonymousCredentials(),
91+
CredUpdateInterval: time.Hour,
92+
CommitMode: topicreadercommon.CommitModeAsync,
93+
CommitterBatchTimeLag: time.Second,
94+
Decoders: topicreadercommon.NewDecoderMap(),
95+
Trace: &trace.Topic{},
96+
EnableSplitMergeSupport: true,
9597
}
9698
}
9799

@@ -611,7 +613,7 @@ func (r *topicStreamReaderImpl) setStarted() error {
611613
}
612614

613615
func (r *topicStreamReaderImpl) initSession() (err error) {
614-
initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.ReadSelectors)
616+
initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.EnableSplitMergeSupport, r.cfg.ReadSelectors)
615617

616618
onDone := trace.TopicOnReaderInit(r.cfg.Trace, r.readConnectionID, initMessage)
617619
defer func() {

topic/topicoptions/topicoptions_reader.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,3 +281,11 @@ func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption {
281281
cfg.CommitMode = CommitModeNone
282282
}
283283
}
284+
285+
// WithReaderSupportSplitMergePartitions set support of split and merge partitions on client side.
286+
// Default is true, set false for disable the support.
287+
func WithReaderSupportSplitMergePartitions(enableSupport bool) ReaderOption {
288+
return func(cfg *topicreaderinternal.ReaderConfig) {
289+
cfg.EnableSplitMergeSupport = enableSupport
290+
}
291+
}

0 commit comments

Comments
 (0)