Skip to content

Commit 7fd3672

Browse files
committed
topic post limits
1 parent b32e53e commit 7fd3672

File tree

19 files changed

+1146
-168
lines changed

19 files changed

+1146
-168
lines changed

connection.go

Lines changed: 83 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"math/rand"
99
"net"
1010
"regexp"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"sync/atomic"
@@ -23,6 +24,7 @@ import (
2324

2425
"github.com/singlestore-labs/events/eventmodels"
2526
"github.com/singlestore-labs/events/internal"
27+
"github.com/singlestore-labs/events/internal/multi"
2628
"github.com/singlestore-labs/events/internal/pwork"
2729
)
2830

@@ -94,6 +96,20 @@ type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventm
9496
}
9597

9698
type LibraryNoDB struct {
99+
// --- Size cap subsystem fields (producer-only, decoupled from topic creation) ---
100+
// Global broker caps
101+
sizeCapBrokerLock sync.Mutex // protects changes to sizeCapBrokerState
102+
sizeCapBrokerState atomic.Int32
103+
sizeCapBrokerLoadCtx *multi.Context
104+
sizeCapBrokerReady chan struct{}
105+
sizeCapDefaultAssumed int64 // anything smaller than this can be sent before knowing actual limits
106+
sizeCapBrokerMessageMax atomic.Int64 // message.max.bytes (0 unknown)
107+
sizeCapSocketRequestMax atomic.Int64 // socket.request.max.bytes (rarely limiting; 0 unknown)
108+
sizeCapWork pwork.Work[string, string] // un-prefixed in APIs
109+
sizeCapTopicLimits gwrap.SyncMap[string, sizeCapTopicLimit] // un-prefixed topic
110+
111+
// Per-topic limits map (separate from creatingTopic). Keys are topic names.
112+
97113
hasDB atomic.Bool
98114
tracer eventmodels.Tracer
99115
brokers []string
@@ -236,9 +252,13 @@ func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodel
236252
doEnhance: true,
237253
instanceID: instanceCount.Add(1),
238254
topicsHaveBeenListed: make(chan struct{}),
255+
sizeCapBrokerReady: make(chan struct{}),
256+
sizeCapDefaultAssumed: 1_000_000,
257+
sizeCapBrokerLoadCtx: multi.New(),
239258
},
240259
}
241260
lib.configureTopicsPrework()
261+
lib.configureSizeCapPrework()
242262
return &lib
243263
}
244264

@@ -327,6 +347,14 @@ func (lib *Library[ID, TX, DB]) SkipNotifierSupport() {
327347
lib.skipNotifier = true
328348
}
329349

350+
// SetSizeCapLowerLimit overrides the default lower limit on sizes: any message under
351+
// this size can be sent before actual limits are known.
352+
func (lib *Library[ID, TX, DB]) SetSizeCapLowerLimit(sizeCapDefaultAssumed int64) {
353+
lib.lock.Lock()
354+
defer lib.lock.Unlock()
355+
lib.sizeCapDefaultAssumed = sizeCapDefaultAssumed
356+
}
357+
330358
// Configure sets up the Library so that it has the configuration it needs to run.
331359
// The database connection is optional. Without it, certain features will always error:
332360
//
@@ -379,14 +407,14 @@ func (lib *Library[ID, TX, DB]) start(str string, args ...any) error {
379407
case isRunning:
380408
return nil
381409
}
410+
if len(lib.brokers) == 0 || lib.brokers[0] == "" {
411+
return errors.Errorf("no brokers configured")
412+
}
382413
lib.writer = kafka.NewWriter(kafka.WriterConfig{
383414
Brokers: lib.brokers,
384415
Dialer: lib.dialer(),
385416
})
386417
lib.ready.Store(isRunning)
387-
if len(lib.brokers) == 0 || lib.brokers[0] == "" {
388-
return errors.Errorf("no brokers configured")
389-
}
390418
return nil
391419
}
392420

