Skip to content

Commit d2839b8

Browse files
committed
allow set force partition for topic writer
(cherry picked from commit a699028)
1 parent 13260fc commit d2839b8

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type WriterReconnectorConfig struct {
6666
}
6767

6868
func (cfg *WriterReconnectorConfig) validate() error {
69-
if cfg.producerID != cfg.defaultPartitioning.MessageGroupID {
69+
if cfg.defaultPartitioning.Type == rawtopicwriter.PartitioningMessageGroupID && cfg.producerID != cfg.defaultPartitioning.MessageGroupID {
7070
return xerrors.WithStackTrace(errProducerIDNotEqualMessageGroupID)
7171
}
7272
return nil

tests/integration/topic_read_writer_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,22 @@ func TestUpdateToken(t *testing.T) {
388388
xtest.WaitChannelClosed(t, activityStopped)
389389
}
390390

391+
func TestTopicWriterWithManualPartitionSelect(t *testing.T) {
392+
ctx := xtest.Context(t)
393+
db := connect(t)
394+
topicPath := createTopic(ctx, t, db)
395+
396+
writer, err := db.Topic().StartWriter(
397+
"producer-id",
398+
topicPath,
399+
topicoptions.WithPartitionID(0),
400+
topicoptions.WithSyncWrite(true),
401+
)
402+
require.NoError(t, err)
403+
err = writer.Write(ctx, topicwriter.Message{Data: strings.NewReader("asd")})
404+
require.NoError(t, err)
405+
}
406+
391407
var topicCounter int
392408

393409
func createTopic(ctx context.Context, t testing.TB, db ydb.Connection) (topicPath string) {

0 commit comments

Comments
 (0)