Skip to content

Commit fc546a5

Browse files
authored
Merge pull request #7983 from onflow/leo/refactor-stored-chunk-data-pack
[Storage] Refactor stored chunk data pack
2 parents 8b0d6b2 + b651d90 commit fc546a5

30 files changed

+1527
-487
lines changed

cmd/execution_builder.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,8 +762,12 @@ func (exeNode *ExecutionNode) LoadExecutionState(
762762
}
763763
return nil
764764
})
765+
766+
chunkDB := pebbleimpl.ToDB(chunkDataPackDB)
767+
storedChunkDataPacks := store.NewStoredChunkDataPacks(
768+
node.Metrics.Cache, chunkDB, exeNode.exeConf.chunkDataPackCacheSize)
765769
chunkDataPacks := store.NewChunkDataPacks(node.Metrics.Cache,
766-
pebbleimpl.ToDB(chunkDataPackDB), exeNode.collections, exeNode.exeConf.chunkDataPackCacheSize)
770+
chunkDB, storedChunkDataPacks, exeNode.collections, exeNode.exeConf.chunkDataPackCacheSize)
767771

768772
getLatestFinalized := func() (uint64, error) {
769773
final, err := node.State.Final().Head()

cmd/util/cmd/read-badger/cmd/chunk_data_pack.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ var chunkDataPackCmd = &cobra.Command{
3535

3636
metrics := metrics.NewNoopCollector()
3737
collections := store.NewCollections(db, store.NewTransactions(metrics, db))
38+
storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, db, 1)
3839
chunkDataPacks := store.NewChunkDataPacks(metrics,
39-
db, collections, 1)
40+
db, storedChunkDataPacks, collections, 1)
4041

4142
log.Info().Msgf("getting chunk data pack by chunk id: %v", chunkID)
4243
chunkDataPack, err := chunkDataPacks.ByChunkID(chunkID)

cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,14 @@ func runE(*cobra.Command, []string) error {
8787
return fmt.Errorf("could not open chunk data pack DB at %v: %w", flagChunkDataPackDir, err)
8888
}
8989
chunkDataPacksDB := pebbleimpl.ToDB(chunkDataPacksPebbleDB)
90-
chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, collections, 1000)
91-
chunkBatch := chunkDataPacksDB.NewBatch()
92-
defer chunkBatch.Close()
93-
94-
writeBatch := db.NewBatch()
95-
defer writeBatch.Close()
96-
97-
err = removeExecutionResultsFromHeight(
98-
writeBatch,
99-
chunkBatch,
90+
storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics, chunkDataPacksDB, 1000)
91+
chunkDataPacks := store.NewChunkDataPacks(metrics, chunkDataPacksDB, storedChunkDataPacks, collections, 1000)
92+
protocolDBBatch := db.NewBatch()
93+
defer protocolDBBatch.Close()
94+
95+
// collect chunk IDs to be removed
96+
chunkIDs, err := removeExecutionResultsFromHeight(
97+
protocolDBBatch,
10098
state,
10199
transactionResults,
102100
commits,
@@ -112,12 +110,14 @@ func runE(*cobra.Command, []string) error {
112110
}
113111

114112
// remove chunk data packs first, because otherwise the index to find chunk data pack will be removed.
115-
err = chunkBatch.Commit()
116-
if err != nil {
117-
return fmt.Errorf("could not commit chunk batch at %v: %w", flagHeight, err)
113+
if len(chunkIDs) > 0 {
114+
_, err := chunkDataPacks.BatchRemove(chunkIDs, protocolDBBatch)
115+
if err != nil {
116+
return fmt.Errorf("could not remove chunk data packs at %v: %w", flagHeight, err)
117+
}
118118
}
119119

120-
err = writeBatch.Commit()
120+
err = protocolDBBatch.Commit()
121121
if err != nil {
122122
return fmt.Errorf("could not flush write batch at %v: %w", flagHeight, err)
123123
}
@@ -138,11 +138,13 @@ func runE(*cobra.Command, []string) error {
138138
})
139139
}
140140

