Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 83 additions & 25 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"net"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -23,6 +24,7 @@ import (

"github.com/singlestore-labs/events/eventmodels"
"github.com/singlestore-labs/events/internal"
"github.com/singlestore-labs/events/internal/multi"
"github.com/singlestore-labs/events/internal/pwork"
)

Expand Down Expand Up @@ -94,6 +96,20 @@ type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventm
}

type LibraryNoDB struct {
// --- Size cap subsystem fields (producer-only, decoupled from topic creation) ---
// Global broker caps
sizeCapBrokerLock sync.Mutex // protects changes to sizeCapBrokerState
sizeCapBrokerState atomic.Int32
sizeCapBrokerLoadCtx *multi.Context
sizeCapBrokerReady chan struct{}
sizeCapDefaultAssumed int64 // anything smaller than this can be sent before knowing actual limits
sizeCapBrokerMessageMax atomic.Int64 // message.max.bytes (0 unknown)
sizeCapSocketRequestMax atomic.Int64 // socket.request.max.bytes (rarely limiting; 0 unknown)
sizeCapWork pwork.Work[string, string] // un-prefixed in APIs
sizeCapTopicLimits gwrap.SyncMap[string, sizeCapTopicLimit] // un-prefixed topic

// Per-topic limits map (separate from creatingTopic). Keys are topic names.

hasDB atomic.Bool
tracer eventmodels.Tracer
brokers []string
Expand Down Expand Up @@ -236,9 +252,13 @@ func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodel
doEnhance: true,
instanceID: instanceCount.Add(1),
topicsHaveBeenListed: make(chan struct{}),
sizeCapBrokerReady: make(chan struct{}),
sizeCapDefaultAssumed: 1_000_000,
sizeCapBrokerLoadCtx: multi.New(),
},
}
lib.configureTopicsPrework()
lib.configureSizeCapPrework()
return &lib
}

Expand Down Expand Up @@ -327,6 +347,14 @@ func (lib *Library[ID, TX, DB]) SkipNotifierSupport() {
lib.skipNotifier = true
}

// SetSizeCapLowerLimit overrides the default lower limit on sizes: any message under
// this size can be sent before actual limits are known.
//
// The default assumed limit is 1,000,000 bytes (set in New). Messages at or
// below this size may be produced before the broker's real
// message.max.bytes / socket.request.max.bytes values have been fetched.
//
// NOTE(review): the write is guarded by lib.lock rather than the dedicated
// sizeCapBrokerLock — presumably readers of sizeCapDefaultAssumed take
// lib.lock too; confirm against the size-cap subsystem's readers.
func (lib *Library[ID, TX, DB]) SetSizeCapLowerLimit(sizeCapDefaultAssumed int64) {
	lib.lock.Lock()
	defer lib.lock.Unlock()
	lib.sizeCapDefaultAssumed = sizeCapDefaultAssumed
}

// Configure sets up the Library so that it has the configuration it needs to run.
// The database connection is optional. Without it, certain features will always error:
//
Expand Down Expand Up @@ -379,14 +407,14 @@ func (lib *Library[ID, TX, DB]) start(str string, args ...any) error {
case isRunning:
return nil
}
if len(lib.brokers) == 0 || lib.brokers[0] == "" {
return errors.Errorf("no brokers configured")
}
lib.writer = kafka.NewWriter(kafka.WriterConfig{
Brokers: lib.brokers,
Dialer: lib.dialer(),
})
lib.ready.Store(isRunning)
if len(lib.brokers) == 0 || lib.brokers[0] == "" {
return errors.Errorf("no brokers configured")
}
return nil
}

Expand Down Expand Up @@ -650,6 +678,32 @@ func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer { return lib
// getController returns a client talking to the controlling broker. The
// controller is needed for certain requests, like creating a topic.
//
// It connects to any reachable broker (via findABroker), asks that broker
// who the current controller is, resolves the controller's hostname, and
// builds a kafka.Client addressed directly at the controller.
func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err error) {
	var c *kafka.Client
	err = lib.findABroker(ctx, func(conn *kafka.Conn) error {
		controller, err := conn.Controller()
		if err != nil {
			return errors.Errorf("event library get controller from kafka connection: %w", err)
		}
		ips, err := net.LookupIP(controller.Host)
		if err != nil {
			return errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
		}
		if len(ips) == 0 {
			return errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
		}
		// Only the first resolved address is used; if it is unreachable the
		// caller's request will fail rather than falling back to ips[1:].
		c = &kafka.Client{
			Addr: &net.TCPAddr{
				IP:   ips[0],
				Port: controller.Port,
			},
			Transport: lib.transport(),
		}
		return nil
	})
	return c, err
}

