Skip to content

Commit 1818c1e

Browse files
authored
Batch insert observed messages using Duckdb appender (#998)
* Batch insert observed messages using Duckdb appender For fast batch insertion of messages observed into duckdb, use the native appender to batch the messages based on size or max wait time. Whichever happens first. Fixes #993 * Periodically attempt to flush in-flight messages out to the database This is to ensure max batch delay is respected independent of message arrival/discovery flow. * Fix usage of timer instead of ticker for both rotation and flush
1 parent 793b7d2 commit 1818c1e

File tree

3 files changed

+95
-24
lines changed

3 files changed

+95
-24
lines changed

cmd/f3/observer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ var observerCmd = cli.Command{
105105
Usage: "The connectivity threshold below which peer discovery via Lotus Net Peers is engaged. Disabled if set to zero or no lotusDaemon endpoints are provided.",
106106
Value: 100,
107107
},
108+
&cli.IntFlag{
109+
Name: "maxBatchSize",
110+
Usage: "The maximum number of messages to batch together in a single insertion into database.",
111+
Value: 1000,
112+
},
113+
&cli.DurationFlag{
114+
Name: "maxBatchDelay",
115+
Usage: "The maximum time to wait before a batch is flushed to the database.",
116+
Value: time.Minute,
117+
},
108118
},
109119

