88 "math/rand"
99 "net"
1010 "regexp"
11+ "strconv"
1112 "strings"
1213 "sync"
1314 "sync/atomic"
@@ -93,6 +94,22 @@ type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventm
9394}
9495
9596type LibraryNoDB struct {
97+ // --- Size cap subsystem fields (producer-only, decoupled from topic creation) ---
98+ // Global broker caps
99+ sizeCapBrokerState atomic.Int32 // 0=unstarted 1=loading 2=ready 3=failed
100+ sizeCapBrokerReady chan struct {}
101+ sizeCapBrokerMessageMax atomic.Int64 // message.max.bytes (0 unknown)
102+ sizeCapSocketRequestMax atomic.Int64 // socket.request.max.bytes (rarely limiting; 0 unknown)
103+ sizeCapBrokerErr atomic.Value // error or nil
104+ sizeCapDefaultAssumed int64 // anything smaller than this can be sent before knowing actual limits
105+
106+ // Precreated topics scan (enumeration of already seen topics; optional background)
107+ sizeCapPrecreatedState atomic.Int32 // 0=unstarted 1=running 2=done 3=failed
108+ sizeCapPrecreatedReady chan struct {}
109+
110+ // Per-topic limits map (separate from creatingTopic). Keys are topic names.
111+ sizeCapTopicLimits gwrap.SyncMap [string , * sizeCapTopicLimit ]
112+
96113 hasDB atomic.Bool
97114 tracer eventmodels.Tracer
98115 brokers []string
@@ -233,6 +250,8 @@ func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodel
233250 doEnhance : true ,
234251 instanceID : instanceCount .Add (1 ),
235252 topicsHaveBeenListed : make (chan struct {}),
253+ sizeCapBrokerReady : make (chan struct {}),
254+ sizeCapDefaultAssumed : 1_000_000 ,
236255 },
237256 }
238257}
@@ -286,6 +305,14 @@ func (lib *Library[ID, TX, DB]) SetLazyTxProduce(lazy bool) {
286305 lib .lazyProduce = lazy
287306}
288307
308+ // SetSizeCapLowerLimit overrides the default lower limit on sizes: any message under
309+ // this size can be sent before actual limits are known.
310+ func (lib * Library [ID , TX , DB ]) SetSizeCapLowerLimit (sizeCapDefaultAssumed int64 ) {
311+ lib .lock .Lock ()
312+ defer lib .lock .Unlock ()
313+ lib .sizeCapDefaultAssumed = sizeCapDefaultAssumed
314+ }
315+
289316// Configure sets up the Library so that it has the configuration it needs to run.
290317// The database connection is optional. Without it, certain features will always error:
291318//
@@ -603,6 +630,32 @@ func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer { return lib
603630// getController returns a client talking to the controlling broker. The
604631// controller is needed for certain requests, like creating a topic
605632func (lib * LibraryNoDB ) getController (ctx context.Context ) (_ * kafka.Client , err error ) {
633+ var c * kafka.Client
634+ err = lib .findABroker (ctx , func (conn * kafka.Conn ) error {
635+ controller , err := conn .Controller ()
636+ if err != nil {
637+ return errors .Errorf ("event library get controller from kafka connection: %w" , err )
638+ }
639+ ips , err := net .LookupIP (controller .Host )
640+ if err != nil {
641+ return errors .Errorf ("event library lookup IP of controller (%s): %w" , controller .Host , err )
642+ }
643+ if len (ips ) == 0 {
644+ return errors .Errorf ("event library lookup IP of controller (%s) got no addresses" , controller .Host )
645+ }
646+ c = & kafka.Client {
647+ Addr : & net.TCPAddr {
648+ IP : ips [0 ],
649+ Port : controller .Port ,
650+ },
651+ Transport : lib .transport (),
652+ }
653+ return nil
654+ })
655+ return c , err
656+ }
657+
658+ func (lib * LibraryNoDB ) findABroker (ctx context.Context , f func (* kafka.Conn ) error ) (err error ) {
606659 dialer := lib .dialer ()
607660 var tried int
608661 for _ , i := range rand .Perm (len (lib .brokers )) {
@@ -613,7 +666,7 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
613666 lib .tracer .Logf ("[events] could not connect to broker %d (of %d) %s: %v" , i + 1 , len (lib .brokers ), broker , err )
614667 if tried == len (lib .brokers ) {
615668 // last broker, give up
616- return nil , errors .Errorf ("event library dial kafka broker (%s): %w" , broker , err )
669+ return errors .Errorf ("event library dial kafka broker (%s): %w" , broker , err )
617670 }
618671 continue
619672 }
@@ -623,27 +676,31 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
623676 err = errors .Errorf ("event library close dialer (%s): %w" , lib .brokers [0 ], err )
624677 }
625678 }()
626- controller , err := conn .Controller ()
627- if err != nil {
628- return nil , errors .Errorf ("event library get controller from kafka connection: %w" , err )
629- }
630- ips , err := net .LookupIP (controller .Host )
631- if err != nil {
632- return nil , errors .Errorf ("event library lookup IP of controller (%s): %w" , controller .Host , err )
633- }
634- if len (ips ) == 0 {
635- return nil , errors .Errorf ("event library lookup IP of controller (%s) got no addresses" , controller .Host )
636- }
637- return & kafka.Client {
638- Addr : & net.TCPAddr {
639- IP : ips [0 ],
640- Port : controller .Port ,
641- },
642- Transport : lib .transport (),
643- }, nil
679+ return f (conn )
680+ }
681+ return errors .Errorf ("unexpected condition" )
682+ }
683+
684+ func (lib * LibraryNoDB ) getBrokers (ctx context.Context ) ([]kafka.Broker , error ) {
685+ var brokers []kafka.Broker
686+ err := lib .findABroker (ctx , func (conn * kafka.Conn ) error {
687+ var err error
688+ brokers , err = conn .Brokers ()
689+ return err
690+ })
691+ return brokers , err
692+ }
693+
694+ func (lib * LibraryNoDB ) getABrokerID (ctx context.Context ) (string , error ) {
695+ brokers , err := lib .getBrokers (ctx )
696+ if err != nil {
697+ return "" , errors .WithStack (err )
698+ }
699+ if len (brokers ) == 0 {
700+ return "" , errors .Errorf ("get brokers request returned no brokers" )
644701 }
645- // should not be able to get here
646- return nil , errors . Errorf ( "unexpected condition" )
702+ broker := brokers [ rand . Intn ( len ( brokers ))]
703+ return strconv . Itoa ( broker . ID ), nil
647704}
648705
649706// getConsumerGroupCoordinator returns a client talking to the control group's
0 commit comments