Skip to content

Commit e7cba48

Browse files
Streaming repo parsing (#1000)
First pass at a streaming repo parse utility. This iterates the repo in MST order, waiting for the blocks it needs to be read from the passed-in stream. It currently doesn't have a good backpressure mechanism; that will need to be added next. I kinda wanted to get this to be a generator-pattern type thing instead of using goroutines, but since the blocks can be reordered it gets sticky... we will see.
2 parents 2b44b21 + 0223d29 commit e7cba48

File tree

9 files changed

+413
-62
lines changed

9 files changed

+413
-62
lines changed

backfill/backfill.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"io"
89
"log/slog"
910
"net/http"
1011
"strings"
@@ -301,6 +302,7 @@ func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {
301302
type recordQueueItem struct {
302303
recordPath string
303304
nodeCid cid.Cid
305+
data []byte
304306
}
305307

306308
type recordResult struct {
@@ -323,8 +325,8 @@ func (e *FetchRepoError) Error() string {
323325
return fmt.Sprintf("failed to get repo: %s (%d)", reason, e.StatusCode)
324326
}
325327

326-
// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo
327-
func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) {
328+
// Fetches a repo CAR file over HTTP from the indicated host.
329+
func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (io.ReadCloser, error) {
328330
url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
329331

330332
if since != "" {
@@ -366,13 +368,7 @@ func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*r
366368
counter: backfillBytesProcessed.WithLabelValues(b.Name),
367369
}
368370

369-
defer instrumentedReader.Close()
370-
371-
repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
372-
if err != nil {
373-
return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err)
374-
}
375-
return repo, nil
371+
return &instrumentedReader, nil
376372
}
377373

378374
// BackfillRepo backfills a repo
@@ -390,7 +386,7 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
390386
}
391387
log.Info(fmt.Sprintf("processing backfill for %s", repoDID))
392388

393-
var r *repo.Repo
389+
var r io.ReadCloser
394390
if b.tryRelayRepoFetch {
395391
rr, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost)
396392
if err != nil {
@@ -426,36 +422,39 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
426422
recordQueue := make(chan recordQueueItem, numRoutines)
427423
recordResults := make(chan recordResult, numRoutines)
428424

425+
var rev string
426+
// guaranteed to be called before any items are sent on the recordQueue channel
427+
onCommit := func(sc *repo.SignedCommit) error {
428+
rev = sc.Rev
429+
return nil
430+
}
431+
429432
// Producer routine
433+
var streamRecordsError error
430434
go func() {
435+
defer r.Close()
431436
defer close(recordQueue)
432-
if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
437+
err := repo.StreamRepoRecords(ctx, r, b.NSIDFilter, onCommit, func(recordPath string, nodeCid cid.Cid, data []byte) error {
433438
numRecords++
434-
recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
439+
recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid, data: data}
435440
return nil
436-
}); err != nil {
437-
log.Error("failed to iterate records in repo", "err", err)
441+
})
442+
if err != nil {
443+
streamRecordsError = fmt.Errorf("failed to iterate records in repo: %w", err)
438444
}
439445
}()
440446

