Skip to content

Commit df214e0

Browse files
committed
universe: insert group anchors immediately
This commit fixes an issue with large sets of asset issuances: When there are more assets in a group than the current batch sync limit (200), then there's a chance that the group anchor (the one asset containing the group key reveal record) isn't in the first batch. So we wouldn't be able to insert the first batch at all, since those assets would all reference a group key that isn't yet known. To avoid this problem, we insert any proofs with group key reveal records immediately and only start batching the others after going through all of them. This means we'll keep more assets in memory and can start to process them later. But at least the process won't fail anymore.
1 parent 95356ec commit df214e0

File tree

1 file changed

+66
-12
lines changed

1 file changed

+66
-12
lines changed

universe/syncer.go

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -272,21 +272,10 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
272272
// local registrar as they're fetched.
273273
var (
274274
fetchedLeaves = make(chan *Item, len(keysToFetch))
275-
newLeafProofs []*Leaf
275+
newLeafProofs = make([]*Leaf, 0, len(keysToFetch))
276276
batchSyncEG errgroup.Group
277277
)
278278

279-
// We use an error group to simply the error handling of a goroutine.
280-
// This goroutine will handle reading in batches of new leaves to
281-
// insert into the DB. We'll fee the output of the goroutines below
282-
// into the input fetchedLeaves channel.
283-
batchSyncEG.Go(func() error {
284-
newLeafProofs, err = s.batchStreamNewItems(
285-
ctx, uniID, fetchedLeaves, len(keysToFetch),
286-
)
287-
return err
288-
})
289-
290279
// If this is a transfer tree, then we'll use these channels to sort
291280
// the contents before sending to the batch writer.
292281
isIssuanceTree := remoteRoot.ID.ProofType == ProofTypeIssuance
@@ -319,6 +308,37 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
319308
// Otherwise, we'll another step to the pipeline below
320309
// for sorting.
321310
if isIssuanceTree {
311+
// If this is an issuance proof _AND_ it has a
312+
// group key reveal, then we'll need to import
313+
// it right away. Otherwise, all other issuance
314+
// proofs in the batch might fail, as they might
315+
// reference the group key in this proof's
316+
// group key reveal.
317+
reg := s.cfg.LocalRegistrar
318+
if hasGroupKeyReveal(leafProof.Leaf.RawProof) {
319+
log.Debugf("UniverseRoot(%v): "+
320+
"Inserting new group key "+
321+
"reveal leaf", uniID.String())
322+
_, err = reg.UpsertProofLeaf(
323+
ctx, uniID, key, leafProof.Leaf,
324+
)
325+
if err != nil {
326+
return fmt.Errorf("unable to "+
327+
"register group "+
328+
"anchor proof: %w", err)
329+
}
330+
331+
// Track this manually inserted proof in
332+
// the result.
333+
newLeafProofs = append(
334+
newLeafProofs, leafProof.Leaf,
335+
)
336+
337+
// No need to batch this further, we've
338+
// already inserted it.
339+
return nil
340+
}
341+
322342
fetchedLeaves <- &Item{
323343
ID: uniID,
324344
Key: key,
@@ -338,6 +358,23 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
338358
return err
339359
}
340360

361+
// We use an error group to simply the error handling of a goroutine.
362+
// This goroutine will handle reading in batches of new leaves to
363+
// insert into the DB. We'll fee the output of the goroutines below
364+
// into the input fetchedLeaves channel.
365+
batchSyncEG.Go(func() error {
366+
insertedProofs, err := s.batchStreamNewItems(
367+
ctx, uniID, fetchedLeaves, len(keysToFetch),
368+
)
369+
if err != nil {
370+
return err
371+
}
372+
373+
newLeafProofs = append(newLeafProofs, insertedProofs...)
374+
375+
return nil
376+
})
377+
341378
// If this is a transfer tree, then we'll collect all the items as we
342379
// need to sort them to ensure we can validate them in dep order.
343380
if !isIssuanceTree {
@@ -395,6 +432,23 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
395432
return nil
396433
}
397434

435+
// hasGroupKeyReveal determines whether a proof has a group key reveal. This is
436+
// used to determine whether we should insert the proof right away, or batch it
437+
// with other proofs.
438+
func hasGroupKeyReveal(rawProof []byte) bool {
439+
// We'll decode the proof to determine if it's a group key reveal.
440+
var dummyProof proof.Proof
441+
record := proof.GroupKeyRevealRecord(&dummyProof.GroupKeyReveal)
442+
443+
proofReader := bytes.NewReader(rawProof)
444+
err := proof.SparseDecode(proofReader, record)
445+
if err != nil {
446+
return false
447+
}
448+
449+
return dummyProof.GroupKeyReveal != nil
450+
}
451+
398452
// batchStreamNewItems streams the set of new items to the local registrar in
399453
// batches and returns the new leaf proofs.
400454
func (s *SimpleSyncer) batchStreamNewItems(ctx context.Context,

0 commit comments

Comments
 (0)