Skip to content

Commit 8bb300e

Browse files
committed
AdminClient: Default parameters support for AdminClient#createTopic (#616)
This is needed because librdkafka supports AdminClient.CreateTopics(...) with default value(-1) of NumPartitions and ReplicationFactor, but currently it's not possible to pass a value of -1 or 0 for ReplicationFactor to CreateTopics without specifying an explicit ReplicaAssignment.
1 parent 80c58f8 commit 8bb300e

File tree

3 files changed

+63
-4
lines changed

3 files changed

+63
-4
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# Confluent's Golang client for Apache Kafka
22

3+
## v1.7.0
4+
5+
### Fixes
6+
7+
* AdminClient.CreateTopics() previously did not accept default value(-1) of
8+
ReplicationFactor without specifying an explicit ReplicaAssignment, this is
9+
now fixed.
10+
11+
12+
313
## v1.6.1
414

515
v1.6.1 is a feature release:

kafka/adminapi.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,10 +514,6 @@ func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecificat
514514
return nil, newErrorFromString(ErrInvalidArg,
515515
"TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions")
516516
}
517-
518-
} else if cReplicationFactor == -1 {
519-
return nil, newErrorFromString(ErrInvalidArg,
520-
"TopicSpecification.ReplicationFactor or TopicSpecification.ReplicaAssignment must be specified")
521517
}
522518

523519
cTopics[i] = C.rd_kafka_NewTopic_new(

kafka/adminapi_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,59 @@ import (
2323
"time"
2424
)
2525

26+
// TestAdminAPIWithDefaultValue tests CreateTopics with default
27+
// NumPartitions and ReplicationFactor values
28+
func TestAdminAPIWithDefaultValue(t *testing.T) {
29+
if !testconfRead() {
30+
t.Skipf("Missing testconf.json")
31+
}
32+
33+
topic := "testWithDefaultValue"
34+
35+
conf := ConfigMap{"bootstrap.servers": testconf.Brokers}
36+
if err := conf.updateFromTestconf(); err != nil {
37+
t.Fatalf("Failed to update test configuration: %v\n", err)
38+
}
39+
40+
expDuration, err := time.ParseDuration("30s")
41+
if err != nil {
42+
t.Fatalf("Failed to Parse Duration: %s", err)
43+
}
44+
45+
adminClient, err := NewAdminClient(&conf)
46+
if err != nil {
47+
t.Fatalf("Failed to create AdminClient %v", err)
48+
}
49+
50+
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
51+
defer cancel()
52+
res, err := adminClient.CreateTopics(
53+
ctx,
54+
[]TopicSpecification{
55+
{
56+
Topic: topic,
57+
NumPartitions: -1,
58+
ReplicationFactor: -1,
59+
},
60+
})
61+
if err != nil {
62+
adminClient.Close()
63+
t.Fatalf("Failed to create topics %v\n", err)
64+
}
65+
t.Logf("Succeed to create topic %v\n", res)
66+
67+
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
68+
defer cancel()
69+
res, err = adminClient.DeleteTopics(ctx, []string{topic})
70+
if err != nil {
71+
adminClient.Close()
72+
t.Fatalf("Failed to delete topic %v, err: %v", topic, err)
73+
}
74+
t.Logf("Succeed to delete topic %v\n", res)
75+
76+
adminClient.Close()
77+
}
78+
2679
func testAdminAPIs(what string, a *AdminClient, t *testing.T) {
2780
t.Logf("AdminClient API testing on %s: %s", a, what)
2881

0 commit comments

Comments
 (0)