441-
rev := r.SignedCommit().Rev
442-
443447
// Consumer routines
444448
wg := sync.WaitGroup{}
445449
for i := 0; i < numRoutines; i++ {
446450
wg.Add(1)
447451
go func() {
448452
defer wg.Done()
449453
for item := range recordQueue {
450-
blk, err := r.Blockstore().Get(ctx, item.nodeCid)
451-
if err != nil {
452-
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)}
453-
continue
454-
}
455454

456-
raw := blk.RawData()
455+
raw := item.data
457456

458-
err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
457+
err := b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
459458
if err != nil {
460459
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
461460
continue
@@ -483,7 +482,11 @@ func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
483482
close(recordResults)
484483
resultWG.Wait()
485484

486-
if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil {
485+
if streamRecordsError != nil {
486+
return "failed to stream records", streamRecordsError
487+
}
488+
489+
if err := job.SetRev(ctx, rev); err != nil {
487490
log.Error("failed to update rev after backfilling repo", "err", err)
488491
}
489492

backfill/gormstore.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops
180180
switch j.state {
181181
case StateComplete:
182182
return false, nil
183-
case StateInProgress, StateEnqueued:
183+
case StateEnqueued:
184+
// if the repo is enqueued, but not actively being backfilled, just ignore events for it for now
185+
return true, nil
186+
case StateInProgress:
184187
// keep going and buffer the op
185188
default:
186189
if strings.HasPrefix(j.state, "failed") {

cmd/gosky/main.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,15 @@ var listAllRecordsCmd = &cli.Command{
634634

635635
var repob []byte
636636
if strings.HasPrefix(arg, "did:") {
637-
xrpcc, err := cliutil.GetXrpcClient(cctx, true)
637+
resp, err := identity.DefaultDirectory().LookupDID(ctx, syntax.DID(arg))
638638
if err != nil {
639639
return err
640640
}
641641

642+
xrpcc := &xrpc.Client{
643+
Host: resp.PDSEndpoint(),
644+
}
645+
642646
if arg == "" {
643647
arg = xrpcc.Auth.Did
644648
}
@@ -660,36 +664,26 @@ var listAllRecordsCmd = &cli.Command{
660664
repob = fb
661665
}
662666

663-
rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob))
664-
if err != nil {
665-
return err
666-
}
667-
668667
collection := "app.bsky.feed.post"
669668
if cctx.Bool("all") {
670669
collection = ""
671670
}
672671
vals := cctx.Bool("values")
673672
cids := cctx.Bool("cids")
674673

675-
if err := rr.ForEach(ctx, collection, func(k string, v cid.Cid) error {
674+
if err := repo.StreamRepoRecords(ctx, bytes.NewReader(repob), collection, nil, func(k string, cc cid.Cid, v []byte) error {
676675
if !strings.HasPrefix(k, collection) {
677676
return repo.ErrDoneIterating
678677
}
679678

680679
fmt.Print(k)
681680
if cids {
682-
fmt.Println(" - ", v)
681+
fmt.Println(" - ", cc)
683682
} else {
684683
fmt.Println()
685684
}
686685
if vals {
687-
b, err := rr.Blockstore().Get(ctx, v)
688-
if err != nil {
689-
return err
690-
}
691-
692-
convb, err := cborToJson(b.RawData())
686+
convb, err := cborToJson(v)
693687
if err != nil {
694688
return err
695689
}

cmd/gosky/sync.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/bluesky-social/indigo/atproto/identity"
1010
"github.com/bluesky-social/indigo/atproto/syntax"
1111
"github.com/bluesky-social/indigo/util/cliutil"
12+
"github.com/bluesky-social/indigo/xrpc"
1213

1314
cli "github.com/urfave/cli/v2"
1415
)
@@ -54,11 +55,10 @@ var syncGetRepoCmd = &cli.Command{
5455
carPath = ident.DID.String() + ".car"
5556
}
5657

57-
xrpcc, err := cliutil.GetXrpcClient(cctx, false)
58-
if err != nil {
59-
return err
58+
xrpcc := &xrpc.Client{
59+
Host: ident.PDSEndpoint(),
6060
}
61-
xrpcc.Host = ident.PDSEndpoint()
61+
6262
if xrpcc.Host == "" {
6363
return fmt.Errorf("no PDS endpoint for identity")
6464
}

events/consumer.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
comatproto "github.com/bluesky-social/indigo/api/atproto"
1313
"github.com/prometheus/client_golang/prometheus"
1414

15+
cbg "github.com/whyrusleeping/cbor-gen"
16+
1517
"github.com/gorilla/websocket"
1618
)
1719

@@ -162,6 +164,13 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
162164
return nil
163165
})
164166

167+
cr := new(cbg.CborReader)
168+
169+
ir := &instrumentedReader{
170+
addr: remoteAddr,
171+
bytesCounter: bytesFromStreamCounter.WithLabelValues(remoteAddr),
172+
}
173+
165174
lastSeq := int64(-1)
166175
for {
167176
select {
@@ -182,14 +191,12 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
182191
// ok
183192
}
184193

185-
r := &instrumentedReader{
186-
r: rawReader,
187-
addr: remoteAddr,
188-
bytesCounter: bytesFromStreamCounter.WithLabelValues(remoteAddr),
189-
}
194+
ir.r = rawReader
195+
196+
cr.SetReader(ir)
190197

191198
var header EventHeader
192-
if err := header.UnmarshalCBOR(r); err != nil {
199+
if err := header.UnmarshalCBOR(cr); err != nil {
193200
return fmt.Errorf("reading header: %w", err)
194201
}
195202

@@ -200,7 +207,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
200207
switch header.MsgType {
201208
case "#commit":
202209
var evt comatproto.SyncSubscribeRepos_Commit
203-
if err := evt.UnmarshalCBOR(r); err != nil {
210+
if err := evt.UnmarshalCBOR(cr); err != nil {
204211
return fmt.Errorf("reading repoCommit event: %w", err)
205212
}
206213

@@ -217,7 +224,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
217224
}
218225
case "#sync":
219226
var evt comatproto.SyncSubscribeRepos_Sync
220-
if err := evt.UnmarshalCBOR(r); err != nil {
227+
if err := evt.UnmarshalCBOR(cr); err != nil {
221228
return fmt.Errorf("reading repoSync event: %w", err)
222229
}
223230

@@ -234,7 +241,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
234241
}
235242
case "#identity":
236243
var evt comatproto.SyncSubscribeRepos_Identity
237-
if err := evt.UnmarshalCBOR(r); err != nil {
244+
if err := evt.UnmarshalCBOR(cr); err != nil {
238245
return err
239246
}
240247

@@ -250,7 +257,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
250257
}
251258
case "#account":
252259
var evt comatproto.SyncSubscribeRepos_Account
253-
if err := evt.UnmarshalCBOR(r); err != nil {
260+
if err := evt.UnmarshalCBOR(cr); err != nil {
254261
return err
255262
}
256263

@@ -267,7 +274,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
267274
case "#info":
268275
// TODO: this might also be a LabelInfo (as opposed to RepoInfo)
269276
var evt comatproto.SyncSubscribeRepos_Info
270-
if err := evt.UnmarshalCBOR(r); err != nil {
277+
if err := evt.UnmarshalCBOR(cr); err != nil {
271278
return err
272279
}
273280

@@ -278,7 +285,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
278285
}
279286
case "#labels":
280287
var evt comatproto.LabelSubscribeLabels_Labels
281-
if err := evt.UnmarshalCBOR(r); err != nil {
288+
if err := evt.UnmarshalCBOR(cr); err != nil {
282289
return fmt.Errorf("reading Labels event: %w", err)
283290
}
284291

@@ -297,7 +304,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
297304

298305
case EvtKindErrorFrame:
299306
var errframe ErrorFrame
300-
if err := errframe.UnmarshalCBOR(r); err != nil {
307+
if err := errframe.UnmarshalCBOR(cr); err != nil {
301308
return err
302309
}
303310

mst/mst.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -936,7 +936,20 @@ func (mst *MerkleSearchTree) findGtOrEqualLeafIndex(ctx context.Context, key str
936936
// WalkLeavesFrom walks the leaves of the tree, calling the cb callback on each
937937
// key that's greater than or equal to the provided from key.
938938
// If cb returns an error, the walk is aborted and the error is returned.
939+
// NB: this method caches the tree structure in memory to make subsequent tree
940+
// operations significantly faster
939941
func (mst *MerkleSearchTree) WalkLeavesFrom(ctx context.Context, from string, cb func(key string, val cid.Cid) error) error {
942+
return mst.walkLeavesFrom(ctx, from, false, cb)
943+
}
944+
945+
// WalkLeavesFromNocache works the same as WalkLeavesFrom but does not cache
946+
// internal tree structure, intended for "once through" passes of MSTs,
947+
// especially in streaming contexts
948+
func (mst *MerkleSearchTree) WalkLeavesFromNocache(ctx context.Context, from string, cb func(key string, val cid.Cid) error) error {
949+
return mst.walkLeavesFrom(ctx, from, true, cb)
950+
}
951+
952+
func (mst *MerkleSearchTree) walkLeavesFrom(ctx context.Context, from string, nocache bool, cb func(key string, val cid.Cid) error) error {
940953
index, err := mst.findGtOrEqualLeafIndex(ctx, from)
941954
if err != nil {
942955
return err
@@ -950,7 +963,7 @@ func (mst *MerkleSearchTree) WalkLeavesFrom(ctx context.Context, from string, cb
950963
if index > 0 {
951964
prev := entries[index-1]
952965
if !prev.isUndefined() && prev.isTree() {
953-
if err := prev.Tree.WalkLeavesFrom(ctx, from, cb); err != nil {
966+
if err := prev.Tree.walkLeavesFrom(ctx, from, nocache, cb); err != nil {
954967
return fmt.Errorf("walk leaves %d: %w", index, err)
955968
}
956969
}
@@ -962,9 +975,12 @@ func (mst *MerkleSearchTree) WalkLeavesFrom(ctx context.Context, from string, cb
962975
return err
963976
}
964977
} else {
965-
if err := e.Tree.WalkLeavesFrom(ctx, from, cb); err != nil {
978+
if err := e.Tree.walkLeavesFrom(ctx, from, nocache, cb); err != nil {
966979
return fmt.Errorf("walk leaves from (%d): %w", i, err)
967980
}
981+
if nocache {
982+
e.Tree = nil
983+
}
968984
}
969985
}
970986
return nil

0 commit comments

Comments
 (0)