Skip to content

Commit d13dac1

Browse files
authored
Capture chain exchange messages in F3 observer (#1031)
Add the ability to capture chain exchange messages via F3 observer and make them queryable via SQL statement. The captured chain exchange messages offers a mapping of EC chain key to the concrete EC chain value itself, since partial GMessages only contain the EC chain key and not the chain itself. Fixes #970
1 parent c668526 commit d13dac1

File tree

5 files changed

+286
-11
lines changed

5 files changed

+286
-11
lines changed

cmd/f3/observer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ var observerCmd = cli.Command{
125125
Usage: "The path to the directory used for intermediary finality certificate certstore. If not set, in-memory backing store will be used.",
126126
DefaultText: "in-memory",
127127
},
128+
&cli.DurationFlag{
129+
Name: "chainExchangeMaxMessageAge",
130+
Usage: "The maximum age of the chain exchange messages to observe.",
131+
Value: 3 * time.Minute,
132+
},
128133
},
129134

130135
Action: func(cctx *cli.Context) error {
@@ -138,6 +143,7 @@ var observerCmd = cli.Command{
138143
observer.WithMaxConcurrentConnectionAttempts(cctx.Int("reconnectConcurrency")),
139144
observer.WithMaxBatchSize(cctx.Int("maxBatchSize")),
140145
observer.WithMaxBatchDelay(cctx.Duration("maxBatchDelay")),
146+
observer.WithChainExchangeMaxMessageAge(cctx.Duration("chainExchangeMaxMessageAge")),
141147
}
142148
var identity crypto.PrivKey
143149
if cctx.IsSet("identity") {

observer/model.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ type PowerTableDelta struct {
7070
SigningKey []byte `json:"SigningKey"`
7171
}
7272

73+
type ChainExchange struct {
74+
Timestamp time.Time `json:"Timestamp"`
75+
NetworkName string `json:"NetworkName"`
76+
Instance uint64 `json:"Instance"`
77+
VoteValueKey []byte `json:"VoteValueKey"`
78+
VoteValue []TipSet `json:"VoteValue"`
79+
}
80+
7381
func newMessage(timestamp time.Time, nn string, msg gpbft.PartialGMessage) (*Message, error) {
7482
j, err := newJustification(msg.Justification)
7583
if err != nil {
@@ -315,3 +323,14 @@ func powerTableDeltaFromGpbft(ptd certs.PowerTableDiff) ([]PowerTableDelta, erro
315323
}
316324
return deltas, nil
317325
}
326+
327+
func newChainExchange(timestamp time.Time, nn gpbft.NetworkName, instance uint64, chain *gpbft.ECChain) *ChainExchange {
328+
key := chain.Key()
329+
return &ChainExchange{
330+
Timestamp: timestamp,
331+
NetworkName: string(nn),
332+
Instance: instance,
333+
VoteValueKey: key[:],
334+
VoteValue: tipsetsFromGpbft(chain),
335+
}
336+
}

observer/observer.go

Lines changed: 221 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"database/sql/driver"
77
_ "embed"
8+
"encoding/json"
89
"errors"
910
"fmt"
1011
"net/http"
@@ -17,6 +18,7 @@ import (
1718
"github.com/filecoin-project/go-f3/certexchange"
1819
"github.com/filecoin-project/go-f3/certexchange/polling"
1920
"github.com/filecoin-project/go-f3/certstore"
21+
"github.com/filecoin-project/go-f3/chainexchange"
2022
"github.com/filecoin-project/go-f3/gpbft"
2123
"github.com/filecoin-project/go-f3/internal/encoding"
2224
"github.com/filecoin-project/go-f3/internal/lotus"
@@ -33,6 +35,7 @@ import (
3335
"github.com/libp2p/go-libp2p/core/peer"
3436
"github.com/marcboeker/go-duckdb"
3537
"go.uber.org/multierr"
38+
"golang.org/x/exp/maps"
3639
"golang.org/x/sync/errgroup"
3740
)
3841

@@ -51,7 +54,7 @@ type Observer struct {
5154
dht *dht.IpfsDHT
5255

5356
messageObserved chan *Message
54-
msgEncoding *encoding.ZSTD[*gpbft.PartialGMessage]
57+
gMsgEncoding *encoding.ZSTD[*gpbft.PartialGMessage]
5558

5659
dbConnector *duckdb.Connector
5760
dbLatestMessages *duckdb.Appender
@@ -62,21 +65,32 @@ type Observer struct {
6265
dbFinalityCertificates *duckdb.Appender
6366
unflushedCertificateCount int
6467
lastFlushedCertificatesAt time.Time
68+
69+
chainExchangeObserved chan *chainexchange.Message
70+
ceMsgEncoding *encoding.ZSTD[*chainexchange.Message]
6571
}
6672

6773
func New(o ...Option) (*Observer, error) {
6874
opts, err := newOptions(o...)
6975
if err != nil {
7076
return nil, err
7177
}
72-
msgEncoding, err := encoding.NewZSTD[*gpbft.PartialGMessage]()
78+
gMsgEncoding, err := encoding.NewZSTD[*gpbft.PartialGMessage]()
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
ceMsgEncoding, err := encoding.NewZSTD[*chainexchange.Message]()
7384
if err != nil {
7485
return nil, err
7586
}
87+
7688
return &Observer{
77-
options: opts,
78-
messageObserved: make(chan *Message, opts.messageBufferSize),
79-
msgEncoding: msgEncoding,
89+
options: opts,
90+
messageObserved: make(chan *Message, opts.messageBufferSize),
91+
gMsgEncoding: gMsgEncoding,
92+
chainExchangeObserved: make(chan *chainexchange.Message, opts.chainExchangeBufferSize),
93+
ceMsgEncoding: ceMsgEncoding,
8094
}, nil
8195
}
8296

@@ -88,6 +102,7 @@ func (o *Observer) Start(ctx context.Context) error {
88102
ctx, stop := context.WithCancel(ctx)
89103
eg, ctx := errgroup.WithContext(ctx)
90104
eg.Go(func() error { return o.observeMessages(ctx) })
105+
eg.Go(func() error { return o.observeChainExchanges(ctx) })
91106
eg.Go(func() error { return o.observeFinalityCertificates(ctx) })
92107
eg.Go(func() error { return o.stayConnected(ctx) })
93108
eg.Go(o.listenAndServeQueries)
@@ -211,7 +226,7 @@ func (o *Observer) createOrReplaceMessagesView(ctx context.Context, includeParqu
211226
func (o *Observer) observeMessages(ctx context.Context) error {
212227
rotation := time.NewTicker(o.rotateInterval)
213228
flush := time.NewTicker(o.maxBatchDelay)
214-
stopObserverForNetwork, err := o.startObserverFor(ctx, o.networkName)
229+
stopObserverForNetwork, err := o.startMessageObserverFor(ctx, o.networkName)
215230
if err != nil {
216231
return fmt.Errorf("failed to start observer for network %s: %w", o.networkName, err)
217232
}
@@ -245,6 +260,103 @@ func (o *Observer) observeMessages(ctx context.Context) error {
245260
return nil
246261
}
247262

263+
func (o *Observer) observeChainExchanges(ctx context.Context) error {
264+
flush := time.NewTicker(o.maxBatchDelay)
265+
stopObserverForNetwork, err := o.startChainExchangeObserverFor(ctx, o.networkName)
266+
if err != nil {
267+
return fmt.Errorf("failed to start observer for network %s: %w", o.networkName, err)
268+
}
269+
270+
defer stopObserverForNetwork()
271+
272+
seenKeys := make(map[string]struct{})
273+
bufferedChains := make([]*ChainExchange, 0, o.maxBatchSize)
274+
275+
tryStoreAllChainExchanges := func() {
276+
start := time.Now()
277+
if err := o.storeAllChainExchanges(ctx, bufferedChains); err != nil {
278+
logger.Errorw("Failed to store chain exchanges", "count", len(bufferedChains), "err", err)
279+
// Don't clear; let it retry upon the next message.
280+
} else {
281+
logger.Infow("Stored batch of chain exchanges", "count", len(bufferedChains), "took", time.Since(start))
282+
bufferedChains = bufferedChains[:0]
283+
maps.Clear(seenKeys)
284+
}
285+
}
286+
287+
for ctx.Err() == nil {
288+
select {
289+
case <-ctx.Done():
290+
return nil
291+
case oc := <-o.chainExchangeObserved:
292+
timestamp := time.UnixMilli(oc.Timestamp)
293+
// Only key on vote value key instead of vote value key plus instance. Because,
294+
// the purpose of observing the chain exchanges is to be able to infer what a
295+
// vote value key corresponds to. We can use the flow of messages captured to
296+
// infer if an instance is re-using a key from previous instances.
297+
allPrefixes := oc.Chain.AllPrefixes()
298+
for i := len(allPrefixes) - 1; i >= 0 && ctx.Err() == nil; i-- {
299+
prefix := allPrefixes[i]
300+
key := prefix.Key()
301+
bufferKey := string(key[:])
302+
if _, exists := seenKeys[bufferKey]; exists {
303+
break
304+
}
305+
seenKeys[bufferKey] = struct{}{}
306+
exchange := newChainExchange(timestamp, o.networkName, oc.Instance, prefix)
307+
bufferedChains = append(bufferedChains, exchange)
308+
logger.Debugw("Observed chain exchange message", "message", exchange)
309+
if len(bufferedChains) >= o.maxBatchSize {
310+
tryStoreAllChainExchanges()
311+
}
312+
}
313+
case <-flush.C:
314+
tryStoreAllChainExchanges()
315+
}
316+
}
317+
return nil
318+
}
319+
320+
func (o *Observer) storeAllChainExchanges(ctx context.Context, exchanges []*ChainExchange) error {
321+
tx, err := o.db.BeginTx(ctx, nil)
322+
if err != nil {
323+
return err
324+
}
325+
326+
stmt, err := tx.PrepareContext(ctx, `
327+
INSERT OR IGNORE INTO chain_exchanges (Timestamp, NetworkName, Instance, VoteValueKey, VoteValue)
328+
VALUES (?, ?, ?, ?, ?::json)`)
329+
if err != nil {
330+
_ = tx.Rollback()
331+
return fmt.Errorf("failed to prepare statement while storing chain exchanges: %w", err)
332+
}
333+
defer func() {
334+
if err := stmt.Close(); err != nil {
335+
logger.Errorw("Failed to close prepared statement while storing chain exchanges", "err", err)
336+
}
337+
}()
338+
339+
for row, cx := range exchanges {
340+
voteAsJson, err := json.Marshal(cx.VoteValue)
341+
if err != nil {
342+
_ = tx.Rollback()
343+
return fmt.Errorf("failed to marshal chain exchange vote value at row %d: %w", row, err)
344+
}
345+
_, err = stmt.Exec(
346+
cx.Timestamp,
347+
cx.NetworkName,
348+
cx.Instance,
349+
cx.VoteValueKey,
350+
string(voteAsJson),
351+
)
352+
if err != nil {
353+
_ = tx.Rollback()
354+
return fmt.Errorf("failed to insert chain exchange at row %d: %w", row, err)
355+
}
356+
}
357+
return tx.Commit()
358+
}
359+
248360
func (o *Observer) observeFinalityCertificates(ctx context.Context) error {
249361
if o.initialPowerTableCID == cid.Undef {
250362
logger.Warn("Initial power table CID is not set. Finality certificates will not be collected.")
@@ -542,7 +654,7 @@ func (o *Observer) tryConnectToBootstrapPeers(ctx context.Context) {
542654
_ = eg.Wait()
543655
}
544656

545-
func (o *Observer) startObserverFor(ctx context.Context, networkName gpbft.NetworkName) (_stop func(), _err error) {
657+
func (o *Observer) startMessageObserverFor(ctx context.Context, networkName gpbft.NetworkName) (_stop func(), _err error) {
546658
topicName := manifest.PubSubTopicFromNetworkName(networkName)
547659
var (
548660
topic *pubsub.Topic
@@ -564,7 +676,7 @@ func (o *Observer) startObserverFor(ctx context.Context, networkName gpbft.Netwo
564676
}
565677
}()
566678
if !o.pubSubValidatorDisabled {
567-
if err := o.pubSub.RegisterTopicValidator(topicName, o.validatePubSubMessage); err != nil {
679+
if err := o.pubSub.RegisterTopicValidator(topicName, o.validatePubSubGMessage); err != nil {
568680
return nil, fmt.Errorf("failed to register topic validator: %w", err)
569681
}
570682
} else {
@@ -621,6 +733,81 @@ func (o *Observer) startObserverFor(ctx context.Context, networkName gpbft.Netwo
621733
}, nil
622734
}
623735

736+
func (o *Observer) startChainExchangeObserverFor(ctx context.Context, networkName gpbft.NetworkName) (_stop func(), _err error) {
737+
topicName := manifest.ChainExchangeTopicFromNetworkName(networkName)
738+
var (
739+
topic *pubsub.Topic
740+
subscription *pubsub.Subscription
741+
err error
742+
)
743+
744+
defer func() {
745+
if _err != nil {
746+
if !o.pubSubValidatorDisabled {
747+
_ = o.pubSub.UnregisterTopicValidator(topicName)
748+
if topic != nil {
749+
_ = topic.Close()
750+
}
751+
}
752+
if subscription != nil {
753+
subscription.Cancel()
754+
}
755+
}
756+
}()
757+
if err := o.pubSub.RegisterTopicValidator(topicName, o.validatePubSubChainExchangeMessage); err != nil {
758+
return nil, fmt.Errorf("failed to register chain exchange topic validator: %w", err)
759+
}
760+
topic, err = o.pubSub.Join(topicName, pubsub.WithTopicMessageIdFn(psutil.ChainExchangeMessageIdFn))
761+
if err != nil {
762+
return nil, fmt.Errorf("failed to join topic: %w", err)
763+
}
764+
if err = topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
765+
logger.Warnw("failed to set topic score params for chain exchange", "err", err)
766+
}
767+
subscription, err = topic.Subscribe(pubsub.WithBufferSize(o.subBufferSize))
768+
if err != nil {
769+
return nil, fmt.Errorf("failed to subscribe to chain exchange topic: %w", err)
770+
}
771+
772+
ctx, cancel := context.WithCancel(ctx)
773+
var wg sync.WaitGroup
774+
wg.Add(1)
775+
go func() {
776+
defer func() {
777+
subscription.Cancel()
778+
_ = topic.Close()
779+
wg.Done()
780+
}()
781+
782+
for ctx.Err() == nil {
783+
msg, err := subscription.Next(ctx)
784+
if err != nil && !errors.Is(err, context.Canceled) {
785+
logger.Errorw("Failed to get next pubsub message from chain exchange topic", "network", networkName, "err", err)
786+
continue
787+
}
788+
if msg == nil || msg.ValidatorData == nil {
789+
continue
790+
}
791+
792+
ceMsg, ok := msg.ValidatorData.(chainexchange.Message)
793+
if !ok {
794+
logger.Errorw("Received message with invalid ValidatorData type", "expected", "chainexchange.Message", "got", fmt.Sprintf("%T", msg.ValidatorData))
795+
continue
796+
}
797+
798+
select {
799+
case <-ctx.Done():
800+
return
801+
case o.chainExchangeObserved <- &ceMsg:
802+
}
803+
}
804+
}()
805+
return func() {
806+
cancel()
807+
wg.Wait()
808+
}, nil
809+
}
810+
624811
func (o *Observer) anyParquetFilesPresent() (bool, error) {
625812
dir, err := os.ReadDir(o.rotatePath)
626813
if err != nil {
@@ -643,11 +830,35 @@ func (o *Observer) Stop(ctx context.Context) error {
643830
return err
644831
}
645832

646-
func (o *Observer) validatePubSubMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
833+
func (o *Observer) validatePubSubGMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
647834
var pgmsg gpbft.PartialGMessage
648-
if err := o.msgEncoding.Decode(msg.Data, &pgmsg); err != nil {
835+
if err := o.gMsgEncoding.Decode(msg.Data, &pgmsg); err != nil {
649836
return pubsub.ValidationReject
650837
}
651838
msg.ValidatorData = pgmsg
652839
return pubsub.ValidationAccept
653840
}
841+
842+
func (o *Observer) validatePubSubChainExchangeMessage(_ context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
843+
var ceMsg chainexchange.Message
844+
if err := o.ceMsgEncoding.Decode(msg.Data, &ceMsg); err != nil {
845+
logger.Debugw("Failed to decode chain exchange message", "err", err, "message", msg)
846+
return pubsub.ValidationReject
847+
}
848+
if err := ceMsg.Chain.Validate(); err != nil {
849+
logger.Debugw("Invalid chain in chain exchange message", "err", err, "message", ceMsg)
850+
return pubsub.ValidationReject
851+
}
852+
if ceMsg.Chain.IsZero() {
853+
logger.Debugw("Chain in chain exchange message is zero", "message", ceMsg)
854+
return pubsub.ValidationReject
855+
}
856+
now := time.Now().UnixMilli()
857+
lowerBound := now - o.chainExchangeMaxMessageAge.Milliseconds()
858+
if lowerBound > ceMsg.Timestamp || ceMsg.Timestamp > now {
859+
logger.Debugw("Timestamp too old or too far ahead", "from", msg.GetFrom(), "timestamp", ceMsg.Timestamp, "lowerBound", lowerBound)
860+
return pubsub.ValidationIgnore
861+
}
862+
msg.ValidatorData = ceMsg
863+
return pubsub.ValidationAccept
864+
}

0 commit comments

Comments
 (0)