Skip to content

Commit 95b25fe

Browse files
authored
[fix] BREAKING CHANGE revert some API changes to db layer to make the producer optional (#115)
The previous set of breaking changes made the producer required in places where it previously was optional. That was a cleanup but it breaks some use cases. This change reverts some of those changes so that a tracer is redundantly required.
1 parent a417fc5 commit 95b25fe

File tree

3 files changed

+62
-36
lines changed

3 files changed

+62
-36
lines changed

eventdb/transaction.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/memsql/errors"
88

99
"github.com/singlestore-labs/events/eventmodels"
10-
"github.com/singlestore-labs/generic"
1110
)
1211

1312
// BasicTX is what's needed from a transaction in WrapTransaction
@@ -34,41 +33,44 @@ type ComboDB[ID eventmodels.AbstractID[ID], TX BasicTX] interface {
3433
eventmodels.AbstractDB[ID, TX]
3534
}
3635

37-
type SaveEventsFunc[ID eventmodels.AbstractID[ID], TX BasicTX] func(context.Context, TX, eventmodels.Producer[ID, TX], ...eventmodels.ProducingEvent) (map[string][]ID, error)
36+
// SaveEventsFunc is used to preserve events from a transaction.
37+
// It needs to work if the producer is nil. Tracer can be ignored
38+
// if producer is not nil.
39+
type SaveEventsFunc[ID eventmodels.AbstractID[ID], TX BasicTX] func(context.Context, eventmodels.Tracer, TX, eventmodels.Producer[ID, TX], ...eventmodels.ProducingEvent) (map[string][]ID, error)
3840

