Skip to content

Commit afd2906

Browse files
chainimport: refactor processBatch to contain the appendMode
1 parent 37b3d72 commit afd2906

File tree

1 file changed

+97
-57
lines changed

1 file changed

+97
-57
lines changed

chainimport/headers_import.go

Lines changed: 97 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -609,8 +609,9 @@ func (h *headersImport) processNewHeadersRegion(ctx context.Context,
609609
log.Infof("Adding %d new headers (block and filter) from heights "+
610610
"%d to %d", region.end-region.start+1, region.start, region.end)
611611

612-
err := h.appendNewHeaders(ctx, region.start, region.end)
613-
if err != nil {
612+
if err := h.appendNewHeaders(
613+
ctx, region.start, region.end, region.syncModes.append,
614+
); err != nil {
614615
return fmt.Errorf("failed to append new headers: %w", err)
615616
}
616617

@@ -622,7 +623,7 @@ func (h *headersImport) processNewHeadersRegion(ctx context.Context,
622623

623624
// appendNewHeaders adds new headers from import source.
624625
func (h *headersImport) appendNewHeaders(ctx context.Context, startHeight,
625-
endHeight uint32) error {
626+
endHeight uint32, appendMode appendMode) error {
626627

627628
metadata, err := h.blockHeadersImportSource.GetHeaderMetadata()
628629
if err != nil {
@@ -658,7 +659,7 @@ func (h *headersImport) appendNewHeaders(ctx context.Context, startHeight,
658659
}
659660

660661
batchEnd, err := h.processBatch(
661-
blockIter, filterIter, batchStart,
662+
blockIter, filterIter, batchStart, appendMode,
662663
)
663664
if err == io.EOF {
664665
break
@@ -681,72 +682,111 @@ func (h *headersImport) appendNewHeaders(ctx context.Context, startHeight,
681682
// returns the batch end height on success, or an error including io.EOF when no
682683
// more batches.
683684
func (h *headersImport) processBatch(blockIter, filterIter HeaderIterator,
684-
batchStart uint32) (uint32, error) {
685+
batchStart uint32, appendMode appendMode) (uint32, error) {
685686

686-
blockBatch, blockErr := blockIter.ReadBatch(
687-
batchStart, blockIter.GetEndIndex(), blockIter.GetBatchSize(),
687+
var (
688+
blockHeaders []headerfs.BlockHeader
689+
filterHeaders []headerfs.FilterHeader
690+
batchEnd uint32
688691
)
689-
if blockErr == io.EOF {
690-
return 0, io.EOF
691-
}
692-
if blockErr != nil {
693-
return 0, fmt.Errorf("failed to read block headers "+
694-
"batch at height %d: %w", batchStart, blockErr)
695-
}
696692

697-
// Get corresponding filter headers batch.
698-
filterBatch, filterErr := filterIter.ReadBatch(
699-
batchStart, blockIter.GetEndIndex(), blockIter.GetBatchSize(),
700-
)
701-
if filterErr == io.EOF {
702-
return 0, errors.New("filter headers ended before block " +
703-
"headers")
704-
}
705-
if filterErr != nil {
706-
return 0, fmt.Errorf("failed to read filter headers "+
707-
"batch at height %d: %w", batchStart, filterErr)
708-
}
693+
if appendMode != appendFilterOnly {
694+
blockBatch, blockErr := blockIter.ReadBatch(
695+
batchStart, blockIter.GetEndIndex(),
696+
blockIter.GetBatchSize(),
697+
)
698+
if blockErr == io.EOF {
699+
return 0, io.EOF
700+
}
701+
if blockErr != nil {
702+
return 0, fmt.Errorf("failed to read block headers "+
703+
"batch at height %d: %w", batchStart, blockErr)
704+
}
709705

710-
// Convert block header batches to target store format.
711-
blockHeaders := make([]headerfs.BlockHeader, 0, len(blockBatch))
712-
for _, header := range blockBatch {
713-
blkHeader, err := assertBlockHeader(header)
714-
if err != nil {
715-
return 0, err
706+
// Convert block header batches to target store format.
707+
blockHeaders = make([]headerfs.BlockHeader, 0, len(blockBatch))
708+
for _, header := range blockBatch {
709+
blkHeader, err := assertBlockHeader(header)
710+
if err != nil {
711+
return 0, err
712+
}
713+
blockHeaders = append(
714+
blockHeaders, blkHeader.BlockHeader,
715+
)
716716
}
717-
blockHeaders = append(
718-
blockHeaders, blkHeader.BlockHeader,
719-
)
717+
718+
batchEnd = batchStart + uint32(len(blockBatch)) - 1
720719
}
721720

722-
// Convert filter header batches to target store format.
723-
var filterHeaders []headerfs.FilterHeader
724-
filterHeaders = make(
725-
[]headerfs.FilterHeader, 0, len(filterBatch),
726-
)
727-
for _, header := range filterBatch {
728-
fHeader, err := assertFilterHeader(header)
729-
if err != nil {
730-
return 0, err
721+
if appendMode != appendBlockOnly {
722+
filterBatch, filterErr := filterIter.ReadBatch(
723+
batchStart, blockIter.GetEndIndex(),
724+
blockIter.GetBatchSize(),
725+
)
726+
if filterErr == io.EOF {
727+
return 0, io.EOF
728+
}
729+
if filterErr != nil {
730+
return 0, fmt.Errorf("failed to read filter headers "+
731+
"batch at height %d: %w", batchStart, filterErr)
731732
}
732-
filterHeaders = append(
733-
filterHeaders, fHeader.FilterHeader,
733+
734+
// Convert filter header batches to target store format.
735+
filterHeaders = make(
736+
[]headerfs.FilterHeader, 0, len(filterBatch),
734737
)
735-
}
738+
for _, header := range filterBatch {
739+
fHeader, err := assertFilterHeader(header)
740+
if err != nil {
741+
return 0, err
742+
}
743+
filterHeaders = append(
744+
filterHeaders, fHeader.FilterHeader,
745+
)
746+
}
736747

737-
// The length check conditions should never be triggered during normal
738-
// import operations as validation occurs earlier. They serve as sanity
739-
// checks to catch unexpected inconsistencies.
740-
if len(blockHeaders) != len(filterHeaders) {
741-
return 0, fmt.Errorf("mismatch between block headers "+
742-
"(%d) and filter headers (%d)", len(blockHeaders),
743-
len(filterHeaders))
748+
batchEnd = batchStart + uint32(len(filterBatch)) - 1
749+
750+
isLastBatch := batchEnd >= filterIter.GetEndIndex()
751+
if appendMode == appendFilterOnly && isLastBatch {
752+
// Get the chain tip from both target stores.
753+
tBHS := h.options.TargetBlockHeaderStore
754+
lastH, height, err := tBHS.ChainTip()
755+
if err != nil {
756+
return 0, fmt.Errorf("failed to get target "+
757+
"block header chain tip: %w", err)
758+
}
759+
760+
i := len(filterHeaders) - 1
761+
if height != filterHeaders[i].Height {
762+
return 0, fmt.Errorf("mismatch between target "+
763+
"block header chain tip height and "+
764+
"filter headers height: %d != %d",
765+
height, filterHeaders[i].Height)
766+
}
767+
768+
chainTipBlockHeader := headerfs.BlockHeader{
769+
BlockHeader: lastH,
770+
}
771+
setLastFilterHeaderHash(
772+
filterHeaders, chainTipBlockHeader,
773+
)
774+
}
744775
}
745776

746-
chainTipBlockHeader := blockHeaders[len(blockHeaders)-1]
747-
setLastFilterHeaderHash(filterHeaders, chainTipBlockHeader)
777+
if appendMode == appendBlockAndFilter {
778+
// The length check condition should never be triggered during
779+
// normal import operations as validation occurs earlier. They
780+
// serve as sanity checks to catch unexpected inconsistencies.
781+
if len(blockHeaders) != len(filterHeaders) {
782+
return 0, fmt.Errorf("mismatch between block headers "+
783+
"(%d) and filter headers (%d)",
784+
len(blockHeaders), len(filterHeaders))
785+
}
748786

749-
batchEnd := batchStart + uint32(len(blockBatch)) - 1
787+
chainTipBlockHeader := blockHeaders[len(blockHeaders)-1]
788+
setLastFilterHeaderHash(filterHeaders, chainTipBlockHeader)
789+
}
750790

751791
err := h.writeHeadersToTargetStores(
752792
blockHeaders, filterHeaders, batchStart, batchEnd,

0 commit comments

Comments
 (0)