Skip to content

Commit b1bcb03

Browse files
authored
Merge pull request #1030 from ethereumjs/more-performant-trie-checkpointing-mechanism
Trie: Rework Checkpointing Mechanism
2 parents ec0f059 + fa36c17 commit b1bcb03

File tree

13 files changed

+303
-314
lines changed

13 files changed

+303
-314
lines changed

packages/client/lib/sync/fullsync.ts

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ export class FullSynchronizer extends Synchronizer {
2121
private stopSyncing: boolean
2222
private vmPromise?: Promise<void>
2323

24-
// Tracking vars for log msg condensation on zero tx blocks
25-
private NUM_ZERO_TXS_PER_LOG_MSG = 50
26-
public zeroTxsBlockLogMsgCounter: number = 0
27-
2824
constructor(options: SynchronizerOptions) {
2925
super(options)
3026
this.blockFetcher = null
@@ -68,9 +64,15 @@ export class FullSynchronizer extends Synchronizer {
6864
return
6965
}
7066
this.runningBlocks = true
67+
let blockCounter = 0
68+
let txCounter = 0
69+
const NUM_BLOCKS_PER_LOG_MSG = 50
7170
try {
7271
let oldHead = Buffer.alloc(0)
73-
let newHead = (await this.vm.blockchain.getHead()).hash()
72+
const newHeadBlock = await this.vm.blockchain.getHead()
73+
let newHead = newHeadBlock.hash()
74+
const firstHeadBlock = newHeadBlock
75+
let lastHeadBlock = newHeadBlock
7476
while (!newHead.equals(oldHead) && !this.stopSyncing) {
7577
oldHead = newHead
7678
this.vmPromise = this.vm.runBlockchain(this.vm.blockchain, 1)
@@ -79,27 +81,29 @@ export class FullSynchronizer extends Synchronizer {
7981
newHead = headBlock.hash()
8082
// check if we did run a new block:
8183
if (!newHead.equals(oldHead)) {
82-
const number = headBlock.header.number.toNumber()
83-
const hash = short(newHead)
84-
const numTxs = headBlock.transactions.length
85-
if (numTxs === 0) {
86-
this.zeroTxsBlockLogMsgCounter += 1
87-
}
88-
if (
89-
(numTxs > 0 && this.zeroTxsBlockLogMsgCounter > 0) ||
90-
(numTxs === 0 && this.zeroTxsBlockLogMsgCounter >= this.NUM_ZERO_TXS_PER_LOG_MSG)
91-
) {
92-
this.config.logger.info(`Processed ${this.zeroTxsBlockLogMsgCounter} blocks with 0 txs`)
93-
this.zeroTxsBlockLogMsgCounter = 0
94-
}
95-
if (numTxs > 0) {
96-
this.config.logger.info(`Executed block number=${number} hash=${hash} txs=${numTxs}`)
84+
blockCounter += 1
85+
txCounter += headBlock.transactions.length
86+
lastHeadBlock = headBlock
87+
88+
if (blockCounter >= NUM_BLOCKS_PER_LOG_MSG) {
89+
const firstNumber = firstHeadBlock.header.number.toNumber()
90+
const firstHash = short(firstHeadBlock.hash())
91+
const lastNumber = lastHeadBlock.header.number.toNumber()
92+
const lastHash = short(lastHeadBlock.hash())
93+
this.config.logger.info(
94+
`Executed blocks count=${blockCounter} first=${firstNumber} hash=${firstHash} last=${lastNumber} hash=${lastHash} with txs=${txCounter}`
95+
)
96+
blockCounter = 0
97+
txCounter = 0
9798
}
9899
}
99100
}
101+
} catch (error) {
102+
this.emit('error', error)
100103
} finally {
101104
this.runningBlocks = false
102105
}
106+
return blockCounter
103107
}
104108

105109
/**

packages/client/test/sync/fullsync.spec.ts

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -174,51 +174,28 @@ tape('[FullSynchronizer]', async (t) => {
174174
t.deepEqual(sync.vm.blockchain.getHead(), oldHead, 'should not modify blockchain on emtpy run')
175175

176176
blockchain.getHead = td.func<any>()
177-
td.when(blockchain.getHead()).thenResolve(
178-
{
177+
const getHeadResponse: any = []
178+
for (let i = 2; i <= 100; i++) {
179+
getHeadResponse.push({
179180
hash: () => {
180-
return Buffer.from('hash1')
181+
return Buffer.from(`hash${i}`)
181182
},
182-
header: { number: new BN(1) },
183-
transactions: [],
184-
},
185-
{
186-
hash: () => {
187-
return Buffer.from('hash2')
188-
},
189-
header: { number: new BN(2) },
190-
transactions: [],
191-
}
192-
)
193-
await sync.runBlocks()
194-
t.equal(
195-
sync.zeroTxsBlockLogMsgCounter,
196-
1,
197-
'should increase zero blocks counter on zero tx blocks'
198-
)
183+
header: { number: new BN(i) },
184+
transactions: [i],
185+
})
186+
}
199187

200188
td.when(blockchain.getHead()).thenResolve(
201189
{
202190
hash: () => {
203-
return Buffer.from('hash1')
191+
return Buffer.from('hash0')
204192
},
205193
header: { number: new BN(1) },
206194
transactions: [],
207195
},
208-
{
209-
hash: () => {
210-
return Buffer.from('hash2')
211-
},
212-
header: { number: new BN(2) },
213-
transactions: [1, 2, 3],
214-
}
215-
)
216-
await sync.runBlocks()
217-
t.equal(
218-
sync.zeroTxsBlockLogMsgCounter,
219-
0,
220-
'should reset zero blocks counter on non-zero tx blocks'
196+
...getHeadResponse
221197
)
198+
t.equal(await sync.runBlocks(), 49)
222199

223200
t.end()
224201
})

packages/trie/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
"tsc": "ethereumjs-config-tsc",
2222
"test": "npm run test:node && npm run test:browser",
2323
"test:browser": "npm run build && karma start karma.conf.js",
24-
"test:node": "npm run build && tape -r ts-node/register test/*.ts"
24+
"test:node": "tape -r ts-node/register test/*.ts"
2525
},
2626
"author": {
2727
"name": "mjbecze",
Lines changed: 11 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,26 @@
11
import { Trie as BaseTrie } from './baseTrie'
2-
import { ScratchReadStream } from './scratchReadStream'
3-
import { ScratchDB } from './scratch'
4-
import { DB } from './db'
5-
const WriteStream = require('level-ws')
62

73
/**
84
* Adds checkpointing to the {@link BaseTrie}
95
*/
106
export class CheckpointTrie extends BaseTrie {
11-
_mainDB: DB
12-
_scratch: ScratchDB | null
13-
_checkpoints: Buffer[]
14-
157
constructor(...args: any) {
168
super(...args)
17-
// Reference to main DB instance
18-
this._mainDB = this.db
19-
// DB instance used for checkpoints
20-
this._scratch = null
21-
// Roots of trie at the moment of checkpoint
22-
this._checkpoints = []
239
}
2410

2511
/**
2612
* Is the trie during a checkpoint phase?
2713
*/
2814
get isCheckpoint() {
29-
return this._checkpoints.length > 0
15+
return this.db.checkpoints.length > 0
3016
}
3117

3218
/**
3319
* Creates a checkpoint that can later be reverted to or committed.
34-
* After this is called, no changes to the trie will be permanently saved until `commit` is called.
35-
* To override the checkpointing mechanism use `_maindb.put` to write directly write to db.
20+
* After this is called, all changes can be reverted until `commit` is called.
3621
*/
3722
checkpoint() {
38-
const wasCheckpoint = this.isCheckpoint
39-
this._checkpoints.push(this.root)
40-
41-
// Entering checkpoint mode is not necessary for nested checkpoints
42-
if (!wasCheckpoint && this.isCheckpoint) {
43-
this._enterCpMode()
44-
}
23+
this.db.checkpoint(this.root)
4524
}
4625

4726
/**
@@ -55,13 +34,7 @@ export class CheckpointTrie extends BaseTrie {
5534
}
5635

5736
await this.lock.wait()
58-
59-
this._checkpoints.pop()
60-
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
61-
if (!this.isCheckpoint) {
62-
await this._exitCpMode(true)
63-
}
64-
37+
this.db.commit()
6538
this.lock.signal()
6639
}
6740

@@ -71,15 +44,12 @@ export class CheckpointTrie extends BaseTrie {
7144
* parent checkpoint as current.
7245
*/
7346
async revert(): Promise<void> {
74-
await this.lock.wait()
75-
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
76-
if (this.isCheckpoint) {
77-
this.root = this._checkpoints.pop()!
78-
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
79-
if (!this.isCheckpoint) {
80-
await this._exitCpMode(false)
81-
}
47+
if (!this.isCheckpoint) {
48+
throw new Error('trying to revert when not checkpointed')
8249
}
50+
51+
await this.lock.wait()
52+
this.root = await this.db.revert()
8353
this.lock.signal()
8454
}
8555

@@ -88,57 +58,11 @@ export class CheckpointTrie extends BaseTrie {
8858
* @param includeCheckpoints - If true and during a checkpoint, the copy will contain the checkpointing metadata and will use the same scratch as underlying db.
8959
*/
9060
copy(includeCheckpoints = true): CheckpointTrie {
91-
const db = this._mainDB.copy()
61+
const db = this.db.copy()
9262
const trie = new CheckpointTrie(db._leveldb, this.root)
9363
if (includeCheckpoints && this.isCheckpoint) {
94-
trie._checkpoints = this._checkpoints.slice()
95-
trie._scratch = this._scratch!.copy()
96-
trie.db = trie._scratch
64+
trie.db.checkpoints = this.db.checkpoints.slice()
9765
}
9866
return trie
9967
}
100-
101-
/**
102-
* Enter into checkpoint mode.
103-
* @private
104-
*/
105-
_enterCpMode() {
106-
this._scratch = new ScratchDB(this._mainDB)
107-
this.db = this._scratch
108-
}
109-
110-
/**
111-
* Exit from checkpoint mode.
112-
* @private
113-
*/
114-
async _exitCpMode(commitState: boolean): Promise<void> {
115-
return new Promise((resolve) => {
116-
const scratch = this._scratch as ScratchDB
117-
this._scratch = null
118-
this.db = this._mainDB
119-
120-
if (commitState) {
121-
this._createScratchReadStream(scratch)
122-
.pipe(WriteStream(this.db._leveldb))
123-
.on('close', resolve)
124-
} else {
125-
process.nextTick(resolve)
126-
}
127-
})
128-
}
129-
130-
/**
131-
* Returns a `ScratchReadStream` based on the state updates
132-
* since checkpoint.
133-
* @private
134-
*/
135-
_createScratchReadStream(scratchDb?: ScratchDB) {
136-
const scratch = scratchDb || this._scratch
137-
if (!scratch) {
138-
throw new Error('No scratch found to use')
139-
}
140-
const trie = new BaseTrie(scratch._leveldb, this.root)
141-
trie.db = scratch
142-
return new ScratchReadStream(trie)
143-
}
14468
}

0 commit comments

Comments
 (0)