Skip to content

Commit 507f37f

Browse files
committed
topic post limits
1 parent 25229e8 commit 507f37f

File tree

16 files changed

+684
-118
lines changed

16 files changed

+684
-118
lines changed

connection.go

Lines changed: 79 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"math/rand"
99
"net"
1010
"regexp"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"sync/atomic"
@@ -93,6 +94,23 @@ type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventm
9394
}
9495

9596
type LibraryNoDB struct {
97+
// --- Size cap subsystem fields (producer-only, decoupled from topic creation) ---
98+
// Global broker caps
99+
sizeCapBrokerState atomic.Int32 // 0=unstarted 1=loading 2=ready 3=failed
100+
sizeCapBrokerReady chan struct{}
101+
sizeCapBrokerMessageMax atomic.Int32 // message.max.bytes (0 unknown)
102+
sizeCapSocketRequestMax atomic.Int32 // socket.request.max.bytes (rarely limiting; 0 unknown)
103+
sizeCapBrokerErr atomic.Value // error or nil
104+
sizeCapDefaultAssumed int // anything smaller than this can be sent before knowing actual limits
105+
106+
// Precreated topics scan (enumeration of already seen topics; optional background)
107+
sizeCapPrecreatedState atomic.Int32 // 0=unstarted 1=running 2=done 3=failed
108+
sizeCapPrecreatedReady chan struct{}
109+
sizeCapPrecreatedErr atomic.Value
110+
111+
// Per-topic limits map (separate from creatingTopic). Keys are topic names.
112+
sizeCapTopicLimits gwrap.SyncMap[string, *sizeCapTopicLimit]
113+
96114
hasDB atomic.Bool
97115
tracer eventmodels.Tracer
98116
brokers []string
@@ -233,6 +251,8 @@ func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodel
233251
doEnhance: true,
234252
instanceID: instanceCount.Add(1),
235253
topicsHaveBeenListed: make(chan struct{}),
254+
sizeCapBrokerReady: make(chan struct{}),
255+
sizeCapDefaultAssumed: 1_000_000,
236256
},
237257
}
238258
}
@@ -286,6 +306,14 @@ func (lib *Library[ID, TX, DB]) SetLazyTxProduce(lazy bool) {
286306
lib.lazyProduce = lazy
287307
}
288308

309+
// SetSizeCapLowerLimit overrides the default lower limit on sizes: any message under
310+
// this size can be sent before actual limits are known.
311+
func (lib *Library[ID, TX, DB]) SetSizeCapLowerLimit(sizeCapDefaultAssumed int) {
312+
lib.lock.Lock()
313+
defer lib.lock.Unlock()
314+
lib.sizeCapDefaultAssumed = sizeCapDefaultAssumed
315+
}
316+
289317
// Configure sets up the Library so that it has the configuration it needs to run.
290318
// The database connection is optional. Without it, certain features will always error:
291319
//
@@ -603,6 +631,32 @@ func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer { return lib
603631
// getController returns a client talking to the controlling broker. The
604632
// controller is needed for certain requests, like creating a topic
605633
func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err error) {
634+
var c *kafka.Client
635+
err = lib.findABroker(ctx, func(conn *kafka.Conn) error {
636+
controller, err := conn.Controller()
637+
if err != nil {
638+
return errors.Errorf("event library get controller from kafka connection: %w", err)
639+
}
640+
ips, err := net.LookupIP(controller.Host)
641+
if err != nil {
642+
return errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
643+
}
644+
if len(ips) == 0 {
645+
return errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
646+
}
647+
c = &kafka.Client{
648+
Addr: &net.TCPAddr{
649+
IP: ips[0],
650+
Port: controller.Port,
651+
},
652+
Transport: lib.transport(),
653+
}
654+
return nil
655+
})
656+
return c, err
657+
}
658+
659+
func (lib *LibraryNoDB) findABroker(ctx context.Context, f func(*kafka.Conn) error) (err error) {
606660
dialer := lib.dialer()
607661
var tried int
608662
for _, i := range rand.Perm(len(lib.brokers)) {
@@ -613,7 +667,7 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
613667
lib.tracer.Logf("[events] could not connect to broker %d (of %d) %s: %v", i+1, len(lib.brokers), broker, err)
614668
if tried == len(lib.brokers) {
615669
// last broker, give up
616-
return nil, errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
670+
return errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
617671
}
618672
continue
619673
}
@@ -623,27 +677,31 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
623677
err = errors.Errorf("event library close dialer (%s): %w", lib.brokers[0], err)
624678
}
625679
}()
626-
controller, err := conn.Controller()
627-
if err != nil {
628-
return nil, errors.Errorf("event library get controller from kafka connection: %w", err)
629-
}
630-
ips, err := net.LookupIP(controller.Host)
631-
if err != nil {
632-
return nil, errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
633-
}
634-
if len(ips) == 0 {
635-
return nil, errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
636-
}
637-
return &kafka.Client{
638-
Addr: &net.TCPAddr{
639-
IP: ips[0],
640-
Port: controller.Port,
641-
},
642-
Transport: lib.transport(),
643-
}, nil
680+
return f(conn)
681+
}
682+
return errors.Errorf("unexpected condition")
683+
}
684+
685+
func (lib *LibraryNoDB) getBrokers(ctx context.Context) ([]kafka.Broker, error) {
686+
var brokers []kafka.Broker
687+
err := lib.findABroker(ctx, func(conn *kafka.Conn) error {
688+
var err error
689+
brokers, err = conn.Brokers()
690+
return err
691+
})
692+
return brokers, err
693+
}
694+
695+
func (lib *LibraryNoDB) getABrokerID(ctx context.Context) (string, error) {
696+
brokers, err := lib.getBrokers(ctx)
697+
if err != nil {
698+
return "", errors.WithStack(err)
699+
}
700+
if len(brokers) == 0 {
701+
return "", errors.Errorf("get brokers request returned no brokers")
644702
}
645-
// should not be able to get here
646-
return nil, errors.Errorf("unexpected condition")
703+
broker := brokers[rand.Intn(len(brokers))]
704+
return strconv.Itoa(broker.ID), nil
647705
}
648706

