Skip to content
This repository was archived by the owner on Jun 4, 2021. It is now read-only.

Commit eaa5916

Browse files
authored
Set subscription readiness based on consumer group status (#182) * WIP: Set subscription readiness based on consumer group status * Refactor KafkaWatcher to an edge-triggered, level driven design * Refactor the test to prevent a map race * goimports and gofmt comments * Add tests for the KafkaWatcher and reconnect for AdminClient * Improve consumer group watcher code aethetics and robustness * Make variable names more descriptive for consumer group watcher and adimn client * Forget watcher when channel is deleted * Terminate old watcher and free the cache when config is created * Set the KafkaChannel consolidated dispatcher reconciler to be read-only * Enable HA for KafkaChannel(consolidated) dispatcher * Run update-codegen.sh * Optimize consumer group watcher * Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
1 parent f9ebcd1 commit eaa5916

File tree

9 files changed

+662
-62
lines changed

9 files changed

+662
-62
lines changed

kafka/channel/cmd/channel_dispatcher/main.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,5 @@ func main() {
3535
ctx = injection.WithNamespaceScope(ctx, ns)
3636
}
3737

38-
// Do not run the dispatcher in leader-election mode
39-
ctx = sharedmain.WithHADisabled(ctx)
40-
4138
sharedmain.MainWithContext(ctx, component, controller.NewController)
4239
}

kafka/channel/pkg/kafka/admin.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
Copyright 2020 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kafka
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"math"
23+
"sync"
24+
"time"
25+
26+
"go.uber.org/zap"
27+
"k8s.io/apimachinery/pkg/util/sets"
28+
29+
"github.com/Shopify/sarama"
30+
"knative.dev/pkg/logging"
31+
)
32+
33+
var mutex sync.Mutex
34+
35+
type ClusterAdminFactory func() (sarama.ClusterAdmin, error)
36+
37+
type AdminClient interface {
38+
// ListConsumerGroups Lists the consumer groups
39+
ListConsumerGroups() ([]string, error)
40+
}
41+
42+
// AdminClientManager manages a ClusterAdmin connection and recreates one when needed
43+
// it is made to overcome https://github.com/Shopify/sarama/issues/1162
44+
type AdminClientManager struct {
45+
logger *zap.SugaredLogger
46+
adminFactory ClusterAdminFactory
47+
clusterAdmin sarama.ClusterAdmin
48+
}
49+
50+
func NewAdminClient(ctx context.Context, caFactory ClusterAdminFactory) (AdminClient, error) {
51+
logger := logging.FromContext(ctx)
52+
logger.Info("Creating a new AdminClient")
53+
kafkaClusterAdmin, err := caFactory()
54+
if err != nil {
55+
logger.Errorw("error while creating ClusterAdmin", zap.Error(err))
56+
return nil, err
57+
}
58+
return &AdminClientManager{
59+
logger: logger,
60+
adminFactory: caFactory,
61+
clusterAdmin: kafkaClusterAdmin,
62+
}, nil
63+
}
64+
65+
// ListConsumerGroups Returns a list of the consumer groups.
66+
//
67+
// In the occasion of errors, there will be a retry with an exponential backoff.
68+
// Due to a known issue in Sarama ClusterAdmin https://github.com/Shopify/sarama/issues/1162,
69+
// a new ClusterAdmin will be created with every retry until the call succeeds or
70+
// the timeout is reached.
71+
func (c *AdminClientManager) ListConsumerGroups() ([]string, error) {
72+
c.logger.Info("Attempting to list consumer group")
73+
mutex.Lock()
74+
defer mutex.Unlock()
75+
r := 0
76+
// This gives us around ~13min of exponential backoff
77+
max := 13
78+
cgsMap, err := c.clusterAdmin.ListConsumerGroups()
79+
for err != nil && r <= max {
80+
// There's on error, let's retry and presume a new ClusterAdmin can fix it
81+
82+
// Calculate incremental delay following this https://docs.aws.amazon.com/general/latest/gr/api-retries.html
83+
t := int(math.Pow(2, float64(r)) * 100)
84+
d := time.Duration(t) * time.Millisecond
85+
c.logger.Errorw("listing consumer group failed. Refreshing the ClusterAdmin and retrying.",
86+
zap.Error(err),
87+
zap.Duration("retry after", d),
88+
zap.Int("Retry attempt", r),
89+
zap.Int("Max retries", max),
90+
)
91+
time.Sleep(d)
92+
93+
// let's reconnect and try again
94+
c.clusterAdmin, err = c.adminFactory()
95+
r += 1
96+
if err != nil {
97+
// skip this attempt
98+
continue
99+
}
100+
cgsMap, err = c.clusterAdmin.ListConsumerGroups()
101+
}
102+
103+
if r > max {
104+
return nil, fmt.Errorf("failed to refresh the culster admin and retry: %v", err)
105+
}
106+
107+
return sets.StringKeySet(cgsMap).List(), nil
108+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2020 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kafka
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"github.com/Shopify/sarama"
26+
27+
pkgtesting "knative.dev/pkg/logging/testing"
28+
)
29+
30+
const testCG = "cg1"
31+
32+
var m sync.RWMutex
33+
34+
type FakeClusterAdmin struct {
35+
sarama.ClusterAdmin
36+
faulty bool
37+
}
38+
39+
func (f *FakeClusterAdmin) ListConsumerGroups() (map[string]string, error) {
40+
cgs := map[string]string{
41+
testCG: "cg",
42+
}
43+
m.RLock()
44+
defer m.RUnlock()
45+
if f.faulty {
46+
return nil, fmt.Errorf("Error")
47+
}
48+
return cgs, nil
49+
}
50+
51+
func TestAdminClient(t *testing.T) {
52+
var wg sync.WaitGroup
53+
wg.Add(10)
54+
ctx := pkgtesting.TestContextWithLogger(t)
55+
f := &FakeClusterAdmin{}
56+
ac, err := NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) {
57+
return f, nil
58+
})
59+
if err != nil {
60+
t.Error("failed to obtain new client", err)
61+
}
62+
for i := 0; i < 10; i += 1 {
63+
go func() {
64+
doList(t, ac)
65+
check := make(chan struct{})
66+
go func() {
67+
m.Lock()
68+
f.faulty = true
69+
m.Unlock()
70+
check <- struct{}{}
71+
time.Sleep(2 * time.Second)
72+
m.Lock()
73+
f.faulty = false
74+
m.Unlock()
75+
check <- struct{}{}
76+
}()
77+
<-check
78+
doList(t, ac)
79+
<-check
80+
wg.Done()
81+
}()
82+
}
83+
wg.Wait()
84+
}
85+
86+
func doList(t *testing.T, ac AdminClient) {
87+
cgs, _ := ac.ListConsumerGroups()
88+
if len(cgs) != 1 {
89+
t.Fatalf("list consumer group: got %d, want %d", len(cgs), 1)
90+
}
91+
if cgs[0] != testCG {
92+
t.Fatalf("consumer group: got %s, want %s", cgs[0], testCG)
93+
}
94+
}

0 commit comments

Comments
 (0)