func (lib *LibraryNoDB) findABroker(ctx context.Context, f func(*kafka.Conn) error) (err error) {
dialer := lib.dialer()
var tried int
for _, i := range rand.Perm(len(lib.brokers)) {
Expand All @@ -660,37 +714,41 @@ func (lib *LibraryNoDB) getController(ctx context.Context) (_ *kafka.Client, err
lib.tracer.Logf("[events] could not connect to broker %d (of %d) %s: %v", i+1, len(lib.brokers), broker, err)
if tried == len(lib.brokers) {
// last broker, give up
return nil, errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
return errors.Errorf("event library dial kafka broker (%s): %w", broker, err)
}
continue
}
defer func() {
e := conn.Close()
if err == nil && e != nil {
err = errors.Errorf("event library close dialer (%s): %w", lib.brokers[0], err)
err = errors.Errorf("event library close dialer (%s): %w", broker, e)
}
}()
controller, err := conn.Controller()
if err != nil {
return nil, errors.Errorf("event library get controller from kafka connection: %w", err)
}
ips, err := net.LookupIP(controller.Host)
if err != nil {
return nil, errors.Errorf("event library lookup IP of controller (%s): %w", controller.Host, err)
}
if len(ips) == 0 {
return nil, errors.Errorf("event library lookup IP of controller (%s) got no addresses", controller.Host)
}
return &kafka.Client{
Addr: &net.TCPAddr{
IP: ips[0],
Port: controller.Port,
},
Transport: lib.transport(),
}, nil
return f(conn)
}
return errors.Errorf("unexpected condition")
}

// getBrokers returns the cluster's broker list as reported by whichever
// broker findABroker manages to reach.
func (lib *LibraryNoDB) getBrokers(ctx context.Context) ([]kafka.Broker, error) {
	var list []kafka.Broker
	fetch := func(conn *kafka.Conn) (err error) {
		list, err = conn.Brokers()
		return err
	}
	err := lib.findABroker(ctx, fetch)
	return list, err
}

// getABrokerID returns the ID of a randomly-chosen broker, rendered as a
// string. Returns an error if the broker list cannot be fetched or is empty.
func (lib *LibraryNoDB) getABrokerID(ctx context.Context) (string, error) {
	brokers, err := lib.getBrokers(ctx)
	if err != nil {
		return "", errors.WithStack(err)
	}
	if len(brokers) == 0 {
		return "", errors.Errorf("get brokers request returned no brokers")
	}
	// Any broker will do; pick one at random to spread load across the cluster.
	broker := brokers[rand.Intn(len(brokers))]
	return strconv.Itoa(broker.ID), nil
}

