Skip to content

Commit d7c6562

Browse files
authored
Merge pull request #320 from mohamedawnallah/failGracefullyOnDBHeaderWrites
chainimport[1/3]: import block and filter headers on startup before falling back to P2P synchronization
2 parents 6355419 + da926b2 commit d7c6562

File tree

14 files changed

+1422
-460
lines changed

14 files changed

+1422
-460
lines changed

blockmanager.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ type blockManagerCfg struct {
8484

8585
// RegFilterHeaders is the store where filter headers for the regular
8686
// compact filters are persistently stored.
87-
RegFilterHeaders *headerfs.FilterHeaderStore
87+
RegFilterHeaders headerfs.FilterHeaderStore
8888

8989
// TimeSource is used to access a time estimate based on the clocks of
9090
// the connected peers.
@@ -702,7 +702,7 @@ waitForHeaders:
702702
// network, if it can, and resolves any conflicts between them. It then writes
703703
// any verified headers to the store.
704704
func (b *blockManager) getUncheckpointedCFHeaders(
705-
store *headerfs.FilterHeaderStore, fType wire.FilterType) error {
705+
store headerfs.FilterHeaderStore, fType wire.FilterType) error {
706706

707707
// Get the filter header store's chain tip.
708708
filterTip, filtHeight, err := store.ChainTip()
@@ -928,7 +928,7 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
928928
// checkpoints we got from the network. It assumes that the filter header store
929929
// matches the checkpoints up to the tip of the store.
930930
func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
931-
store *headerfs.FilterHeaderStore, fType wire.FilterType) {
931+
store headerfs.FilterHeaderStore, fType wire.FilterType) {
932932

933933
// We keep going until we've caught up the filter header store with the
934934
// latest known checkpoint.
@@ -1174,7 +1174,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
11741174
// filter header field in the next message range before writing to disk, and
11751175
// the current height after writing the headers.
11761176
func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
1177-
store *headerfs.FilterHeaderStore) (*chainhash.Hash, uint32, error) {
1177+
store headerfs.FilterHeaderStore) (*chainhash.Hash, uint32, error) {
11781178

11791179
// Check that the PrevFilterHeader is the same as the last stored so we
11801180
// can prevent misalignment.
@@ -1364,7 +1364,7 @@ func verifyCheckpoint(prevCheckpoint, nextCheckpoint *chainhash.Hash,
13641364
// information.
13651365
func (b *blockManager) resolveConflict(
13661366
checkpoints map[string][]*chainhash.Hash,
1367-
store *headerfs.FilterHeaderStore, fType wire.FilterType) (
1367+
store headerfs.FilterHeaderStore, fType wire.FilterType) (
13681368
[]*chainhash.Hash, error) {
13691369

13701370
// First check the served checkpoints against the hardcoded ones.
@@ -1913,7 +1913,7 @@ func (b *blockManager) getCheckpts(lastHash *chainhash.Hash,
19131913
// existing store up to the tip of the store. If all of the peers match but
19141914
// the store doesn't, the height at which the mismatch occurs is returned.
19151915
func checkCFCheckptSanity(cp map[string][]*chainhash.Hash,
1916-
headerStore *headerfs.FilterHeaderStore) (int, error) {
1916+
headerStore headerfs.FilterHeaderStore) (int, error) {
19171917

19181918
// Get the known best header to compare against checkpoints.
19191919
_, storeTip, err := headerStore.ChainTip()

blockmanager_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (m *mockDispatcher) Query(requests []*query.Request,
5555

5656
// setupBlockManager initialises a blockManager to be used in tests.
5757
func setupBlockManager(t *testing.T) (*blockManager, headerfs.BlockHeaderStore,
58-
*headerfs.FilterHeaderStore, error) {
58+
headerfs.FilterHeaderStore, error) {
5959

6060
// Set up the block and filter header stores.
6161
tempDir := t.TempDir()
@@ -798,8 +798,11 @@ func TestBlockManagerDetectBadPeers(t *testing.T) {
798798
for _, test := range testCases {
799799
// Create a mock block header store. We only need to be able to
800800
// serve a header for the target index.
801-
blockHeaders := newMockBlockHeaderStore()
802-
blockHeaders.heights[targetIndex] = blockHeader
801+
mBlockHeaderStore := &headerfs.MockBlockHeaderStore{}
802+
803+
mBlockHeaderStore.On("FetchHeaderByHeight", targetIndex).Return(
804+
&blockHeader, nil,
805+
)
803806

804807
// We set up the mock queryAllPeers to only respond according to
805808
// the active testcase.
@@ -851,7 +854,7 @@ func TestBlockManagerDetectBadPeers(t *testing.T) {
851854

852855
bm := &blockManager{
853856
cfg: &blockManagerCfg{
854-
BlockHeaders: blockHeaders,
857+
BlockHeaders: mBlockHeaderStore,
855858
queryAllPeers: queryAllPeers,
856859
},
857860
}

headerfs/file.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,11 @@ func (h *headerStore) appendRaw(header []byte) error {
5050
// amount of bytes read past the seek distance is determined by the specified
5151
// header type.
5252
func (h *headerStore) readRaw(seekDist uint64) ([]byte, error) {
53-
var headerSize uint32
54-
55-
// Based on the defined header type, we'll determine the number of
56-
// bytes that we need to read past the sync point.
57-
switch h.indexType {
58-
case Block:
59-
headerSize = 80
60-
61-
case RegularFilter:
62-
headerSize = 32
63-
64-
default:
65-
return nil, fmt.Errorf("unknown index type: %v", h.indexType)
53+
// Based on the defined header type, we'll determine the number of bytes
54+
// that we need to read past the sync point.
55+
headerSize, err := h.indexType.Size()
56+
if err != nil {
57+
return nil, err
6658
}
6759

6860
// TODO(roasbeef): add buffer pool
@@ -140,7 +132,7 @@ func (h *blockHeaderStore) readHeader(height uint32) (wire.BlockHeader, error) {
140132

141133
// readHeader reads a single filter header at the specified height from the
142134
// flat files on disk.
143-
func (f *FilterHeaderStore) readHeader(height uint32) (*chainhash.Hash, error) {
135+
func (f *filterHeaderStore) readHeader(height uint32) (*chainhash.Hash, error) {
144136
seekDistance := uint64(height) * 32
145137

146138
rawHeader, err := f.readRaw(seekDistance)
@@ -158,7 +150,7 @@ func (f *FilterHeaderStore) readHeader(height uint32) (*chainhash.Hash, error) {
158150
//
159151
// NOTE: The end height is _inclusive_ so we'll fetch all headers from the
160152
// startHeight up to the end height, including the final header.
161-
func (f *FilterHeaderStore) readHeaderRange(startHeight uint32,
153+
func (f *filterHeaderStore) readHeaderRange(startHeight uint32,
162154
endHeight uint32) ([]chainhash.Hash, error) {
163155

164156
// Based on the defined header type, we'll determine the number of

headerfs/file_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,11 @@ func TestAppendRow(t *testing.T) {
158158
truncFn: test.truncFn,
159159
}
160160

161+
hdrFile := &headerFile{file: mockFile}
162+
161163
// Create a header store with our mock file.
162164
h := &headerStore{
163-
file: mockFile,
165+
headerFile: hdrFile,
164166
headerIndex: &headerIndex{indexType: Block},
165167
}
166168

@@ -207,8 +209,10 @@ func BenchmarkHeaderStoreAppendRaw(b *testing.B) {
207209
tmpFile, cleanup := createFile(b, "header_benchmark")
208210
defer cleanup()
209211

212+
hdrFile := &headerFile{file: tmpFile}
213+
210214
store := &headerStore{
211-
file: tmpFile,
215+
headerFile: hdrFile,
212216
headerIndex: &headerIndex{indexType: Block},
213217
}
214218

0 commit comments

Comments
 (0)