@@ -650,6 +678,32 @@ func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer { return lib
650678
// getController returns a client talking to the controlling broker. The
651679
// controller is needed for certain requests, like creating a topic
652680
func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err error) {
681+
var c *kafka.Client
682+
err = lib.findABroker(ctx, func(conn *kafka.Conn) error {
683+
controller, err := conn.Controller()
684+
if err != nil {
685+
return errors.Errorf("event library get controller from kafka connection: %w", err)
686+
}
687+
ips, err := net.LookupIP(controller.Host)
688+
if err != nil {
689+
return errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
690+
}
691+
if len(ips) == 0 {
692+
return errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
693+
}
694+
c = &kafka.Client{
695+
Addr: &net.TCPAddr{
696+
IP: ips[0],
697+
Port: controller.Port,
698+
},
699+
Transport: lib.transport(),
700+
}
701+
return nil
702+
})
703+
return c, err
704+
}
705+
706+
func (lib *LibraryNoDB) findABroker(ctx context.Context, f func(*kafka.Conn) error) (err error) {
653707
dialer := lib.dialer()
654708
var tried int
655709
for _, i := range rand.Perm(len(lib.brokers)) {
@@ -660,37 +714,41 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
660714
lib.tracer.Logf("[events] could not connect to broker %d (of %d) %s: %v", i+1, len(lib.brokers), broker, err)
661715
if tried == len(lib.brokers) {
662716
// last broker, give up
663-
return nil, errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
717+
return errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
664718
}
665719
continue
666720
}
667721
defer func() {
668722
e := conn.Close()
669723
if err == nil && e != nil {
670-
err = errors.Errorf("event library close dialer (%s): %w", lib.brokers[0], err)
724+
err = errors.Errorf("event library close dialer (%s): %w", broker, e)
671725
}
672726
}()
673-
controller, err := conn.Controller()
674-
if err != nil {
675-
return nil, errors.Errorf("event library get controller from kafka connection: %w", err)
676-
}
677-
ips, err := net.LookupIP(controller.Host)
678-
if err != nil {
679-
return nil, errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
680-
}
681-
if len(ips) == 0 {
682-
return nil, errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
683-
}
684-
return &kafka.Client{
685-
Addr: &net.TCPAddr{
686-
IP: ips[0],
687-
Port: controller.Port,
688-
},
689-
Transport: lib.transport(),
690-
}, nil
727+
return f(conn)
728+
}
729+
return errors.Errorf("unexpected condition")
730+
}
731+
732+
func (lib *LibraryNoDB) getBrokers(ctx context.Context) ([]kafka.Broker, error) {
733+
var brokers []kafka.Broker
734+
err := lib.findABroker(ctx, func(conn *kafka.Conn) error {
735+
var err error
736+
brokers, err = conn.Brokers()
737+
return err
738+
})
739+
return brokers, err
740+
}
741+
742+
func (lib *LibraryNoDB) getABrokerID(ctx context.Context) (string, error) {
743+
brokers, err := lib.getBrokers(ctx)
744+
if err != nil {
745+
return "", errors.WithStack(err)
746+
}
747+
if len(brokers) == 0 {
748+
return "", errors.Errorf("get brokers request returned no brokers")
691749
}
692-
// should not be able to get here
693-
return nil, errors.Errorf("unexpected condition")
750+
broker := brokers[rand.Intn(len(brokers))]
751+
return strconv.Itoa(broker.ID), nil
694752
}
695753

696754
// getConsumerGroupCoordinator returns a client talking to the control group's

