Skip to content

Commit d5b3a82

Browse files
committed
Add support for KIP-429 Incremental Rebalancing
Also fixes consumer.Close() that previously would not trigger the rebalance callback leaving the application no way to do its revoke logic, if any. The fix is to Unsubscribe() first, which triggers the rebalance, and then Close().
1 parent f6bbbb8 commit d5b3a82

File tree

8 files changed

+474
-87
lines changed

8 files changed

+474
-87
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
## v1.6.0
44

5-
v1.6.0 is a feature release adding support for KIP-429 Incremental rebalancing.
5+
v1.6.0 is a feature release adding support for KIP-429 Incremental rebalancing
6+
and KIP-480 Sticky producer partitioner.
67

8+
- See [cooperative_consumer_example.go](examples/cooperative_consumer_example/cooperative_consumer_example.go)
9+
for an example how to use the new incremental rebalancing consumer.
710
- Bundles librdkafka v1.6.0 - see release notes for all enhancements and fixes.
811

912

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
cooperative_consumer_example
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Example high-level Apache Kafka consumer using the
2+
// cooperative incremental rebalancing protocol which allows
3+
// seamless reassignment of partitions to other group members.
4+
package main
5+
6+
/**
7+
* Copyright 2020 Confluent Inc.
8+
*
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
*/
21+
22+
import (
23+
"fmt"
24+
"github.com/confluentinc/confluent-kafka-go/kafka"
25+
"os"
26+
"os/signal"
27+
"syscall"
28+
)
29+
30+
func main() {
31+
32+
if len(os.Args) < 4 {
33+
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
34+
os.Args[0])
35+
os.Exit(1)
36+
}
37+
38+
broker := os.Args[1]
39+
group := os.Args[2]
40+
topics := os.Args[3:]
41+
sigchan := make(chan os.Signal, 1)
42+
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
43+
44+
c, err := kafka.NewConsumer(&kafka.ConfigMap{
45+
"bootstrap.servers": broker,
46+
// Avoid connecting to IPv6 brokers:
47+
// This is needed for the ErrAllBrokersDown show-case below
48+
// when using localhost brokers on OSX, since the OSX resolver
49+
// will return the IPv6 addresses first.
50+
// You typically don't need to specify this configuration
51+
// property.
52+
"broker.address.family": "v4",
53+
// Consumer group ID
54+
"group.id": group,
55+
// Use the cooperative incremental rebalance protocol.
56+
"partition.assignment.strategy": "cooperative-sticky",
57+
// Start reading from the first message of each assigned
58+
// partition if there are no previously committed offsets
59+
// for this group.
60+
"auto.offset.reset": "earliest"})
61+
62+
if err != nil {
63+
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
64+
os.Exit(1)
65+
}
66+
67+
fmt.Printf("Created Consumer %v\n", c)
68+
69+
// Subscribe to topics, call the rebalanceCallback on assignment/revoke.
70+
// The rebalanceCallback is triggered from c.Poll() and c.Close() below.
71+
err = c.SubscribeTopics(topics, rebalanceCallback)
72+
73+
run := true
74+
75+
for run == true {
76+
select {
77+
case sig := <-sigchan:
78+
fmt.Printf("Caught signal %v: terminating\n", sig)
79+
run = false
80+
default:
81+
ev := c.Poll(100)
82+
if ev == nil {
83+
continue
84+
}
85+
86+
switch e := ev.(type) {
87+
case *kafka.Message:
88+
fmt.Printf("%% Message on %s:\n%s\n",
89+
e.TopicPartition, string(e.Value))
90+
if e.Headers != nil {
91+
fmt.Printf("%% Headers: %v\n", e.Headers)
92+
}
93+
case kafka.Error:
94+
// Errors should generally be considered
95+
// informational, the client will try to
96+
// automatically recover.
97+
// But in this example we choose to terminate
98+
// the application if all brokers are down.
99+
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n",
100+
e.Code(), e)
101+
if e.Code() == kafka.ErrAllBrokersDown {
102+
run = false
103+
}
104+
default:
105+
fmt.Printf("Ignored %v\n", e)
106+
}
107+
}
108+
}
109+
110+
fmt.Printf("Closing consumer\n")
111+
c.Close()
112+
}
113+
114+
// rebalanceCallback is called on each group rebalance to assign additional
115+
// partitions, or remove existing partitions, from the consumer's current
116+
// assignment.
117+
//
118+
// The application may use this optional callback to inspect the assignment,
119+
// alter the initial start offset (the .Offset field of each assigned partition),
120+
// and read/write offsets to commit to an alternative store outside of Kafka.
121+
func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
122+
123+
switch ev := event.(type) {
124+
case kafka.AssignedPartitions:
125+
fmt.Fprintf(os.Stderr,
126+
"%% %s rebalance: %d new partition(s) assigned: %v\n",
127+
c.GetRebalanceProtocol(), len(ev.Partitions),
128+
ev.Partitions)
129+
130+
// The application may update the start .Offset of each
131+
// assigned partition and then call IncrementalAssign().
132+
// Even though this example does not alter the offsets we
133+
// provide the call to IncrementalAssign() as an example.
134+
err := c.IncrementalAssign(ev.Partitions)
135+
if err != nil {
136+
panic(err)
137+
}
138+
139+
case kafka.RevokedPartitions:
140+
fmt.Fprintf(os.Stderr,
141+
"%% %s rebalance: %d partition(s) revoked: %v\n",
142+
c.GetRebalanceProtocol(), len(ev.Partitions),
143+
ev.Partitions)
144+
if c.AssignmentLost() {
145+
// Our consumer has been kicked out of the group and the
146+
// entire assignment is thus lost.
147+
fmt.Fprintf(os.Stderr, "%% Current assignment lost!\n")
148+
}
149+
150+
// The client automatically calls IncrementalUnassign() unless
151+
// the callback has already called that method.
152+
}
153+
154+
return nil
155+
}

0 commit comments

Comments
 (0)