Skip to content

Commit 40074fd

Browse files
authored
Merge pull request #434 Removed message level partitioning from experimental topic API
2 parents 1ee6a64 + b0513df commit 40074fd

File tree

6 files changed

+22
-27
lines changed

6 files changed

+22
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Removed message level partitioning from experimental topic API. It is unavailable on server side yet.
12
* Supported `NullValue` type as received type from `YDB`
23
* Supported `types.SetValue` type
34
* Added `types.CastTo(types.Value, destination)` public method for cast `types.Value` to golang native type value destination

internal/topic/topicwriterinternal/message.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,38 @@ import (
1515
var errNoRawContent = xerrors.Wrap(errors.New("ydb: internal state error - no raw message content"))
1616

1717
type Message struct {
18-
SeqNo int64
19-
CreatedAt time.Time
20-
Data io.Reader
21-
Partitioning PublicPartitioning
18+
SeqNo int64
19+
CreatedAt time.Time
20+
Data io.Reader
21+
22+
// partitioning at level message available by protocol, but doesn't available by current server implementation
23+
// the field hidden from public access for prevent runtime errors.
24+
// it will be published after implementation on server side.
25+
futurePartitioning PublicFuturePartitioning
2226
}
2327

24-
type PublicPartitioning struct {
28+
// PublicFuturePartitioning will be published in feature, after server implementation completed.
29+
type PublicFuturePartitioning struct {
2530
messageGroupID string
2631
partitionID int64
2732
hasPartitionID bool
2833
}
2934

30-
func (p PublicPartitioning) ToRaw() rawtopicwriter.Partitioning {
35+
func (p PublicFuturePartitioning) ToRaw() rawtopicwriter.Partitioning {
3136
if p.hasPartitionID {
3237
return rawtopicwriter.NewPartitioningPartitionID(p.partitionID)
3338
}
3439
return rawtopicwriter.NewPartitioningMessageGroup(p.messageGroupID)
3540
}
3641

37-
func NewPartitioningWithMessageGroupID(id string) PublicPartitioning {
38-
return PublicPartitioning{
42+
func NewPartitioningWithMessageGroupID(id string) PublicFuturePartitioning {
43+
return PublicFuturePartitioning{
3944
messageGroupID: id,
4045
}
4146
}
4247

43-
func NewPartitioningWithPartitionID(id int64) PublicPartitioning {
44-
return PublicPartitioning{
48+
func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning {
49+
return PublicFuturePartitioning{
4550
partitionID: id,
4651
hasPartitionID: true,
4752
}

internal/topic/topicwriterinternal/writer_options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func WithMaxQueueLen(num int) PublicWriterOption {
9494
}
9595
}
9696

97-
func WithPartitioning(partitioning PublicPartitioning) PublicWriterOption {
97+
func WithPartitioning(partitioning PublicFuturePartitioning) PublicWriterOption {
9898
return func(cfg *WriterReconnectorConfig) {
9999
cfg.defaultPartitioning = partitioning.ToRaw()
100100
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -573,12 +573,12 @@ func createRawMessageData(
573573
res.SeqNo = mess.SeqNo
574574

575575
switch {
576-
case mess.Partitioning.hasPartitionID:
576+
case mess.futurePartitioning.hasPartitionID:
577577
res.Partitioning.Type = rawtopicwriter.PartitioningPartitionID
578-
res.Partitioning.PartitionID = mess.Partitioning.partitionID
579-
case mess.Partitioning.messageGroupID != "":
578+
res.Partitioning.PartitionID = mess.futurePartitioning.partitionID
579+
case mess.futurePartitioning.messageGroupID != "":
580580
res.Partitioning.Type = rawtopicwriter.PartitioningMessageGroupID
581-
res.Partitioning.MessageGroupID = mess.Partitioning.messageGroupID
581+
res.Partitioning.MessageGroupID = mess.futurePartitioning.messageGroupID
582582
default:
583583
// pass
584584
}

topic/topicoptions/topicoptions_writer.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
55
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicwriterinternal"
66
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
87
)
98

109
type WriterOption = topicwriterinternal.PublicWriterOption
@@ -91,15 +90,6 @@ func WithSyncWrite(sync bool) WriterOption {
9190
return topicwriterinternal.WithWaitAckOnWrite(sync)
9291
}
9392

94-
// WithWriterPartitioning explicit set partitioning for write session
95-
//
96-
// # Experimental
97-
//
98-
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
99-
func WithWriterPartitioning(partitioning topicwriter.Partitioning) WriterOption {
100-
return topicwriterinternal.WithPartitioning(partitioning)
101-
}
102-
10393
type (
10494
// WithOnWriterConnectedInfo present information, received from server
10595
//

topic/topicwriter/topicwriter.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ import (
77
)
88

99
type (
10-
Message = topicwriterinternal.Message
11-
Partitioning = topicwriterinternal.PublicPartitioning
10+
Message = topicwriterinternal.Message
1211
)
1312

1413
var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull

0 commit comments

Comments
 (0)