141-
// use badger instances directly instead of storage interfaces so that the interfaces don't
142-
// need to include the Remove methods
141+
// removeExecutionResultsFromHeight removes all execution results and related data
142+
// from the specified block height onward to roll back the protocol state.
143+
// It returns the chunk IDs removed from the protocol state DB,
144+
// which can then be used to delete the corresponding chunk data packs from the chunk
145+
// data pack database.
143146
func removeExecutionResultsFromHeight(
144-
writeBatch storage.Batch,
145-
chunkBatch storage.Batch,
147+
protocolDBBatch storage.Batch,
146148
protoState protocol.State,
147149
transactionResults storage.TransactionResults,
148150
commits storage.Commits,
@@ -151,40 +153,43 @@ func removeExecutionResultsFromHeight(
151153
myReceipts storage.MyExecutionReceipts,
152154
events storage.Events,
153155
serviceEvents storage.ServiceEvents,
154-
fromHeight uint64) error {
156+
fromHeight uint64,
157+
) ([]flow.Identifier, error) {
155158
log.Info().Msgf("removing results for blocks from height: %v", fromHeight)
156159

157160
root := protoState.Params().FinalizedRoot()
158161

159162
if fromHeight <= root.Height {
160-
return fmt.Errorf("can only remove results for block above root block. fromHeight: %v, rootHeight: %v", fromHeight, root.Height)
163+
return nil, fmt.Errorf("can only remove results for block above root block. fromHeight: %v, rootHeight: %v", fromHeight, root.Height)
161164
}
162165

163166
final, err := protoState.Final().Head()
164167
if err != nil {
165-
return fmt.Errorf("could get not finalized height: %w", err)
168+
return nil, fmt.Errorf("could get not finalized height: %w", err)
166169
}
167170

168171
if fromHeight > final.Height {
169-
return fmt.Errorf("could not remove results for unfinalized height: %v, finalized height: %v", fromHeight, final.Height)
172+
return nil, fmt.Errorf("could not remove results for unfinalized height: %v, finalized height: %v", fromHeight, final.Height)
170173
}
171174

172175
finalRemoved := 0
173176
total := int(final.Height-fromHeight) + 1
177+
var allChunkIDs []flow.Identifier
174178

175179
// removing for finalized blocks
176180
for height := fromHeight; height <= final.Height; height++ {
177181
head, err := protoState.AtHeight(height).Head()
178182
if err != nil {
179-
return fmt.Errorf("could not get header at height: %w", err)
183+
return nil, fmt.Errorf("could not get header at height: %w", err)
180184
}
181185

182186
blockID := head.ID()
183187

184-
err = removeForBlockID(writeBatch, chunkBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
188+
chunkIDs, err := removeForBlockID(protocolDBBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, blockID)
185189
if err != nil {
186-
return fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err)
190+
return nil, fmt.Errorf("could not remove result for finalized block: %v, %w", blockID, err)
187191
}
192+
allChunkIDs = append(allChunkIDs, chunkIDs...)
188193

189194
finalRemoved++
190195
log.Info().Msgf("result at height %v has been removed. progress (%v/%v)", height, finalRemoved, total)
@@ -193,18 +198,18 @@ func removeExecutionResultsFromHeight(
193198
// removing for pending blocks
194199
pendings, err := protoState.Final().Descendants()
195200
if err != nil {
196-
return fmt.Errorf("could not get pending block: %w", err)
201+
return nil, fmt.Errorf("could not get pending block: %w", err)
197202
}
198203

199204
pendingRemoved := 0
200205
total = len(pendings)
201206

202207
for _, pending := range pendings {
203-
err = removeForBlockID(writeBatch, chunkBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)
204-
208+
chunkIDs, err := removeForBlockID(protocolDBBatch, commits, transactionResults, results, chunkDataPacks, myReceipts, events, serviceEvents, pending)
205209
if err != nil {
206-
return fmt.Errorf("could not remove result for pending block %v: %w", pending, err)
210+
return nil, fmt.Errorf("could not remove result for pending block %v: %w", pending, err)
207211
}
212+
allChunkIDs = append(allChunkIDs, chunkIDs...)
208213

209214
pendingRemoved++
210215
log.Info().Msgf("result for pending block %v has been removed. progress (%v/%v) ", pending, pendingRemoved, total)
@@ -213,15 +218,14 @@ func removeExecutionResultsFromHeight(
213218
log.Info().Msgf("removed height from %v. removed for %v finalized blocks, and %v pending blocks",
214219
fromHeight, finalRemoved, pendingRemoved)
215220

216-
return nil
221+
return allChunkIDs, nil
217222
}
218223

219224
// removeForBlockID removes block execution related data for a given block.
220225
// All data to be removed will be removed in a batch write.
221226
// It bubbles up any error encountered.
222227
func removeForBlockID(
223-
writeBatch storage.Batch,
224-
chunkBatch storage.Batch,
228+
protocolDBBatch storage.Batch,
225229
commits storage.Commits,
226230
transactionResults storage.TransactionResults,
227231
results storage.ExecutionResults,
@@ -230,74 +234,70 @@ func removeForBlockID(
230234
events storage.Events,
231235
serviceEvents storage.ServiceEvents,
232236
blockID flow.Identifier,
233-
) error {
237+
) ([]flow.Identifier, error) {
234238
result, err := results.ByBlockID(blockID)
235239
if errors.Is(err, storage.ErrNotFound) {
236240
log.Info().Msgf("result not found for block %v", blockID)
237-
return nil
241+
return nil, nil
238242
}
239243

240244
if err != nil {
241-
return fmt.Errorf("could not find result for block %v: %w", blockID, err)
245+
return nil, fmt.Errorf("could not find result for block %v: %w", blockID, err)
242246
}
243247

248+
chunkIDs := make([]flow.Identifier, 0, len(result.Chunks))
244249
for _, chunk := range result.Chunks {
245250
chunkID := chunk.ID()
246-
// remove chunk data pack
247-
err := chunks.BatchRemove(chunkID, chunkBatch)
248-
if err != nil {
249-
return fmt.Errorf("could not remove chunk id %v for block id %v: %w", chunkID, blockID, err)
250-
}
251-
251+
chunkIDs = append(chunkIDs, chunkID)
252252
}
253253

254254
// remove commits
255-
err = commits.BatchRemoveByBlockID(blockID, writeBatch)
255+
err = commits.BatchRemoveByBlockID(blockID, protocolDBBatch)
256256
if err != nil {
257257
if errors.Is(err, storage.ErrNotFound) {
258-
return fmt.Errorf("could not remove by block ID %v: %w", blockID, err)
258+
return nil, fmt.Errorf("could not remove by block ID %v: %w", blockID, err)
259259
}
260260

261261
log.Warn().Msgf("statecommitment not found for block %v", blockID)
262262
}
263263

264264
// remove transaction results
265-
err = transactionResults.BatchRemoveByBlockID(blockID, writeBatch)
265+
err = transactionResults.BatchRemoveByBlockID(blockID, protocolDBBatch)
266266
if err != nil {
267-
return fmt.Errorf("could not remove transaction results by BlockID %v: %w", blockID, err)
267+
return nil, fmt.Errorf("could not remove transaction results by BlockID %v: %w", blockID, err)
268268
}
269269

270270
// remove own execution results index
271-
err = myReceipts.BatchRemoveIndexByBlockID(blockID, writeBatch)
271+
err = myReceipts.BatchRemoveIndexByBlockID(blockID, protocolDBBatch)
272272
if err != nil {
273273
if !errors.Is(err, storage.ErrNotFound) {
274-
return fmt.Errorf("could not remove own receipt by BlockID %v: %w", blockID, err)
274+
return nil, fmt.Errorf("could not remove own receipt by BlockID %v: %w", blockID, err)
275275
}
276276

277277
log.Warn().Msgf("own receipt not found for block %v", blockID)
278278
}
279279

280280
// remove events
281-
err = events.BatchRemoveByBlockID(blockID, writeBatch)
281+
err = events.BatchRemoveByBlockID(blockID, protocolDBBatch)
282282
if err != nil {
283-
return fmt.Errorf("could not remove events by BlockID %v: %w", blockID, err)
283+
return nil, fmt.Errorf("could not remove events by BlockID %v: %w", blockID, err)
284284
}
285285

286286
// remove service events
287-
err = serviceEvents.BatchRemoveByBlockID(blockID, writeBatch)
287+
err = serviceEvents.BatchRemoveByBlockID(blockID, protocolDBBatch)
288288
if err != nil {
289-
return fmt.Errorf("could not remove service events by blockID %v: %w", blockID, err)
289+
return nil, fmt.Errorf("could not remove service events by blockID %v: %w", blockID, err)
290290
}
291291

292292
// remove execution result index
293-
err = results.BatchRemoveIndexByBlockID(blockID, writeBatch)
293+
err = results.BatchRemoveIndexByBlockID(blockID, protocolDBBatch)
294294
if err != nil {
295295
if !errors.Is(err, storage.ErrNotFound) {
296-
return fmt.Errorf("could not remove result by BlockID %v: %w", blockID, err)
296+
return nil, fmt.Errorf("could not remove result by BlockID %v: %w", blockID, err)
297297
}
298298

299299
log.Warn().Msgf("result not found for block %v", blockID)
300300
}
301301

302-
return nil
302+
return chunkIDs, nil
303303
}

0 commit comments

Comments
 (0)