Skip to content

Commit 65e62a5

Browse files
committed
removePrefix
1 parent 59b4cd3 commit 65e62a5

File tree

4 files changed

+37
-8
lines changed

4 files changed

+37
-8
lines changed

connection.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,21 @@ func (lib *Library[ID, TX, DB]) SetEnhanceDB(enhance bool) {
249249
lib.doEnhance = enhance
250250
}
251251

252+
// SetBroadcastConsumerBaseName sets the base consumer group name used for
253+
// creating broadcast consumers. A dash (-) and number will be appended.
254+
// Calling this after consumers have started or messages are being produced will panic.
252255
func (lib *Library[ID, TX, DB]) SetBroadcastConsumerBaseName(name string) {
253256
lib.lock.Lock()
254257
defer lib.lock.Unlock()
255258
lib.mustNotBeRunning("attempt configure event library that is already processing")
256259
lib.broadcastConsumerBaseName = name
257260
}
258261

262+
// SetBroadcastConsumerMaxLocks sets a limit on the maximum number used to form
263+
// consumer group IDs for creating broadcast consumers. Defaults to 200. Keep this
264+
// reasonably small so that Kafka doesn't end up thrashing if broadcast consumers
265+
// cannot be allocated.
266+
// Calling this after consumers have started or messages are being produced will panic.
259267
func (lib *Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks(max uint32) {
260268
lib.lock.Lock()
261269
defer lib.lock.Unlock()
@@ -267,6 +275,8 @@ func (lib *Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks(max uint32) {
267275
// short since max topic length is :
268276
//
269277
// 249 - 55 (maxConsumerGroupLength) - len("-dead-letter") - len(prefix) - 1
278+
//
279+
// Calling this after consumers have started or messages are being produced will panic.
270280
func (lib *Library[ID, TX, DB]) SetPrefix(prefix string) {
271281
lib.lock.Lock()
272282
defer lib.lock.Unlock()
@@ -284,6 +294,8 @@ func (lib *Library[ID, TX, DB]) SetPrefix(prefix string) {
284294
// locks: they'll use different namespace prefixes unless you mess that up by
285295
// using SetBroadcastConsumerBaseName() with the same name for both lock-free and locked
286296
// instances.
297+
//
298+
// Calling this after consumers have started or messages are being produced will panic.
287299
func (lib *Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers() {
288300
lib.lock.Lock()
289301
defer lib.lock.Unlock()
@@ -295,6 +307,8 @@ func (lib *Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers() {
295307
// the [Library] doesn't have a running producer. Defaults to false. If true, the
296308
// event will be left in the database for some other [Library] to pick up sometime
297309
// in the future.
310+
//
311+
// Calling this after consumers have started or messages are being produced will panic.
298312
func (lib *Library[ID, TX, DB]) SetLazyTxProduce(lazy bool) {
299313
lib.lock.Lock()
300314
defer lib.lock.Unlock()
@@ -304,6 +318,8 @@ func (lib *Library[ID, TX, DB]) SetLazyTxProduce(lazy bool) {
304318

305319
// SkipNotifierSupport turns off support for runtime subscription to broadcast topics
306320
// via RegisterFiltered and RegisterUnfiltered. This is mostly used in testing the events library.
321+
//
322+
// Calling this after consumers have started or messages are being produced will panic.
307323
func (lib *Library[ID, TX, DB]) SkipNotifierSupport() {
308324
lib.lock.Lock()
309325
defer lib.lock.Unlock()
@@ -318,6 +334,10 @@ func (lib *Library[ID, TX, DB]) SkipNotifierSupport() {
318334
// StartConsuming requires a database if ConsumeExactlyOnce has been called
319335
//
320336
// The conn parameter may be nil, in which case CatchUpProduce() and ProduceFromTable() will error.
337+
//
338+
// It is required that Configure be called exactly once before any consumers are started or
339+
// messages are produced.
340+
// Calling this after consumers have started or messages are being produced will panic.
321341
func (lib *Library[ID, TX, DB]) Configure(conn DB, tracer eventmodels.Tracer, mustRegisterTopics bool, saslMechanism sasl.Mechanism, tlsConfig *TLSConfig, brokers []string) {
322342
if !lib.skipNotifier {
323343
processRegistrationTodo(lib) // before taking lock to avoid deadlock
@@ -396,7 +416,7 @@ func (lib *Library[ID, TX, DB]) BroadcastConsumerLastLatency() (lastGap time.Dur
396416
// all handler instances for that consumerGroupName (messages will only be delivered successfully
397417
// once per consumer group)
398418
func (lib *Library[ID, TX, DB]) ConsumeExactlyOnce(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, handlerName string, handler eventmodels.HandlerTxInterface[ID, TX], opts ...HandlerOpt) {
399-
handler.SetLibrary(lib)
419+
handler.SetLibrary(libraryInterface[ID, TX, DB]{lib})
400420
lib.lock.Lock()
401421
defer lib.lock.Unlock()
402422
lib.mustNotBeRunning("attempt configure event consumer in library that is already processing")
@@ -417,7 +437,7 @@ func (lib *Library[ID, TX, DB]) ConsumeExactlyOnce(consumerGroup ConsumerGroupNa
417437
// and one of them returns error, then the message can be re-delivered to the handlers that did not
418438
// return error.
419439
func (lib *Library[ID, TX, DB]) ConsumeIdempotent(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt) {
420-
handler.SetLibrary(lib)
440+
handler.SetLibrary(libraryInterface[ID, TX, DB]{lib})
421441
lib.lock.Lock()
422442
defer lib.lock.Unlock()
423443
lib.mustNotBeRunning("attempt configure event consumer in library that is already processing")
@@ -431,7 +451,7 @@ func (lib *Library[ID, TX, DB]) ConsumeIdempotent(consumerGroup ConsumerGroupNam
431451
// When broadcast handlers return error, the message will be dropped.
432452
// By default broadcast handlers are not retried and the handler timeout is 30 seconds.
433453
func (lib *Library[ID, TX, DB]) ConsumeBroadcast(handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt) {
434-
handler.SetLibrary(lib)
454+
handler.SetLibrary(libraryInterface[ID, TX, DB]{lib})
435455
lib.lock.Lock()
436456
defer lib.lock.Unlock()
437457
lib.mustNotBeRunning("attempt configure event consumer in library that is already processing")
@@ -766,9 +786,9 @@ func (lib *LibraryNoDB) validateTopic(unprefixedTopic string) error {
766786
return nil
767787
}
768788

769-
// RemovePrefix removes a prefix from a topic or consumer group that was added
789+
// removePrefix removes a prefix from a topic or consumer group that was added
770790
// because the library had SetPrefix called previously.
771-
func (lib *LibraryNoDB) RemovePrefix(topicOrConsumerGroup string) string {
791+
func (lib *LibraryNoDB) removePrefix(topicOrConsumerGroup string) string {
772792
if lib.prefix == "" {
773793
return topicOrConsumerGroup
774794
}
@@ -786,3 +806,12 @@ func (lib *LibraryNoDB) addPrefixes(topicsOrConsumerGroups []string) []string {
786806
}
787807
return p
788808
}
809+
810+
// libraryInterface exists to implement eventmodels.LibraryInterface and make RemovePrefix public
811+
type libraryInterface[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodels.AbstractDB[ID, TX]] struct {
812+
*Library[ID, TX, DB]
813+
}
814+
815+
func (lib libraryInterface[ID, TX, DB]) RemovePrefix(topicOrConsumerGroup string) string {
816+
return lib.removePrefix(topicOrConsumerGroup)
817+
}

consume.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ func (lib *Library[ID, TX, DB]) deliverOneMessage(
403403
sequenceNumber: sequenceNumber,
404404
Message: &msg,
405405
}
406-
handlers, ok := group.topics[lib.RemovePrefix(msg.Topic)]
406+
handlers, ok := group.topics[lib.removePrefix(msg.Topic)]
407407
if ok {
408408
waiters := make(chan handlerSuccess, len(handlers.handlerNames))
409409
go func() {

eventmodels/decode.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var ErrDecode errors.String = "could not unmarshal message"
1818

1919
// decode transforms a Kafka Message into a Event struct.
2020
func decode[E any](message *kafka.Message, consumerGroup string, lib LibraryInterface, handlerName string) (Event[E], error) {
21-
unprefixedTopic := lib.RemovePrefix(message.Topic)
21+
unprefixedTopic := lib.removePrefix(message.Topic)
2222
meta := Event[E]{
2323
Topic: unprefixedTopic,
2424
Type: unprefixedTopic,

topics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (lib *LibraryNoDB) listAvailableTopics() {
278278
continue
279279
}
280280
seen[p.Topic] = true
281-
unprefixedTopic := lib.RemovePrefix(p.Topic)
281+
unprefixedTopic := lib.removePrefix(p.Topic)
282282
if lib.prefix != "" && unprefixedTopic == p.Topic {
283283
if debugPrefixIgnore {
284284
lib.tracer.Logf("[events] topic %s found in partition, IGNORING (not prefixed)", p.Topic)

0 commit comments

Comments
 (0)