Skip to content

Commit f057b1d

Browse files
authored
Merge pull request #1136 from rhansen2/issue-1092
CreateTopics: only suppress topic already exists errors
2 parents 01b2e66 + c96cc0e commit f057b1d

File tree

2 files changed

+80
-13
lines changed

2 files changed

+80
-13
lines changed

createtopics.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kafka
33
import (
44
"bufio"
55
"context"
6-
"errors"
76
"fmt"
87
"net"
98
"time"
@@ -65,7 +64,6 @@ func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*C
6564
TimeoutMs: c.timeoutMs(ctx, defaultCreateTopicsTimeout),
6665
ValidateOnly: req.ValidateOnly,
6766
})
68-
6967
if err != nil {
7068
return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err)
7169
}
@@ -363,6 +361,9 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse
363361
return response, err
364362
}
365363
for _, tr := range response.TopicErrors {
364+
if tr.ErrorCode == int16(TopicAlreadyExists) {
365+
continue
366+
}
366367
if tr.ErrorCode != 0 {
367368
return response, Error(tr.ErrorCode)
368369
}
@@ -385,14 +386,5 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
385386
_, err := c.createTopics(createTopicsRequestV0{
386387
Topics: requestV0Topics,
387388
})
388-
if err != nil {
389-
if errors.Is(err, TopicAlreadyExists) {
390-
// ok
391-
return nil
392-
}
393-
394-
return err
395-
}
396-
397-
return nil
389+
return err
398390
}

createtopics_test.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,86 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7+
"errors"
8+
"net"
79
"reflect"
10+
"strconv"
811
"testing"
912
)
1013

14+
func TestConnCreateTopics(t *testing.T) {
15+
topic1 := makeTopic()
16+
topic2 := makeTopic()
17+
18+
conn, err := DialContext(context.Background(), "tcp", "localhost:9092")
19+
if err != nil {
20+
t.Fatal(err)
21+
}
22+
23+
defer func() {
24+
err := conn.Close()
25+
if err != nil {
26+
t.Fatalf("failed to close connection: %v", err)
27+
}
28+
}()
29+
30+
controller, _ := conn.Controller()
31+
32+
controllerConn, err := Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
33+
if err != nil {
34+
t.Fatal(err)
35+
}
36+
defer controllerConn.Close()
37+
38+
err = controllerConn.CreateTopics(TopicConfig{
39+
Topic: topic1,
40+
NumPartitions: 1,
41+
ReplicationFactor: 1,
42+
})
43+
if err != nil {
44+
t.Fatalf("unexpected error creating topic: %s", err.Error())
45+
}
46+
47+
err = controllerConn.CreateTopics(TopicConfig{
48+
Topic: topic1,
49+
NumPartitions: 1,
50+
ReplicationFactor: 1,
51+
})
52+
53+
// Duplicate topic should not return an error
54+
if err != nil {
55+
t.Fatalf("unexpected error creating duplicate topic topic: %v", err)
56+
}
57+
58+
err = controllerConn.CreateTopics(
59+
TopicConfig{
60+
Topic: topic1,
61+
NumPartitions: 1,
62+
ReplicationFactor: 1,
63+
},
64+
TopicConfig{
65+
Topic: topic2,
66+
NumPartitions: 1,
67+
ReplicationFactor: 1,
68+
},
69+
TopicConfig{
70+
Topic: topic2,
71+
NumPartitions: 1,
72+
ReplicationFactor: 1,
73+
},
74+
)
75+
76+
if err == nil {
77+
t.Fatal("CreateTopics should have returned an error for invalid requests")
78+
}
79+
80+
if !errors.Is(err, InvalidRequest) {
81+
t.Fatalf("expected invalid request: %v", err)
82+
}
83+
84+
deleteTopic(t, topic1)
85+
}
86+
1187
func TestClientCreateTopics(t *testing.T) {
1288
const (
1389
topic1 = "client-topic-1"
@@ -59,7 +135,6 @@ func TestClientCreateTopics(t *testing.T) {
59135
},
60136
},
61137
})
62-
63138
if err != nil {
64139
t.Fatal(err)
65140
}

0 commit comments

Comments
 (0)