consume.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started cha
104104
lib.tracer.Logf("[events] Debug: consume startwait +%d for readers", len(lib.readers))
105105
}
106106
allStarted.Add(len(lib.readers))
107-
allDone.Add(len(lib.readers))
107+
allDone.Add(len(lib.readers)) // for each reader
108108
if debugConsumeStartup {
109109
lib.tracer.Logf("[events] Debug: allDone +%d (readers)", len(lib.readers))
110110
}
111111
if len(lib.broadcast.topics) > 0 {
112-
allDone.Add(1)
112+
allDone.Add(1) // for the consumer group consumer
113113
if debugConsumeStartup {
114114
lib.tracer.Logf("[events] Debug: consume startwait +1 for broadcast")
115115
lib.tracer.Logf("[events] Debug: allDone +1 broadcast")
@@ -158,7 +158,7 @@ func (lib *Library[ID, TX, DB]) startConsumingGroup(ctx context.Context, consume
158158
// startedSideEffects should be called only once the consumer is started
159159
startedSideEffects := once.New(func() {
160160
if isBroadcast {
161-
allDone.Add(1)
161+
allDone.Add(1) // for the heartbeat sending
162162
if debugConsumeStartup {
163163
lib.tracer.Logf("[events] Debug: allDone +1 for broadcast heartbeat %s %s", consumerGroup, group.Describe())
164164
}
@@ -314,7 +314,7 @@ func (lib *Library[ID, TX, DB]) consume(ctx context.Context, consumerGroup consu
314314
if debugConsumeStartup {
315315
lib.tracer.Logf("[events] Debug: allDone +1 for process commits for %s %s", cgWithPrefix, group.Describe())
316316
}
317-
allDone.Add(1)
317+
allDone.Add(1) // for processCommits
318318
go lib.processCommits(commitsSoftCtx, ctx, consumerGroup, cgWithPrefix, reader, done, allDone)
319319
for {
320320
select {

consumer_group_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func TestBroadcastGroupRefresh(t *testing.T) {
108108
if os.Getenv("EVENTS_KAFKA_BROKERS") == "" {
109109
t.Skipf("%s requires kafka brokers", t.Name())
110110
}
111+
t.Log("starting broadcast refresh test")
111112
ntest.RunTest(ntest.BufferedLogger(t),
112113
CommonInjectors,
113114
nject.Provide("nodb", func() *NoDB {

eventnodb/shared_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var chain = nject.Sequence("nodb-injectors",
4949
)
5050

5151
func TestSharedEventNoDB(t *testing.T) {
52+
t.Log("starting no-database matrix tests")
5253
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
5354
chain,
5455
eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventnodb.NoDBTx, *eventnodb.NoDB](),

eventpg/shared_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func pgconn(db *sql.DB) *DBType {
8484
type LibraryType = events.Library[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]
8585

8686
func TestSharedEventPG(t *testing.T) {
87+
t.Log("starting Postgres matrix tests")
8788
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
8889
chain,
8990
eventtest.GenerateSharedTestMatrix[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),

events2/shared_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ func s2conn(db *sql.DB) *DBType {
105105
type LibraryType = events.Library[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]
106106

107107
func TestSharedEventS2(t *testing.T) {
108+
t.Log("starting S2 matrix tests")
108109
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
109110
chain,
110111
eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),
@@ -113,6 +114,7 @@ func TestSharedEventS2(t *testing.T) {
113114

114115
func TestLockingLockOrDie(t *testing.T) {
115116
t.Parallel()
117+
t.Log("starting S2 locking test")
116118
ntest.RunTest(ntest.BufferedLogger(t), chain, func(
117119
t ntest.T,
118120
ctx context.Context,

eventtest/common.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package eventtest
44
import (
55
"context"
66
"os"
7+
"regexp"
78
"strings"
89
"time"
910

@@ -34,6 +35,9 @@ func KafkaBrokers(t T) Brokers {
3435
}
3536

3637
var CommonInjectors = nject.Sequence("common",
38+
nject.Required(func(t T) {
39+
t.Logf("starting %s", t.Name())
40+
}),
3741
nject.Provide("context", context.Background),
3842
nject.Required(nject.Provide("Report-results", func(inner func(), t T) {
3943
defer func() {
@@ -72,18 +76,26 @@ type AugmentAbstractDB[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX]
7276
eventmodels.CanAugment[ID, TX]
7377
}
7478

79+
var squashRE = regexp.MustCompile(`[^A-Z]+`)
80+
7581
func Name(t ntest.T) string {
76-
x := strings.Split(t.Name(), "/")
77-
return x[len(x)-1]
82+
n := t.Name()
83+
x := strings.LastIndexByte(n, '/')
84+
if x == -1 {
85+
return n
86+
}
87+
after := n[x+1:]
88+
before := n[:x]
89+
return squashRE.ReplaceAllString(before, "") + after
7890
}
7991

8092
type MyEvent struct {
8193
S string
8294
}
8395

8496
var (
85-
DeliveryTimeout = LongerOnCI(20*time.Second, 10*time.Minute, 2*time.Minute)
86-
StartupTimeout = LongerOnCI(65*time.Second, 5*time.Minute, 65*time.Second)
97+
DeliveryTimeout = LongerOnCI(60*time.Second, 10*time.Minute, 4*time.Minute)
98+
StartupTimeout = LongerOnCI(85*time.Second, 7*time.Minute, 125*time.Second)
8799
)
88100

89101
func IsNilDB[DB any](db DB) bool {
@@ -140,6 +152,7 @@ func GenerateSharedTestMatrix[
140152
"OrderedBlock2CG": nject.Provide("OB2", OrderedBlockTestTwoCG[ID, TX, DB]),
141153
"OrderedRetryLater1CG": nject.Provide("ORL1", OrderedRetryTestOncCG[ID, TX, DB]),
142154
"OrderedRetryLater2CG": nject.Provide("ORL2", OrderedRetryTestTwoCG[ID, TX, DB]),
155+
"OversizeSendTest": nject.Provide("OST", OversizeSendTest[ID, TX, DB]),
143156
"UnfilteredNotifier": nject.Provide("UN", EventUnfilteredNotifierTest[ID, TX, DB]),
144157
}
145158
}

eventtest/dead_letter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func DeadLetterTest[
103103
}
104104
var deliveryInfo deliveryInfoBlock
105105

106+
t.Logf("consumer group name: %s", Name(t)+"CG")
106107
consumerGroup := events.NewConsumerGroup(Name(t) + "CG")
107108

108109
var lock sync.Mutex

eventtest/happy_path.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,12 @@ func IdempotentDeliveryTest[
256256
}
257257

258258
t.Log("Register same consumer group across libraries")
259-
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
260-
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
259+
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
260+
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
261261

262262
t.Log("Register in different consumer groups")
263-
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
264-
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
263+
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
264+
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
265265

266266
t.Log("Start consumers and producers")
267267

@@ -422,8 +422,8 @@ func ExactlyOnceDeliveryTest[
422422
}
423423

424424
// Same consumer group across libraries
425-
lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
426-
lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
425+
lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
426+
lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
427427

428428
// Start consumers and producers
429429
produceDone, err := lib1.CatchUpProduce(ctx, time.Second*5, 64)
@@ -537,7 +537,7 @@ func CloudEventEncodingTest[
537537
topic := eventmodels.BindTopic[myEvent](Name(t) + "Topic")
538538
lib.SetTopicConfig(kafka.TopicConfig{Topic: topic.Topic()})
539539

540-
lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
540+
lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
541541
func(ctx context.Context, e eventmodels.Event[myEvent]) error {
542542
lock.Lock()
543543
defer lock.Unlock()
@@ -702,7 +702,7 @@ func CloudEventEncodingTest[
702702
assert.Equalf(t, id, meta.Subject, "subject msg%d %s", i+1, id)
703703
assert.Equalf(t, Name(t)+"Topic", meta.Type, "type msg%d %s", i+1, id)
704704
assert.Equalf(t, "1.0", meta.SpecVersion, "specVersion msg%d %s", i+1, id)
705-
assert.Equalf(t, Name(t)+"-idempotentA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
705+
assert.Equalf(t, Name(t)+"-idptnA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
706706
assert.Equalf(t, Name(t), meta.HandlerName, "handlerName msg%d %s", i+1, id)
707707
}
708708
}

0 commit comments

Comments
 (0)