3941
// Transact implements a Transact method as needed by AbstractDB (in ComboDB).
4042
// It does not call itself recursively and insteads depends upon BeginTx
4143
// from BasicDB (in ComboDB).
44+
// backupTracer is only used if optProducer is nil.
4245
func Transact[ID eventmodels.AbstractID[ID], TX BasicTX, DB ComboDB[ID, TX]](
4346
ctx context.Context,
4447
db DB,
48+
backupTracer eventmodels.Tracer,
4549
f func(TX) error,
4650
saveEvents SaveEventsFunc[ID, TX],
47-
producer eventmodels.Producer[ID, TX],
51+
optProducer eventmodels.Producer[ID, TX],
4852
) error {
49-
ids, err := WrapTransaction[ID, TX, DB](ctx, db, f, saveEvents, producer)
53+
ids, err := WrapTransaction[ID, TX, DB](ctx, backupTracer, db, f, saveEvents, optProducer)
5054
if err != nil {
5155
return err
5256
}
53-
if len(ids) != 0 {
54-
if producer != nil {
55-
err = producer.ProduceFromTable(ctx, ids)
56-
} else {
57-
_, err = db.ProduceSpecificTxEvents(ctx, generic.CombineSlices(generic.Values(ids)...))
58-
}
57+
if len(ids) != 0 && optProducer != nil {
58+
err = optProducer.ProduceFromTable(ctx, ids)
5959
}
6060
return err
6161
}
6262

6363
// WrapTransaction is a building block that can be shared between database-sprecific
6464
// implementations. It handles the Begin/Rollback/Commit sequence and saving
6565
// events.
66+
// backupTracer is only used if optProducer is nil
6667
func WrapTransaction[ID eventmodels.AbstractID[ID], TX BasicTX, DB BasicDB[TX]](
6768
ctx context.Context,
69+
backupTracer eventmodels.Tracer,
6870
db DB,
6971
f func(TX) error,
7072
saveEvents SaveEventsFunc[ID, TX],
71-
producer eventmodels.Producer[ID, TX],
73+
optProducer eventmodels.Producer[ID, TX],
7274
) (map[string][]ID, error) {
7375
var ids map[string][]ID
7476
err := func() (err error) {
@@ -98,11 +100,11 @@ func WrapTransaction[ID eventmodels.AbstractID[ID], TX BasicTX, DB BasicDB[TX]](
98100
return err
99101
}
100102
if pending := tx.GetPendingEvents(); len(pending) != 0 {
101-
err := ValidateEventTopics(ctx, producer, pending...)
103+
err := ValidateEventTopics(ctx, optProducer, pending...)
102104
if err != nil {
103105
return err
104106
}
105-
ids, err = saveEvents(ctx, tx, producer, pending...)
107+
ids, err = saveEvents(ctx, backupTracer, tx, optProducer, pending...)
106108
if err != nil {
107109
return err
108110
}
@@ -112,12 +114,14 @@ func WrapTransaction[ID eventmodels.AbstractID[ID], TX BasicTX, DB BasicDB[TX]](
112114
return ids, err
113115
}
114116

117+
// ValidateEventTopics returns an error if the topics are not valid. It only
118+
// checks if a Producer is provided.
115119
func ValidateEventTopics[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX](
116120
ctx context.Context,
117-
producer eventmodels.Producer[ID, TX],
121+
optProducer eventmodels.Producer[ID, TX],
118122
events ...eventmodels.ProducingEvent,
119123
) error {
120-
if producer == nil {
124+
if optProducer == nil {
121125
return nil
122126
}
123127
topics := make([]string, 0, len(events))
@@ -129,5 +133,5 @@ func ValidateEventTopics[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractT
129133
topics = append(topics, topic)
130134
}
131135
}
132-
return producer.ValidateTopics(ctx, topics)
136+
return optProducer.ValidateTopics(ctx, topics)
133137
}

eventpg/produce.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"database/sql"
77
"encoding/json"
8+
"log"
89
"os"
910
"time"
1011

@@ -49,8 +50,16 @@ func (w *Connection[TX, DB]) AugmentWithProducer(producer eventmodels.Producer[e
4950
// The methods that Connection supports are all done with stubs so that the underlying implementations can be
5051
// reused by other implementations of AbstractDB.
5152

53+
// backupTracer only needs to create a tracer if we don't have a producer.
54+
func (c Connection[TX, DB]) backupTracer() eventmodels.Tracer {
55+
if c.producer == nil {
56+
return log.Printf
57+
}
58+
return nil
59+
}
60+
5261
func (c *Connection[TX, DB]) Transact(ctx context.Context, f func(TX) error) error {
53-
return eventdb.Transact[eventmodels.StringEventID, TX, *Connection[TX, DB]](ctx, c, f, SaveEventsInsideTx[TX], c.producer)
62+
return eventdb.Transact[eventmodels.StringEventID, TX, *Connection[TX, DB]](ctx, c, c.backupTracer(), f, SaveEventsInsideTx[TX], c.producer)
5463
}
5564

5665
func (c *Connection[TX, DB]) LockOrError(ctx context.Context, key uint32, timeout time.Duration) (unlock func() error, err error) {
@@ -70,7 +79,7 @@ func (c Connection[TX, DB]) MarkEventProcessed(ctx context.Context, tx TX, topic
7079
}
7180

7281
func (c Connection[TX, DB]) SaveEventsInsideTx(ctx context.Context, tx TX, events ...eventmodels.ProducingEvent) (map[string][]eventmodels.StringEventID, error) {
73-
return SaveEventsInsideTx[TX](ctx, tx, c.producer, events...)
82+
return SaveEventsInsideTx[TX](ctx, c.backupTracer(), tx, c.producer, events...)
7483
}
7584

7685
func LockOrError[TX eventmodels.AbstractTX, DB eventmodels.CanTransact[TX]](ctx context.Context, db DB, key uint32, timeout time.Duration) (unlock func() error, err error) {
@@ -273,15 +282,13 @@ func MarkEventProcessed[TX eventmodels.AbstractTX](ctx context.Context, tx TX, t
273282
}
274283

275284
// SaveEventsInsideTx is meant to be used inside a transaction to persist
276-
// events as part of that transaction.
277-
func SaveEventsInsideTx[TX eventmodels.AbstractTX](ctx context.Context, tx TX, producer eventmodels.Producer[eventmodels.StringEventID, TX], events ...eventmodels.ProducingEvent) (map[string][]eventmodels.StringEventID, error) {
285+
// events as part of that transaction. backupTracer is unused if producer is provided.
286+
// topics are unvalidated if optProducer is not provided.
287+
func SaveEventsInsideTx[TX eventmodels.AbstractTX](ctx context.Context, backupTracer eventmodels.Tracer, tx TX, optProducer eventmodels.Producer[eventmodels.StringEventID, TX], events ...eventmodels.ProducingEvent) (map[string][]eventmodels.StringEventID, error) {
278288
if len(events) == 0 {
279289
return nil, nil
280290
}
281-
if producer == nil {
282-
return nil, errors.Errorf("attempt to save events inside tx without a producer")
283-
}
284-
err := eventdb.ValidateEventTopics(ctx, producer, events...)
291+
err := eventdb.ValidateEventTopics(ctx, optProducer, events...)
285292
if err != nil {
286293
return nil, err
287294
}
@@ -305,14 +312,18 @@ func SaveEventsInsideTx[TX eventmodels.AbstractTX](ctx context.Context, tx TX, p
305312
}
306313
ib = ib.Values(id, i+1, topic, event.GetTimestamp().UTC(), event.GetKey(), enc, headersEnc)
307314
}
308-
producer.TracerProvider(ctx)("[events] saving %d events as part of a transaction, example topic: '%s'", len(events), events[0].GetTopic())
315+
tracer := backupTracer
316+
if optProducer != nil {
317+
tracer = optProducer.TracerProvider(ctx)
318+
}
319+
tracer("[events] saving %d events as part of a transaction, example topic: '%s'", len(events), events[0].GetTopic())
309320
sql, args, err := ib.PlaceholderFormat(sq.Dollar).ToSql()
310321
if err != nil {
311322
return nil, errors.WithStack(err)
312323
}
313324
_, err = tx.ExecContext(ctx, sql, args...)
314325
if err != nil {
315-
producer.TracerProvider(ctx)("[SaveEventsInsideTx] could not insert event: %s: %s", sql, err)
326+
tracer("[SaveEventsInsideTx] could not insert event: %s: %s", sql, err)
316327
return nil, errors.WithStack(err)
317328
}
318329
return ids, nil

events2/produce.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"database/sql"
77
"encoding/json"
88
"fmt"
9+
"log"
910
"os"
1011
"time"
1112

@@ -103,8 +104,16 @@ func (c *Connection[TX, DB]) AugmentWithProducer(producer eventmodels.Producer[e
103104
// The methods that Connection supports are all done with stubs so that the underlying implementations can be
104105
// reused by other implementations of AbstractDB.
105106

107+
// backupTracer only needs to create a tracer if we don't have a producer.
108+
func (c Connection[TX, DB]) backupTracer() eventmodels.Tracer {
109+
if c.producer == nil {
110+
return log.Printf
111+
}
112+
return nil
113+
}
114+
106115
func (c *Connection[TX, DB]) Transact(ctx context.Context, f func(TX) error) error {
107-
return eventdb.Transact[eventmodels.BinaryEventID, TX, *Connection[TX, DB]](ctx, c, f, SaveEventsInsideTx[TX], c.producer)
116+
return eventdb.Transact[eventmodels.BinaryEventID, TX, *Connection[TX, DB]](ctx, c, c.backupTracer(), f, SaveEventsInsideTx[TX], c.producer)
108117
}
109118

110119
func (c *Connection[TX, DB]) LockOrError(ctx context.Context, key uint32, timeout time.Duration) (unlock func() error, err error) {
@@ -124,7 +133,7 @@ func (c Connection[TX, DB]) MarkEventProcessed(ctx context.Context, tx TX, topic
124133
}
125134

126135
func (c Connection[TX, DB]) SaveEventsInsideTx(ctx context.Context, tx TX, events ...eventmodels.ProducingEvent) (map[string][]eventmodels.BinaryEventID, error) {
127-
return SaveEventsInsideTx[TX](ctx, tx, c.producer, events...)
136+
return SaveEventsInsideTx[TX](ctx, c.backupTracer(), tx, c.producer, events...)
128137
}
129138

130139
func LockOrError[TX eventmodels.AbstractTX, DB eventmodels.CanTransact[TX]](
@@ -354,15 +363,13 @@ func MarkEventProcessed[TX eventmodels.AbstractTX](ctx context.Context, tx TX, t
354363
}
355364

356365
// SaveEventsInsideTx is meant to be used inside a transaction to persist
357-
// events as part of that transaction.
358-
func SaveEventsInsideTx[TX eventmodels.AbstractTX](ctx context.Context, tx TX, producer eventmodels.Producer[eventmodels.BinaryEventID, TX], events ...eventmodels.ProducingEvent) (map[string][]eventmodels.BinaryEventID, error) {
366+
// events as part of that transaction. backupTracer is unused if producer is provided.
367+
// topics are unvalidated if a producer is not provided.
368+
func SaveEventsInsideTx[TX eventmodels.AbstractTX](ctx context.Context, backupTracer eventmodels.Tracer, tx TX, optProducer eventmodels.Producer[eventmodels.BinaryEventID, TX], events ...eventmodels.ProducingEvent) (map[string][]eventmodels.BinaryEventID, error) {
359369
if len(events) == 0 {
360370
return nil, nil
361371
}
362-
if producer == nil {
363-
return nil, errors.Errorf("attempt to save events inside tx without a producer")
364-
}
365-
err := eventdb.ValidateEventTopics(ctx, producer, events...)
372+
err := eventdb.ValidateEventTopics(ctx, optProducer, events...)
366373
if err != nil {
367374
return nil, err
368375
}
@@ -386,14 +393,18 @@ func SaveEventsInsideTx[TX eventmodels.AbstractTX](ctx context.Context, tx TX, p
386393
}
387394
ib = ib.Values(id, i+1, event.GetTopic(), event.GetTimestamp().UTC().Format("2006-01-02 15:04:05.000000"), event.GetKey(), enc, headersEnc)
388395
}
389-
producer.TracerProvider(ctx)("[events] saving %d events as part of a transaction, example topic: '%s'", len(events), events[0].GetTopic())
396+
tracer := backupTracer
397+
if optProducer != nil {
398+
tracer = optProducer.TracerProvider(ctx)
399+
}
400+
tracer("[events] saving %d events as part of a transaction, example topic: '%s'", len(events), events[0].GetTopic())
390401
sql, args, err := ib.PlaceholderFormat(sq.Question).ToSql()
391402
if err != nil {
392403
return nil, errors.WithStack(err)
393404
}
394405
_, err = tx.ExecContext(ctx, sql, args...)
395406
if err != nil {
396-
producer.TracerProvider(ctx)("[SaveEventsInsideTx] could not insert event: %s: %s", sql, err)
407+
tracer("[SaveEventsInsideTx] could not insert event: %s: %s", sql, err)
397408
return nil, errors.WithStack(err)
398409
}
399410
return ids, nil

0 commit comments

Comments
 (0)