88 "math/rand"
99 "net"
1010 "regexp"
11+ "strconv"
1112 "strings"
1213 "sync"
1314 "sync/atomic"
@@ -23,6 +24,7 @@ import (
2324
2425 "github.com/singlestore-labs/events/eventmodels"
2526 "github.com/singlestore-labs/events/internal"
27+ "github.com/singlestore-labs/events/internal/multi"
2628 "github.com/singlestore-labs/events/internal/pwork"
2729)
2830
@@ -94,6 +96,21 @@ type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventm
9496}
9597
9698type LibraryNoDB struct {
99+ // --- Size cap subsystem fields (producer-only, decoupled from topic creation) ---
100+ // Global broker caps
101+ sizeCapBrokerState atomic.Int32
102+ sizeCapBrokerLock sync.Mutex // protects changes to sizeCapBrokerState
103+ sizeCapBrokerLoadCtx * multi.Context
104+ sizeCapBrokerReady chan struct {}
105+ sizeCapDefaultAssumed int64 // anything smaller than this can be sent before knowing actual limits
106+ sizeCapBrokerMessageMax atomic.Int64 // message.max.bytes (0 unknown)
107+ sizeCapSocketRequestMax atomic.Int64 // socket.request.max.bytes (rarely limiting; 0 unknown)
108+ sizeCapBrokerErr atomic.Value // error or nil
109+ sizeCapWork pwork.Work [string , string ] // un-prefixed in APIs
110+ sizeCapTopicLimits gwrap.SyncMap [string , sizeCapTopicLimit ] // un-prefixed topic
111+
112+ // Per-topic limits map (separate from creatingTopic). Keys are topic names.
113+
97114 hasDB atomic.Bool
98115 tracer eventmodels.Tracer
99116 brokers []string
@@ -236,9 +253,13 @@ func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodel
236253 doEnhance : true ,
237254 instanceID : instanceCount .Add (1 ),
238255 topicsHaveBeenListed : make (chan struct {}),
256+ sizeCapBrokerReady : make (chan struct {}),
257+ sizeCapDefaultAssumed : 1_000_000 ,
258+ sizeCapBrokerLoadCtx : multi .New (),
239259 },
240260 }
241261 lib .configureTopicsPrework ()
262+ lib .configureSizeCapPrework ()
242263 return & lib
243264}
244265
@@ -311,6 +332,14 @@ func (lib *Library[ID, TX, DB]) SkipNotifierSupport() {
311332 lib .skipNotifier = true
312333}
313334
335+ // SetSizeCapLowerLimit overrides the default lower limit on sizes: any message under
336+ // this size can be sent before actual limits are known.
337+ func (lib * Library [ID , TX , DB ]) SetSizeCapLowerLimit (sizeCapDefaultAssumed int64 ) {
338+ lib .lock .Lock ()
339+ defer lib .lock .Unlock ()
340+ lib .sizeCapDefaultAssumed = sizeCapDefaultAssumed
341+ }
342+
314343// Configure sets up the Library so that it has the configuration it needs to run.
315344// The database connection is optional. Without it, certain features will always error:
316345//
@@ -630,6 +659,32 @@ func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer { return lib
630659// getController returns a client talking to the controlling broker. The
631660// controller is needed for certain requests, like creating a topic
632661func (lib * LibraryNoDB ) getController (ctx context.Context ) (_ * kafka.Client , err error ) {
662+ var c * kafka.Client
663+ err = lib .findABroker (ctx , func (conn * kafka.Conn ) error {
664+ controller , err := conn .Controller ()
665+ if err != nil {
666+ return errors .Errorf ("event library get controller from kafka connection: %w" , err )
667+ }
668+ ips , err := net .LookupIP (controller .Host )
669+ if err != nil {
670+ return errors .Errorf ("event library lookup IP of controller (%s): %w" , controller .Host , err )
671+ }
672+ if len (ips ) == 0 {
673+ return errors .Errorf ("event library lookup IP of controller (%s) got no addresses" , controller .Host )
674+ }
675+ c = & kafka.Client {
676+ Addr : & net.TCPAddr {
677+ IP : ips [0 ],
678+ Port : controller .Port ,
679+ },
680+ Transport : lib .transport (),
681+ }
682+ return nil
683+ })
684+ return c , err
685+ }
686+
687+ func (lib * LibraryNoDB ) findABroker (ctx context.Context , f func (* kafka.Conn ) error ) (err error ) {
633688 dialer := lib .dialer ()
634689 var tried int
635690 for _ , i := range rand .Perm (len (lib .brokers )) {
@@ -640,7 +695,7 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
640695 lib .tracer .Logf ("[events] could not connect to broker %d (of %d) %s: %v" , i + 1 , len (lib .brokers ), broker , err )
641696 if tried == len (lib .brokers ) {
642697 // last broker, give up
643- return nil , errors .Errorf ("event library dial kafka broker (%s): %w" , broker , err )
698+ return errors .Errorf ("event library dial kafka broker (%s): %w" , broker , err )
644699 }
645700 continue
646701 }
@@ -650,27 +705,31 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
650705 err = errors .Errorf ("event library close dialer (%s): %w" , lib .brokers [0 ], err )
651706 }
652707 }()
653- controller , err := conn .Controller ()
654- if err != nil {
655- return nil , errors .Errorf ("event library get controller from kafka connection: %w" , err )
656- }
657- ips , err := net .LookupIP (controller .Host )
658- if err != nil {
659- return nil , errors .Errorf ("event library lookup IP of controller (%s): %w" , controller .Host , err )
660- }
661- if len (ips ) == 0 {
662- return nil , errors .Errorf ("event library lookup IP of controller (%s) got no addresses" , controller .Host )
663- }
664- return & kafka.Client {
665- Addr : & net.TCPAddr {
666- IP : ips [0 ],
667- Port : controller .Port ,
668- },
669- Transport : lib .transport (),
670- }, nil
708+ return f (conn )
709+ }
710+ return errors .Errorf ("unexpected condition" )
711+ }
712+
713+ func (lib * LibraryNoDB ) getBrokers (ctx context.Context ) ([]kafka.Broker , error ) {
714+ var brokers []kafka.Broker
715+ err := lib .findABroker (ctx , func (conn * kafka.Conn ) error {
716+ var err error
717+ brokers , err = conn .Brokers ()
718+ return err
719+ })
720+ return brokers , err
721+ }
722+
723+ func (lib * LibraryNoDB ) getABrokerID (ctx context.Context ) (string , error ) {
724+ brokers , err := lib .getBrokers (ctx )
725+ if err != nil {
726+ return "" , errors .WithStack (err )
727+ }
728+ if len (brokers ) == 0 {
729+ return "" , errors .Errorf ("get brokers request returned no brokers" )
671730 }
672- // should not be able to get here
673- return nil , errors . Errorf ( "unexpected condition" )
731+ broker := brokers [ rand . Intn ( len ( brokers ))]
732+ return strconv . Itoa ( broker . ID ), nil
674733}
675734
676735// getConsumerGroupCoordinator returns a client talking to the control group's
0 commit comments