Skip to content

Commit 59b4cd3

Browse files
committed
prefix topics and consumer groups
1 parent f272950 commit 59b4cd3

File tree

20 files changed

+284
-174
lines changed

20 files changed

+284
-174
lines changed

broadcast.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,15 @@ func (lib *Library[ID, TX, DB]) getBroadcastReader(ctx context.Context, broadcas
177177

178178
// We get a reader. Once we check that there is exactly one reader, we know we have exclusive use
179179
// of the group because any other thread will see more than one reader
180-
reader, _, readerConfig, err = lib.getReader(ctx, broadcastConsumerGroup, broadcastTopics(generic.Keys(lib.broadcast.topics)), true, resetPosition)
180+
reader, _, readerConfig, err = lib.getReader(ctx, broadcastConsumerGroup, lib.addPrefixes(broadcastTopics(generic.Keys(lib.broadcast.topics))), true, resetPosition)
181181
if err != nil {
182182
// context was cancelled
183183
return nil, nil, errors.WithStack(err)
184184
}
185185

186186
b := backoffPolicy.Start(ctx)
187187

188+
prefixedBroadcastConsumerGroup := lib.addPrefix(string(broadcastConsumerGroup))
188189
FreshClient:
189190
for {
190191
// We need a new client. Either the group was deleted and thus the broker may have changed,
@@ -209,7 +210,7 @@ FreshClient:
209210
ctxWithTimeout, cancelCtx = context.WithTimeout(ctx, describeTimeout)
210211
describeResponse, err := client.DescribeGroups(ctxWithTimeout, &kafka.DescribeGroupsRequest{
211212
Addr: client.Addr,
212-
GroupIDs: []string{string(broadcastConsumerGroup)},
213+
GroupIDs: []string{prefixedBroadcastConsumerGroup},
213214
})
214215
cancelCtx()
215216
if err != nil {
@@ -223,7 +224,7 @@ FreshClient:
223224
return nil, nil, errors.Errorf("could not describe broadcast group: %w", err)
224225
}
225226
for _, resp := range describeResponse.Groups {
226-
if resp.GroupID != string(broadcastConsumerGroup) {
227+
if resp.GroupID != prefixedBroadcastConsumerGroup {
227228
return nil, nil, errors.Errorf("got a description for a group (%s) that wasn't what was asked for (%s)", resp.GroupID, broadcastConsumerGroup)
228229
}
229230
switch {
@@ -271,7 +272,7 @@ func (lib *Library[ID, TX, DB]) deleteBroadcastConsumerGroup(ctx context.Context
271272
lib.tracer.Logf("[events] deleting consumer group %s", broadcastConsumerGroup)
272273
resp, err := client.DeleteGroups(ctxWithTimeout, &kafka.DeleteGroupsRequest{
273274
Addr: client.Addr,
274-
GroupIDs: []string{string(broadcastConsumerGroup)},
275+
GroupIDs: []string{lib.addPrefix(string(broadcastConsumerGroup))},
275276
})
276277
cancelCtx()
277278
switch {
@@ -476,22 +477,22 @@ func (a *lockIDAllocator) grow() {
476477
a.max = n
477478
}
478479

479-
func broadcastTopics(topics []string) []string {
480-
if !generic.SliceContainsElement(topics, heartbeatTopic.Topic()) {
480+
func broadcastTopics(unprefixedTopics []string) []string {
481+
if !generic.SliceContainsElement(unprefixedTopics, heartbeatTopic.Topic()) {
481482
// we don't actually need a handler, just need to subscribe to the heartbeat topic
482-
topics = append(generic.CopySlice(topics), heartbeatTopic.Topic())
483+
unprefixedTopics = append(generic.CopySlice(unprefixedTopics), heartbeatTopic.Topic())
483484
}
484-
return topics
485+
return unprefixedTopics
485486
}
486487

487488
// getReader only returns once the reader is actually connected working. This is determined
488489
// by calling Stats()
489490
// The only error that getReader returns is from context cancellation
490-
func (lib *Library[ID, TX, DB]) getReader(ctx context.Context, consumerGroup consumerGroupName, topics []string, isBroadcast bool, resetPosition bool) (*kafka.Reader, *kafka.ReaderStats, *kafka.ReaderConfig, error) {
491+
func (lib *Library[ID, TX, DB]) getReader(ctx context.Context, consumerGroup consumerGroupName, prefixedTopics []string, isBroadcast bool, resetPosition bool) (*kafka.Reader, *kafka.ReaderStats, *kafka.ReaderConfig, error) {
491492
readerConfig := kafka.ReaderConfig{
492493
Brokers: lib.brokers,
493-
GroupID: consumerGroup.String(),
494-
GroupTopics: topics,
494+
GroupID: lib.addPrefix(consumerGroup.String()),
495+
GroupTopics: prefixedTopics,
495496
MaxBytes: maxBytes,
496497
Dialer: lib.dialer(),
497498
StartOffset: kafka.FirstOffset,

connection.go

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const (
4242
broadcastStartupMaxWait = 15 * time.Minute // only checked on group allocation failure
4343
nonBroadcastReaderIdleTimeout = 5 * time.Minute
4444
broadcastReaderIdleTimeout = baseBroadcastHeartbeat * 3 // This cannot be < 1s: Kafka takes a while to deliver events after reader startup or generation change
45-
maxConsumerGroupNameLength = 35
45+
maxConsumerGroupNameLength = 55 // actual limit is 249 characters, but we need to subtract this from the topic name lenght limit so we use less
4646
dialTimeout = time.Minute * 2
4747
deleteTimeout = time.Minute * 4
4848
describeTimeout = time.Minute * 4
@@ -80,7 +80,7 @@ const (
8080
)
8181

8282
var (
83-
maxTopicNameLength = 255 - maxConsumerGroupNameLength - len(deadLetterTopicPostfix) - 1
83+
maxTopicNameLength = 249 - maxConsumerGroupNameLength - len(deadLetterTopicPostfix) - 1 // 249 is actual limit
8484
LegalTopicNames = regexp.MustCompile(fmt.Sprintf(`^[-._a-zA-Z0-9]{1,%d}$`, maxTopicNameLength))
8585
legalConsumerGroupNames = regexp.MustCompile(fmt.Sprintf(`^[-._a-zA-Z0-9]{1,%d}$`, maxConsumerGroupNameLength))
8686
)
@@ -102,8 +102,8 @@ type LibraryNoDB struct {
102102
broadcast *group
103103
startTime time.Time
104104
ready atomic.Int32
105-
topicConfig map[string]kafka.TopicConfig
106-
topicsWork pwork.Work[string, topicsWhy]
105+
topicConfig map[string]kafka.TopicConfig // un-prefixed
106+
topicsWork pwork.Work[string, topicsWhy] // un-prefixed in APIs
107107
topicListingStarted sync.Once
108108
topicsHaveBeenListed chan struct{}
109109
mustRegisterTopics bool
@@ -124,6 +124,7 @@ type LibraryNoDB struct {
124124
instanceID int32
125125
lazyProduce bool
126126
skipNotifier bool
127+
prefix string // prefixes all topics and consumer groups
127128

128129
// lock must be held when....
129130
//
@@ -157,13 +158,13 @@ const (
157158
)
158159

159160
type group struct {
160-
topics map[string]*topicHandlers
161-
maxIdle time.Duration // reset reader when idle for this long
161+
topics map[string]*topicHandlers // unprefixed topic -> handlers
162+
maxIdle time.Duration // reset reader when idle for this long
162163
}
163164

164165
type topicHandlers struct {
165-
handlerNames []string // so that iteration is deterministic
166-
handlers map[string]*registeredHandler
166+
handlerNames []string // so that iteration is deterministic
167+
handlers map[string]*registeredHandler // handlerName -> handler
167168
}
168169

169170
type registeredHandler struct {
@@ -197,7 +198,7 @@ type handlerSuccess struct {
197198
type HandlerOpt func(*registeredHandler, *LibraryNoDB)
198199

199200
type canHandle interface {
200-
GetTopic() string
201+
GetTopic() string // unprefixed
201202
Handle(ctx context.Context, handlerInfo eventmodels.HandlerInfo, message []*kafka.Message) []error
202203
Batch() bool
203204
}
@@ -262,6 +263,17 @@ func (lib *Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks(max uint32) {
262263
lib.broadcastConsumerMaxLock = max
263264
}
264265

266+
// SetPrefix sets a string prefix used for all topics and all consumer groups. Keep this
267+
// short since max topic length is :
268+
//
269+
// 249 - 55 (maxConsumerGroupLength) - len("-dead-letter") - len(prefix) - 1
270+
func (lib *Library[ID, TX, DB]) SetPrefix(prefix string) {
271+
lib.lock.Lock()
272+
defer lib.lock.Unlock()
273+
lib.mustNotBeRunning("attempt configure event library that is already processing")
274+
lib.prefix = prefix
275+
}
276+
265277
// DoNotLockBroadcastConsumerNumbers must be used before starting the library. If called,
266278
// no database locks will be taken to reserve broadcast consumer numbers. There is a
267279
// tradeoff: this saves a database connection that would otherwise sit around holding
@@ -670,7 +682,7 @@ func (lib *Library[ID, TX, DB]) getConsumerGroupCoordinator(ctx context.Context,
670682
}
671683
resp, err := controller.FindCoordinator(ctx, &kafka.FindCoordinatorRequest{
672684
Addr: controller.Addr,
673-
Key: consumerGroup.String(),
685+
Key: lib.addPrefix(consumerGroup.String()),
674686
KeyType: kafka.CoordinatorKeyTypeConsumer,
675687
})
676688
if err != nil {
@@ -743,3 +755,34 @@ func NewConsumerGroup(name string) ConsumerGroupName {
743755

744756
func (n consumerGroupName) String() string { return string(n) }
745757
func (n consumerGroupName) name() consumerGroupName { return n }
758+
759+
func (lib *LibraryNoDB) validateTopic(unprefixedTopic string) error {
760+
if len(unprefixedTopic)+len(lib.prefix) > maxTopicNameLength {
761+
return errors.Errorf("topic name (%s) is too long", unprefixedTopic)
762+
}
763+
if !LegalTopicNames.MatchString(unprefixedTopic) {
764+
return errors.Errorf("topic name (%s) is invalid", unprefixedTopic)
765+
}
766+
return nil
767+
}
768+
769+
// RemovePrefix removes a prefix from a topic or consumer group that was added
770+
// because the library had SetPrefix called previously.
771+
func (lib *LibraryNoDB) RemovePrefix(topicOrConsumerGroup string) string {
772+
if lib.prefix == "" {
773+
return topicOrConsumerGroup
774+
}
775+
return strings.TrimPrefix(topicOrConsumerGroup, lib.prefix)
776+
}
777+
778+
func (lib *LibraryNoDB) addPrefix(topicOrConsumerGroup string) string {
779+
return lib.prefix + topicOrConsumerGroup
780+
}
781+
782+
func (lib *LibraryNoDB) addPrefixes(topicsOrConsumerGroups []string) []string {
783+
p := make([]string, len(topicsOrConsumerGroups))
784+
for i, unprefixed := range topicsOrConsumerGroups {
785+
p[i] = lib.prefix + unprefixed
786+
}
787+
return p
788+
}

connection_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ func TestConstants(t *testing.T) {
1111
t.Log("make sure nobody changes contants in ways that would break behavior")
1212
assert.Greater(t, broadcastReaderIdleTimeout, time.Second)
1313
assert.Less(t, broadcastHeartbeatRandom, 0.8)
14-
assert.Less(t, maxConsumerGroupNameLength, 36)
14+
assert.Less(t, maxConsumerGroupNameLength, 56)
1515
}

0 commit comments

Comments
 (0)