649707
// getConsumerGroupCoordinator returns a client talking to the control group's

consume.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,12 @@ func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started cha
9191
lib.tracer.Logf("[events] Debug: consume startwait +%d for readers", len(lib.readers))
9292
}
9393
allStarted.Add(len(lib.readers))
94-
allDone.Add(len(lib.readers))
94+
allDone.Add(len(lib.readers)) // for each reader
9595
if debugConsumeStartup {
9696
lib.tracer.Logf("[events] Debug: allDone +%d (readers)", len(lib.readers))
9797
}
9898
if len(lib.broadcast.topics) > 0 {
99-
allDone.Add(1)
99+
allDone.Add(1) // for the consumer group consumer
100100
if debugConsumeStartup {
101101
lib.tracer.Logf("[events] Debug: consume startwait +1 for broadcast")
102102
lib.tracer.Logf("[events] Debug: allDone +1 broadcast")
@@ -145,7 +145,7 @@ func (lib *Library[ID, TX, DB]) startConsumingGroup(ctx context.Context, consume
145145
// startedSideEffects should be called only once the consumer is started
146146
startedSideEffects := once.New(func() {
147147
if isBroadcast {
148-
allDone.Add(1)
148+
allDone.Add(1) // for the heartbead sending
149149
if debugConsumeStartup {
150150
lib.tracer.Logf("[events] Debug: allDone +1 for broadcast heartbeat %s %s", consumerGroup, group.Describe())
151151
}
@@ -300,7 +300,7 @@ func (lib *Library[ID, TX, DB]) consume(ctx context.Context, consumerGroup consu
300300
if debugConsumeStartup {
301301
lib.tracer.Logf("[events] Debug: allDone +1 for process commits for %s %s", consumerGroup, group.Describe())
302302
}
303-
allDone.Add(1)
303+
allDone.Add(1) // for processCommits
304304
go lib.processCommits(commitsSoftCtx, ctx, consumerGroup, cg, reader, done, allDone)
305305
for {
306306
select {

consumer_group_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func TestBroadcastGroupRefresh(t *testing.T) {
108108
if os.Getenv("EVENTS_KAFKA_BROKERS") == "" {
109109
t.Skipf("%s requires kafka brokers", t.Name())
110110
}
111+
t.Log("starting broadcast refresh test")
111112
ntest.RunTest(ntest.BufferedLogger(t),
112113
CommonInjectors,
113114
nject.Provide("nodb", func() *NoDB {

eventnodb/shared_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var chain = nject.Sequence("nodb-injectors",
4848
)
4949

5050
func TestSharedEventNoDB(t *testing.T) {
51+
t.Log("starting no-database matrix tests")
5152
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
5253
chain,
5354
eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventnodb.NoDBTx, *eventnodb.NoDB](),

eventpg/shared_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func pgconn(db *sql.DB) *DBType {
8383
type LibraryType = events.Library[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]
8484

8585
func TestSharedEventPG(t *testing.T) {
86+
t.Log("starting Postgres matrix tests")
8687
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
8788
chain,
8889
eventtest.GenerateSharedTestMatrix[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),

events2/shared_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func s2conn(db *sql.DB) *DBType {
104104
type LibraryType = events.Library[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]
105105

106106
func TestSharedEventS2(t *testing.T) {
107+
t.Log("starting S2 martrix tests")
107108
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
108109
chain,
109110
eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),
@@ -112,6 +113,7 @@ func TestSharedEventS2(t *testing.T) {
112113

113114
func TestLockingLockOrDie(t *testing.T) {
114115
t.Parallel()
116+
t.Log("starting S2 locking test")
115117
ntest.RunTest(ntest.BufferedLogger(t), chain, func(
116118
t ntest.T,
117119
ctx context.Context,

eventtest/common.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package eventtest
44
import (
55
"context"
66
"os"
7+
"regexp"
78
"strings"
89
"time"
910

@@ -32,6 +33,9 @@ func KafkaBrokers(t T) Brokers {
3233
}
3334

3435
var CommonInjectors = nject.Sequence("common",
36+
nject.Required(func(t T) {
37+
t.Logf("starting %s", t.Name())
38+
}),
3539
nject.Provide("context", context.Background),
3640
nject.Required(nject.Provide("Report-results", func(inner func(), t T) {
3741
defer func() {
@@ -67,18 +71,26 @@ type AugmentAbstractDB[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX]
6771
eventmodels.CanAugment[ID, TX]
6872
}
6973

74+
var squashRE = regexp.MustCompile(`[^A-Z]+`)
75+
7076
func Name(t ntest.T) string {
71-
x := strings.Split(t.Name(), "/")
72-
return x[len(x)-1]
77+
n := t.Name()
78+
x := strings.LastIndexByte(n, '/')
79+
if x == -1 {
80+
return n
81+
}
82+
after := n[x+1:]
83+
before := n[:x]
84+
return squashRE.ReplaceAllString(before, "") + after
7385
}
7486

7587
type MyEvent struct {
7688
S string
7789
}
7890

7991
var (
80-
DeliveryTimeout = LongerOnCI(20*time.Second, 10*time.Minute, 2*time.Minute)
81-
StartupTimeout = LongerOnCI(65*time.Second, 5*time.Minute, 65*time.Second)
92+
DeliveryTimeout = LongerOnCI(60*time.Second, 10*time.Minute, 4*time.Minute)
93+
StartupTimeout = LongerOnCI(85*time.Second, 7*time.Minute, 125*time.Second)
8294
)
8395

8496
func IsNilDB[DB any](db DB) bool {
@@ -135,6 +147,7 @@ func GenerateSharedTestMatrix[
135147
"OrderedBlock2CG": nject.Provide("OB2", OrderedBlockTestTwoCG[ID, TX, DB]),
136148
"OrderedRetryLater1CG": nject.Provide("ORL1", OrderedRetryTestOncCG[ID, TX, DB]),
137149
"OrderedRetryLater2CG": nject.Provide("ORL2", OrderedRetryTestTwoCG[ID, TX, DB]),
150+
"OversizeSendTest": nject.Provide("OST", OversizeSendTest[ID, TX, DB]),
138151
"UnfilteredNotifier": nject.Provide("UN", EventUnfilteredNotifierTest[ID, TX, DB]),
139152
}
140153
}

eventtest/dead_letter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func DeadLetterTest[
9797
}
9898
var deliveryInfo deliveryInfoBlock
9999

100+
t.Logf("consumer group name: %s", Name(t)+"CG")
100101
consumerGroup := events.NewConsumerGroup(Name(t) + "CG")
101102

102103
var lock sync.Mutex

eventtest/happy_path.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,12 @@ func IdempotentDeliveryTest[
248248
}
249249

250250
t.Log("Register same consumer group across libraries")
251-
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
252-
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
251+
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
252+
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
253253

254254
t.Log("Register in different consumer groups")
255-
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
256-
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
255+
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
256+
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
257257

258258
t.Log("Start consumers and producers")
259259

@@ -409,8 +409,8 @@ func ExactlyOnceDeliveryTest[
409409
}
410410

411411
// Same consumer group across libraries
412-
lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
413-
lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
412+
lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
413+
lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
414414

415415
// Start consumers and producers
416416
produceDone, err := lib1.CatchUpProduce(ctx, time.Second*5, 64)
@@ -521,7 +521,7 @@ func CloudEventEncodingTest[
521521
topic := eventmodels.BindTopic[myEvent](Name(t) + "Topic")
522522
lib.SetTopicConfig(kafka.TopicConfig{Topic: topic.Topic()})
523523

524-
lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
524+
lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
525525
func(ctx context.Context, e eventmodels.Event[myEvent]) error {
526526
lock.Lock()
527527
defer lock.Unlock()
@@ -686,7 +686,7 @@ func CloudEventEncodingTest[
686686
assert.Equalf(t, id, meta.Subject, "subject msg%d %s", i+1, id)
687687
assert.Equalf(t, Name(t)+"Topic", meta.Type, "type msg%d %s", i+1, id)
688688
assert.Equalf(t, "1.0", meta.SpecVersion, "specVersion msg%d %s", i+1, id)
689-
assert.Equalf(t, Name(t)+"-idempotentA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
689+
assert.Equalf(t, Name(t)+"-idptnA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
690690
assert.Equalf(t, Name(t), meta.HandlerName, "handlerName msg%d %s", i+1, id)
691691
}
692692
}

0 commit comments

Comments
 (0)