110120
Action: func(cctx *cli.Context) error {
@@ -116,6 +126,8 @@ var observerCmd = cli.Command{
116126
observer.WithRetention(cctx.Duration("retention")),
117127
observer.WithDataSourceName(cctx.String("dataSourceName")),
118128
observer.WithMaxConcurrentConnectionAttempts(cctx.Int("reconnectConcurrency")),
129+
observer.WithMaxBatchSize(cctx.Int("maxBatchSize")),
130+
observer.WithMaxBatchDelay(cctx.Duration("maxBatchDelay")),
119131
}
120132
var identity crypto.PrivKey
121133
if cctx.IsSet("identity") {

observer/observer.go

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package observer
33
import (
44
"context"
55
"database/sql"
6+
"database/sql/driver"
67
_ "embed"
7-
"encoding/json"
88
"errors"
99
"fmt"
1010
"net/http"
@@ -24,7 +24,7 @@ import (
2424
pubsub "github.com/libp2p/go-libp2p-pubsub"
2525
record "github.com/libp2p/go-libp2p-record"
2626
"github.com/libp2p/go-libp2p/core/peer"
27-
_ "github.com/marcboeker/go-duckdb"
27+
"github.com/marcboeker/go-duckdb"
2828
"go.uber.org/multierr"
2929
"golang.org/x/sync/errgroup"
3030
)
@@ -45,6 +45,12 @@ type Observer struct {
4545

4646
messageObserved chan *Message
4747
msgEncoding *encoding.ZSTD[*pmsg.PartialGMessage]
48+
49+
dbConnector *duckdb.Connector
50+
dbAppender *duckdb.Appender
51+
dbConnection driver.Conn
52+
unflushedMessageCount int
53+
lastFlushedAt time.Time
4854
}
4955

5056
func New(o ...Option) (*Observer, error) {
@@ -75,14 +81,18 @@ func (o *Observer) Start(ctx context.Context) error {
7581
eg.Go(o.listenAndServeQueries)
7682
o.stop = func() error {
7783
stop()
78-
return eg.Wait()
84+
return multierr.Combine(
85+
o.dbAppender.Close(),
86+
o.dbConnection.Close(),
87+
o.dbConnector.Close(),
88+
o.db.Close(),
89+
eg.Wait())
7990
}
8091
return nil
8192
}
8293

8394
func (o *Observer) initialize(ctx context.Context) error {
8495
var err error
85-
8696
if o.pubSub == nil {
8797
// Set up pubsub to listen for GPBFT messages.
8898
if o.pubSub, err = pubsub.NewGossipSub(ctx, o.host,
@@ -115,9 +125,11 @@ func (o *Observer) initialize(ctx context.Context) error {
115125
}
116126

117127
// Set up database connection.
118-
if o.db, err = sql.Open("duckdb", o.dataSourceName); err != nil {
119-
return err
128+
o.dbConnector, err = duckdb.NewConnector(o.dataSourceName, nil)
129+
if err != nil {
130+
return fmt.Errorf("failed to create duckdb connector: %w", err)
120131
}
132+
o.db = sql.OpenDB(o.dbConnector)
121133

122134
// Create database schema.
123135
if _, err := o.db.ExecContext(ctx, schema); err != nil {
@@ -138,6 +150,16 @@ func (o *Observer) initialize(ctx context.Context) error {
138150
return err
139151
}
140152

153+
// Set up appender used for batch insertion of messages observed.
154+
o.dbConnection, err = o.dbConnector.Connect(ctx)
155+
if err != nil {
156+
return fmt.Errorf("failed to connect to duckdb: %w", err)
157+
}
158+
o.dbAppender, err = duckdb.NewAppenderFromConn(o.dbConnection, "", "latest_messages")
159+
if err != nil {
160+
return fmt.Errorf("failed to create duckdb appender: %w", err)
161+
}
162+
141163
// If connectivity check interval is enabled, repair connections once to avoid
142164
// waiting for the ticker.
143165
if o.connectivityCheckInterval > 0 {
@@ -169,14 +191,16 @@ func (o *Observer) createOrReplaceMessagesView(ctx context.Context, includeParqu
169191
}
170192

171193
func (o *Observer) observe(ctx context.Context) error {
172-
rotation := time.NewTimer(o.rotateInterval)
194+
rotation := time.NewTicker(o.rotateInterval)
195+
flush := time.NewTicker(o.maxBatchDelay)
173196
stopObserverForNetwork, err := o.startObserverFor(ctx, o.networkName)
174197
if err != nil {
175198
return fmt.Errorf("failed to start observer for network %s: %w", o.networkName, err)
176199
}
177200

178201
defer func() {
179202
rotation.Stop()
203+
flush.Stop()
180204
stopObserverForNetwork()
181205
}()
182206

@@ -194,36 +218,49 @@ func (o *Observer) observe(ctx context.Context) error {
194218
if err := o.rotateMessages(ctx); err != nil {
195219
logger.Errorw("Failed to rotate latest messages", "err", err)
196220
}
221+
case <-flush.C:
222+
if err := o.tryFlushMessages(); err != nil {
223+
logger.Errorw("Failed to flush observed messages", "err", err)
224+
}
197225
}
198226
}
199227
return nil
200228
}
201229

202-
func (o *Observer) storeMessage(ctx context.Context, om *Message) error {
203-
const insertMessage = `INSERT INTO latest_messages VALUES(?,?,?,?::json,?,?,?::json,?);`
204-
voteMarshaled, err := json.Marshal(om.Vote)
205-
if err != nil {
206-
return fmt.Errorf("failed to marshal vote: %w", err)
207-
}
208-
var justificationMarshaled any
230+
func (o *Observer) storeMessage(_ context.Context, om *Message) error {
231+
var justification any
209232
if om.Justification != nil {
210-
v, err := json.Marshal(om.Justification)
211-
if err != nil {
212-
return fmt.Errorf("failed to marshal justification: %w", err)
213-
}
214-
justificationMarshaled = string(v)
233+
// Dereference to get the go-duckdb reflection in appender to behave. Otherwise,
234+
// it fails as it interprets the type as mismatch due to how nullable fields are
235+
// handled.
236+
justification = *om.Justification
215237
}
216-
if _, err := o.db.ExecContext(ctx, insertMessage,
238+
if err := o.dbAppender.AppendRow(
217239
om.Timestamp,
218240
om.NetworkName,
219-
om.Sender,
220-
string(voteMarshaled),
241+
int64(om.Sender),
242+
om.Vote,
221243
om.Signature,
222244
om.Ticket,
223-
justificationMarshaled,
245+
justification,
224246
om.VoteValueKey,
225247
); err != nil {
226-
return fmt.Errorf("failed to execute query: %w", err)
248+
return fmt.Errorf("failed to append row: %w", err)
249+
}
250+
o.unflushedMessageCount++
251+
return o.tryFlushMessages()
252+
}
253+
254+
func (o *Observer) tryFlushMessages() error {
255+
batchSizeReached := o.unflushedMessageCount >= o.maxBatchSize
256+
batchDelayElapsed := !o.lastFlushedAt.IsZero() && time.Since(o.lastFlushedAt) >= o.maxBatchDelay
257+
if batchSizeReached || batchDelayElapsed {
258+
if err := o.dbAppender.Flush(); err != nil {
259+
return fmt.Errorf("failed to flush appender: %w", err)
260+
}
261+
logger.Infow("Flushed messages to database", "count", o.unflushedMessageCount, "after", time.Since(o.lastFlushedAt))
262+
o.unflushedMessageCount = 0
263+
o.lastFlushedAt = time.Now()
227264
}
228265
return nil
229266
}

observer/options.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type options struct {
4141
pubSubValidatorDisabled bool
4242

4343
dataSourceName string
44+
45+
maxBatchSize int
46+
maxBatchDelay time.Duration
4447
}
4548

4649
func newOptions(opts ...Option) (*options, error) {
@@ -54,6 +57,8 @@ func newOptions(opts ...Option) (*options, error) {
5457
rotatePath: ".",
5558
rotateInterval: 10 * time.Minute,
5659
retention: -1,
60+
maxBatchSize: 1000,
61+
maxBatchDelay: time.Minute,
5762
}
5863
for _, apply := range opts {
5964
if err := apply(&opt); err != nil {
@@ -256,3 +261,20 @@ func WithPubSub(ps *pubsub.PubSub) Option {
256261
return nil
257262
}
258263
}
264+
265+
func WithMaxBatchSize(size int) Option {
266+
return func(o *options) error {
267+
o.maxBatchSize = size
268+
return nil
269+
}
270+
}
271+
272+
func WithMaxBatchDelay(d time.Duration) Option {
273+
return func(o *options) error {
274+
if d < 0 {
275+
return fmt.Errorf("max batch delay must be greater than or equal to 0")
276+
}
277+
o.maxBatchDelay = d
278+
return nil
279+
}
280+
}

0 commit comments

Comments
 (0)