// getConsumerGroupCoordinator returns a client talking to the control group's
Expand Down
8 changes: 4 additions & 4 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started cha
lib.tracer.Logf("[events] Debug: consume startwait +%d for readers", len(lib.readers))
}
allStarted.Add(len(lib.readers))
allDone.Add(len(lib.readers))
allDone.Add(len(lib.readers)) // for each reader
if debugConsumeStartup {
lib.tracer.Logf("[events] Debug: allDone +%d (readers)", len(lib.readers))
}
if len(lib.broadcast.topics) > 0 {
allDone.Add(1)
allDone.Add(1) // for the consumer group consumer
if debugConsumeStartup {
lib.tracer.Logf("[events] Debug: consume startwait +1 for broadcast")
lib.tracer.Logf("[events] Debug: allDone +1 broadcast")
Expand Down Expand Up @@ -158,7 +158,7 @@ func (lib *Library[ID, TX, DB]) startConsumingGroup(ctx context.Context, consume
// startedSideEffects should be called only once the consumer is started
startedSideEffects := once.New(func() {
if isBroadcast {
allDone.Add(1)
allDone.Add(1) // for the heartbeat sending
if debugConsumeStartup {
lib.tracer.Logf("[events] Debug: allDone +1 for broadcast heartbeat %s %s", consumerGroup, group.Describe())
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func (lib *Library[ID, TX, DB]) consume(ctx context.Context, consumerGroup consu
if debugConsumeStartup {
lib.tracer.Logf("[events] Debug: allDone +1 for process commits for %s %s", cgWithPrefix, group.Describe())
}
allDone.Add(1)
allDone.Add(1) // for processCommits
go lib.processCommits(commitsSoftCtx, ctx, consumerGroup, cgWithPrefix, reader, done, allDone)
for {
select {
Expand Down
1 change: 1 addition & 0 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestBroadcastGroupRefresh(t *testing.T) {
if os.Getenv("EVENTS_KAFKA_BROKERS") == "" {
t.Skipf("%s requires kafka brokers", t.Name())
}
t.Log("starting broadcast refresh test")
ntest.RunTest(ntest.BufferedLogger(t),
CommonInjectors,
nject.Provide("nodb", func() *NoDB {
Expand Down
1 change: 1 addition & 0 deletions eventnodb/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var chain = nject.Sequence("nodb-injectors",
)

func TestSharedEventNoDB(t *testing.T) {
t.Log("starting no-database matrix tests")
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
chain,
eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventnodb.NoDBTx, *eventnodb.NoDB](),
Expand Down
1 change: 1 addition & 0 deletions eventpg/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func pgconn(db *sql.DB) *DBType {
type LibraryType = events.Library[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]

func TestSharedEventPG(t *testing.T) {
t.Log("starting Postgres matrix tests")
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
chain,
eventtest.GenerateSharedTestMatrix[eventmodels.StringEventID, *eventdb.ExampleBasicTX, *eventpg.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),
Expand Down
2 changes: 2 additions & 0 deletions events2/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func s2conn(db *sql.DB) *DBType {
type LibraryType = events.Library[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]]

func TestSharedEventS2(t *testing.T) {
t.Log("starting S2 matrix tests")
ntest.RunParallelMatrix(ntest.BufferedLogger(t),
chain,
eventtest.GenerateSharedTestMatrix[eventmodels.BinaryEventID, *eventdb.ExampleBasicTX, *events2.Connection[*eventdb.ExampleBasicTX, eventdb.ExampleBasicDB]](),
Expand All @@ -113,6 +114,7 @@ func TestSharedEventS2(t *testing.T) {

func TestLockingLockOrDie(t *testing.T) {
t.Parallel()
t.Log("starting S2 locking test")
ntest.RunTest(ntest.BufferedLogger(t), chain, func(
t ntest.T,
ctx context.Context,
Expand Down
21 changes: 17 additions & 4 deletions eventtest/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package eventtest
import (
"context"
"os"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -34,6 +35,9 @@ func KafkaBrokers(t T) Brokers {
}

var CommonInjectors = nject.Sequence("common",
nject.Required(func(t T) {
t.Logf("starting %s", t.Name())
}),
nject.Provide("context", context.Background),
nject.Required(nject.Provide("Report-results", func(inner func(), t T) {
defer func() {
Expand Down Expand Up @@ -72,18 +76,26 @@ type AugmentAbstractDB[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX]
eventmodels.CanAugment[ID, TX]
}

var squashRE = regexp.MustCompile(`[^A-Z]+`)

// Name returns a compact, per-test identifier derived from t.Name(): the
// final path element (after the last "/") is kept verbatim, while everything
// before it is squashed down to just its uppercase ASCII letters by squashRE.
// A name with no "/" is returned unchanged. This keeps derived names (topics,
// consumer groups) short enough for broker limits while staying unique.
func Name(t ntest.T) string {
	n := t.Name()
	x := strings.LastIndexByte(n, '/')
	if x == -1 {
		return n
	}
	after := n[x+1:]
	before := n[:x]
	return squashRE.ReplaceAllString(before, "") + after
}

type MyEvent struct {
S string
}

var (
DeliveryTimeout = LongerOnCI(20*time.Second, 10*time.Minute, 2*time.Minute)
StartupTimeout = LongerOnCI(65*time.Second, 5*time.Minute, 65*time.Second)
DeliveryTimeout = LongerOnCI(60*time.Second, 10*time.Minute, 4*time.Minute)
StartupTimeout = LongerOnCI(85*time.Second, 7*time.Minute, 125*time.Second)
)

func IsNilDB[DB any](db DB) bool {
Expand Down Expand Up @@ -140,6 +152,7 @@ func GenerateSharedTestMatrix[
"OrderedBlock2CG": nject.Provide("OB2", OrderedBlockTestTwoCG[ID, TX, DB]),
"OrderedRetryLater1CG": nject.Provide("ORL1", OrderedRetryTestOncCG[ID, TX, DB]),
"OrderedRetryLater2CG": nject.Provide("ORL2", OrderedRetryTestTwoCG[ID, TX, DB]),
"OversizeSendTest": nject.Provide("OST", OversizeSendTest[ID, TX, DB]),
"UnfilteredNotifier": nject.Provide("UN", EventUnfilteredNotifierTest[ID, TX, DB]),
}
}
1 change: 1 addition & 0 deletions eventtest/dead_letter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func DeadLetterTest[
}
var deliveryInfo deliveryInfoBlock

t.Logf("consumer group name: %s", Name(t)+"CG")
consumerGroup := events.NewConsumerGroup(Name(t) + "CG")

var lock sync.Mutex
Expand Down
16 changes: 8 additions & 8 deletions eventtest/happy_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ func IdempotentDeliveryTest[
}

t.Log("Register same consumer group across libraries")
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia1", topic.Handler(mkIdempotentHandler("Ia1")))
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, "Ia2", topic.Handler(mkIdempotentHandler("Ia2")))

t.Log("Register in different consumer groups")
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))
lib1.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnB"), eventmodels.OnFailureBlock, "Ib1", topic.Handler(mkIdempotentHandler("Ib1")))
lib2.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnC"), eventmodels.OnFailureBlock, "Ic1", topic.Handler(mkIdempotentHandler("Ic1")))

t.Log("Start consumers and producers")

Expand Down Expand Up @@ -422,8 +422,8 @@ func ExactlyOnceDeliveryTest[
}

// Same consumer group across libraries
lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-exactlyonce"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))
lib1.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO1", topic.HandlerTx(mkExactlyOnceHandler("EO1")))
lib2.ConsumeExactlyOnce(events.NewConsumerGroup(Name(t)+"-extly1"), eventmodels.OnFailureBlock, "EO2", topic.HandlerTx(mkExactlyOnceHandler("EO2")))

// Start consumers and producers
produceDone, err := lib1.CatchUpProduce(ctx, time.Second*5, 64)
Expand Down Expand Up @@ -537,7 +537,7 @@ func CloudEventEncodingTest[
topic := eventmodels.BindTopic[myEvent](Name(t) + "Topic")
lib.SetTopicConfig(kafka.TopicConfig{Topic: topic.Topic()})

lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idempotentA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
lib.ConsumeIdempotent(events.NewConsumerGroup(Name(t)+"-idptnA"), eventmodels.OnFailureBlock, Name(t), topic.Handler(
func(ctx context.Context, e eventmodels.Event[myEvent]) error {
lock.Lock()
defer lock.Unlock()
Expand Down Expand Up @@ -702,7 +702,7 @@ func CloudEventEncodingTest[
assert.Equalf(t, id, meta.Subject, "subject msg%d %s", i+1, id)
assert.Equalf(t, Name(t)+"Topic", meta.Type, "type msg%d %s", i+1, id)
assert.Equalf(t, "1.0", meta.SpecVersion, "specVersion msg%d %s", i+1, id)
assert.Equalf(t, Name(t)+"-idempotentA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
assert.Equalf(t, Name(t)+"-idptnA", meta.ConsumerGroup, "consumerGroup msg%d %s", i+1, id)
assert.Equalf(t, Name(t), meta.HandlerName, "handlerName msg%d %s", i+1, id)
}
}
Expand Down
Loading
Loading