
Commit 2d57f96

topic post limits
1 parent 59b4cd3 commit 2d57f96

File tree

18 files changed: +938 -164 lines changed

connection.go

Lines changed: 79 additions & 21 deletions
@@ -8,6 +8,7 @@ import (
 	"math/rand"
 	"net"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -23,6 +24,7 @@ import (
 
 	"github.com/singlestore-labs/events/eventmodels"
 	"github.com/singlestore-labs/events/internal"
+	"github.com/singlestore-labs/events/internal/multi"
 	"github.com/singlestore-labs/events/internal/pwork"
 )
 
@@ -94,6 +96,20 @@ type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventm
 }
 
 type LibraryNoDB struct {
+	// --- Size cap subsystem fields (producer-only, decoupled from topic creation) ---
+	// Global broker caps
+	sizeCapBrokerState      atomic.Int32
+	sizeCapBrokerLock       sync.Mutex // protects changes to sizeCapBrokerState
+	sizeCapBrokerLoadCtx    *multi.Context
+	sizeCapBrokerReady      chan struct{}
+	sizeCapDefaultAssumed   int64        // anything smaller than this can be sent before knowing actual limits
+	sizeCapBrokerMessageMax atomic.Int64 // message.max.bytes (0 unknown)
+	sizeCapSocketRequestMax atomic.Int64 // socket.request.max.bytes (rarely limiting; 0 unknown)
+	sizeCapWork             pwork.Work[string, string]               // un-prefixed in APIs
+	sizeCapTopicLimits      gwrap.SyncMap[string, sizeCapTopicLimit] // un-prefixed topic
+
+	// Per-topic limits map (separate from creatingTopic). Keys are topic names.
+
 	hasDB   atomic.Bool
 	tracer  eventmodels.Tracer
 	brokers []string
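
These fields drive a producer-side size check that is decoupled from topic creation: messages below sizeCapDefaultAssumed may be sent before the broker caps have been fetched, while sizeCapBrokerMessageMax holds the real message.max.bytes once it is known. A minimal sketch of that decision follows; the helper is hypothetical and not part of this commit.

	// Hypothetical sketch only, not code from this commit: decide whether a payload
	// may be produced before the broker's real limits have been loaded.
	func canSendBeforeLimitsKnown(messageSize, brokerMessageMax, defaultAssumed int64) bool {
		if brokerMessageMax != 0 { // message.max.bytes already fetched from the broker
			return messageSize <= brokerMessageMax
		}
		return messageSize <= defaultAssumed // fall back to the assumed lower bound (1_000_000 by default)
	}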
@@ -236,9 +252,13 @@ func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodel
 			doEnhance:            true,
 			instanceID:           instanceCount.Add(1),
 			topicsHaveBeenListed: make(chan struct{}),
+			sizeCapBrokerReady:    make(chan struct{}),
+			sizeCapDefaultAssumed: 1_000_000,
+			sizeCapBrokerLoadCtx:  multi.New(),
 		},
 	}
 	lib.configureTopicsPrework()
+	lib.configureSizeCapPrework()
 	return &lib
 }
 
@@ -311,6 +331,14 @@ func (lib *Library[ID, TX, DB]) SkipNotifierSupport() {
 	lib.skipNotifier = true
 }
 
+// SetSizeCapLowerLimit overrides the default lower limit on sizes: any message under
+// this size can be sent before actual limits are known.
+func (lib *Library[ID, TX, DB]) SetSizeCapLowerLimit(sizeCapDefaultAssumed int64) {
+	lib.lock.Lock()
+	defer lib.lock.Unlock()
+	lib.sizeCapDefaultAssumed = sizeCapDefaultAssumed
+}
+
 // Configure sets up the Library so that it has the configuration it needs to run.
 // The database connection is optional. Without it, certain features will always error:
 //
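
Callers that know their brokers accept larger messages can raise the assumed floor before the real limits are loaded. A hedged usage fragment; the 5_000_000 figure is illustrative only, not taken from this commit.

	// Illustrative fragment: allow messages up to ~5 MB before broker limits are discovered.
	lib.SetSizeCapLowerLimit(5_000_000)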
@@ -630,6 +658,32 @@ func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer { return lib
 // getController returns a client talking to the controlling broker. The
 // controller is needed for certain requests, like creating a topic
 func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err error) {
+	var c *kafka.Client
+	err = lib.findABroker(ctx, func(conn *kafka.Conn) error {
+		controller, err := conn.Controller()
+		if err != nil {
+			return errors.Errorf("event library get controller from kafka connection: %w", err)
+		}
+		ips, err := net.LookupIP(controller.Host)
+		if err != nil {
+			return errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
+		}
+		if len(ips) == 0 {
+			return errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
+		}
+		c = &kafka.Client{
+			Addr: &net.TCPAddr{
+				IP:   ips[0],
+				Port: controller.Port,
+			},
+			Transport: lib.transport(),
+		}
+		return nil
+	})
+	return c, err
+}
+
+func (lib *LibraryNoDB) findABroker(ctx context.Context, f func(*kafka.Conn) error) (err error) {
 	dialer := lib.dialer()
 	var tried int
 	for _, i := range rand.Perm(len(lib.brokers)) {
@@ -640,7 +694,7 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
 			lib.tracer.Logf("[events] could not connect to broker %d (of %d) %s: %v", i+1, len(lib.brokers), broker, err)
 			if tried == len(lib.brokers) {
 				// last broker, give up
-				return nil, errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
+				return errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
 			}
 			continue
 		}
@@ -650,27 +704,31 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
 				err = errors.Errorf("event library close dialer (%s): %w", lib.brokers[0], err)
 			}
 		}()
-		controller, err := conn.Controller()
-		if err != nil {
-			return nil, errors.Errorf("event library get controller from kafka connection: %w", err)
-		}
-		ips, err := net.LookupIP(controller.Host)
-		if err != nil {
-			return nil, errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
-		}
-		if len(ips) == 0 {
-			return nil, errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
-		}
-		return &kafka.Client{
-			Addr: &net.TCPAddr{
-				IP:   ips[0],
-				Port: controller.Port,
-			},
-			Transport: lib.transport(),
-		}, nil
+		return f(conn)
+	}
+	return errors.Errorf("unexpected condition")
+}
+
+func (lib *LibraryNoDB) getBrokers(ctx context.Context) ([]kafka.Broker, error) {
+	var brokers []kafka.Broker
+	err := lib.findABroker(ctx, func(conn *kafka.Conn) error {
+		var err error
+		brokers, err = conn.Brokers()
+		return err
+	})
+	return brokers, err
+}
+
+func (lib *LibraryNoDB) getABrokerID(ctx context.Context) (string, error) {
+	brokers, err := lib.getBrokers(ctx)
+	if err != nil {
+		return "", errors.WithStack(err)
+	}
+	if len(brokers) == 0 {
+		return "", errors.Errorf("get brokers request returned no brokers")
 	}
-	// should not be able to get here
-	return nil, errors.Errorf("unexpected condition")
+	broker := brokers[rand.Intn(len(brokers))]
+	return strconv.Itoa(broker.ID), nil
 }
 
 // getConsumerGroupCoordinator returns a client talking to the control group's
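
The refactor above pulls the dial-and-retry loop out of getController into findABroker, which takes a callback and is then reused by getBrokers and getABrokerID. Any future caller could reuse the same pattern; the sketch below is hypothetical (getPartitions is not part of this commit, and it assumes kafka-go's Conn.ReadPartitions).

	// Hypothetical sketch reusing findABroker's callback pattern (not in this commit).
	func (lib *LibraryNoDB) getPartitions(ctx context.Context, topic string) ([]kafka.Partition, error) {
		var partitions []kafka.Partition
		err := lib.findABroker(ctx, func(conn *kafka.Conn) error {
			var err error
			partitions, err = conn.ReadPartitions(topic) // ask whichever broker answered
			return err
		})
		return partitions, err
	}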

consume.go

Lines changed: 4 additions & 4 deletions
@@ -104,12 +104,12 @@ func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started cha
 		lib.tracer.Logf("[events] Debug: consume startwait +%d for readers", len(lib.readers))
 	}
 	allStarted.Add(len(lib.readers))
-	allDone.Add(len(lib.readers))
+	allDone.Add(len(lib.readers)) // for each reader
 	if debugConsumeStartup {
 		lib.tracer.Logf("[events] Debug: allDone +%d (readers)", len(lib.readers))
 	}
 	if len(lib.broadcast.topics) > 0 {
-		allDone.Add(1)
+		allDone.Add(1) // for the consumer group consumer
 		if debugConsumeStartup {
 			lib.tracer.Logf("[events] Debug: consume startwait +1 for broadcast")
 			lib.tracer.Logf("[events] Debug: allDone +1 broadcast")
@@ -158,7 +158,7 @@ func (lib *Library[ID, TX, DB]) startConsumingGroup(ctx context.Context, consume
 	// startedSideEffects should be called only once the consumer is started
 	startedSideEffects := once.New(func() {
 		if isBroadcast {
-			allDone.Add(1)
+			allDone.Add(1) // for the heartbeat sending
 			if debugConsumeStartup {
 				lib.tracer.Logf("[events] Debug: allDone +1 for broadcast heartbeat %s %s", consumerGroup, group.Describe())
 			}
@@ -314,7 +314,7 @@ func (lib *Library[ID, TX, DB]) consume(ctx context.Context, consumerGroup consu
 	if debugConsumeStartup {
 		lib.tracer.Logf("[events] Debug: allDone +1 for process commits for %s %s", cgWithPrefix, group.Describe())
 	}
-	allDone.Add(1)
+	allDone.Add(1) // for processCommits
 	go lib.processCommits(commitsSoftCtx, ctx, consumerGroup, cgWithPrefix, reader, done, allDone)
 	for {
 		select {

consumer_group_test.go

Lines changed: 1 addition & 0 deletions
@@ -108,6 +108,7 @@ func TestBroadcastGroupRefresh(t *testing.T) {
 	if os.Getenv("EVENTS_KAFKA_BROKERS") == "" {
 		t.Skipf("%s requires kafka brokers", t.Name())
 	}
+	t.Log("starting broadcast refresh test")
 	ntest.RunTest(ntest.BufferedLogger(t),
 		CommonInjectors,
 		nject.Provide("nodb", func() *NoDB {

eventnodb/shared_test.go

Lines changed: 1 addition & 0 deletions
@@ -49,6 +49,7 @@ var chain = nject.Sequence("nodb-injectors",
 )
 
 func TestSharedEventNoDB(t *testing.T) {
+	t.Log("starting no-database matrix tests")
 	ntest.RunParallelMatrix(ntest.BufferedLogger(t),
 		chain,
 		eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventnodb.NoDBTx, *eventnodb.NoDB](),

eventpg/shared_test.go

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@ func pgconn(db *sql.DB) *DBType {
 type LibraryType = events.Library[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]
 
 func TestSharedEventPG(t *testing.T) {
+	t.Log("starting Postgres matrix tests")
 	ntest.RunParallelMatrix(ntest.BufferedLogger(t),
 		chain,
 		eventtest.GenerateSharedTestMatrix[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),

events2/shared_test.go

Lines changed: 2 additions & 0 deletions
@@ -105,6 +105,7 @@ func s2conn(db *sql.DB) *DBType {
 type LibraryType = events.Library[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]
 
 func TestSharedEventS2(t *testing.T) {
+	t.Log("starting S2 matrix tests")
 	ntest.RunParallelMatrix(ntest.BufferedLogger(t),
 		chain,
 		eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),
@@ -113,6 +114,7 @@ func TestSharedEventS2(t *testing.T) {
 
 func TestLockingLockOrDie(t *testing.T) {
 	t.Parallel()
+	t.Log("starting S2 locking test")
 	ntest.RunTest(ntest.BufferedLogger(t), chain, func(
 		t ntest.T,
 		ctx context.Context,

eventtest/common.go

Lines changed: 17 additions & 4 deletions
@@ -4,6 +4,7 @@ package eventtest
 import (
 	"context"
 	"os"
+	"regexp"
 	"strings"
 	"time"
 
@@ -34,6 +35,9 @@ func KafkaBrokers(t T) Brokers {
 }
 
 var CommonInjectors = nject.Sequence("common",
+	nject.Required(func(t T) {
+		t.Logf("starting %s", t.Name())
+	}),
 	nject.Provide("context", context.Background),
 	nject.Required(nject.Provide("Report-results", func(inner func(), t T) {
 		defer func() {
@@ -72,18 +76,26 @@ type AugmentAbstractDB[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX]
 	eventmodels.CanAugment[ID, TX]
 }
 
+var squashRE = regexp.MustCompile(`[^A-Z]+`)
+
 func Name(t ntest.T) string {
-	x := strings.Split(t.Name(), "/")
-	return x[len(x)-1]
+	n := t.Name()
+	x := strings.LastIndexByte(n, '/')
+	if x == -1 {
+		return n
+	}
+	after := n[x+1:]
+	before := n[:x]
+	return squashRE.ReplaceAllString(before, "") + after
 }
 
 type MyEvent struct {
 	S string
 }
 
 var (
-	DeliveryTimeout = LongerOnCI(20*time.Second, 10*time.Minute, 2*time.Minute)
-	StartupTimeout  = LongerOnCI(65*time.Second, 5*time.Minute, 65*time.Second)
+	DeliveryTimeout = LongerOnCI(60*time.Second, 10*time.Minute, 4*time.Minute)
+	StartupTimeout  = LongerOnCI(85*time.Second, 7*time.Minute, 125*time.Second)
 )
 
 func IsNilDB[DB any](db DB) bool {
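
The reworked Name keeps the last segment of the test path intact and squashes everything before the final slash down to its capital letters, which keeps derived topic and consumer-group names short. A standalone illustration of that behavior follows; the example test names are assumptions, not tests from this commit.

	// Mirrors the logic of eventtest.Name shown in the diff above (illustration only).
	package main

	import (
		"fmt"
		"regexp"
		"strings"
	)

	var squashRE = regexp.MustCompile(`[^A-Z]+`)

	func name(testName string) string {
		x := strings.LastIndexByte(testName, '/')
		if x == -1 {
			return testName
		}
		// Keep only the capitals of the parent path, then append the final segment unchanged.
		return squashRE.ReplaceAllString(testName[:x], "") + testName[x+1:]
	}

	func main() {
		fmt.Println(name("TestSharedEventPG"))                  // TestSharedEventPG (no slash, unchanged)
		fmt.Println(name("TestSharedEventPG/OversizeSendTest")) // TSEPGOversizeSendTest
	}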
@@ -140,6 +152,7 @@ func GenerateSharedTestMatrix[
 		"OrderedBlock2CG":      nject.Provide("OB2", OrderedBlockTestTwoCG[ID, TX, DB]),
 		"OrderedRetryLater1CG": nject.Provide("ORL1", OrderedRetryTestOncCG[ID, TX, DB]),
 		"OrderedRetryLater2CG": nject.Provide("ORL2", OrderedRetryTestTwoCG[ID, TX, DB]),
+		"OversizeSendTest":     nject.Provide("OST", OversizeSendTest[ID, TX, DB]),
 		"UnfilteredNotifier":   nject.Provide("UN", EventUnfilteredNotifierTest[ID, TX, DB]),
 	}
 }

eventtest/dead_letter.go

Lines changed: 1 addition & 0 deletions
@@ -103,6 +103,7 @@ func DeadLetterTest[
 	}
 	var deliveryInfo deliveryInfoBlock
 
+	t.Logf("consumer group name: %s", Name(t)+"CG")
 	consumerGroup := events.NewConsumerGroup(Name(t) + "CG")
 
 	var lock sync.Mutex

eventtest/happy_path.go

Lines changed: 8 additions & 8 deletions
@@ -256,12 +256,12 @@ func IdempotentDeliveryTest[
 	}
 
 	t.Log("Register same consumer group across libraries")
-	lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
-	lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
+	lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
+	lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
 
 	t.Log("Register in different consumer groups")
-	lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
-	lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
+	lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
+	lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
 
 	t.Log("Start consumers and producers")
 
@@ -422,8 +422,8 @@ func ExactlyOnceDeliveryTest[
 	}
 
 	// Same consumer group across libraries
-	lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
-	lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
+	lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
+	lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
 
 	// Start consumers and producers
 	produceDone, err := lib1.CatchUpProduce(ctx, time.Second*5, 64)
@@ -537,7 +537,7 @@ func CloudEventEncodingTest[
 	topic := eventmodels.BindTopic[myEvent](Name(t) + "Topic")
 	lib.SetTopicConfig(kafka.TopicConfig{Topic: topic.Topic()})
 
-	lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
+	lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
 		func(ctx context.Context, e eventmodels.Event[myEvent]) error {
 			lock.Lock()
 			defer lock.Unlock()
@@ -702,7 +702,7 @@ func CloudEventEncodingTest[
 			assert.Equalf(t, id, meta.Subject, "subject msg%d %s", i+1, id)
 			assert.Equalf(t, Name(t)+"Topic", meta.Type, "type msg%d %s", i+1, id)
 			assert.Equalf(t, "1.0", meta.SpecVersion, "specVersion msg%d %s", i+1, id)
-			assert.Equalf(t, Name(t)+"-idempotentA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
+			assert.Equalf(t, Name(t)+"-idptnA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
 			assert.Equalf(t, Name(t), meta.HandlerName, "handlerName msg%d %s", i+1, id)
 		}
 	}
