Skip to content

Commit ed394a6

Browse files
committed
make sure that all batches committed in the same tx are part of the same bundle
1 parent 121ce09 commit ed394a6

File tree

4 files changed

+179
-52
lines changed

4 files changed

+179
-52
lines changed

rollup/internal/controller/watcher/bundle_proposer.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (p *BundleProposer) proposeBundle() error {
133133

134134
// select at most maxBlocksThisChunk blocks
135135
maxBatchesThisBundle := p.maxBatchNumPerBundle
136-
batches, err := p.batchOrm.GetBatchesGEIndexGECodecVersion(p.ctx, firstUnbundledBatchIndex, p.minCodecVersion, int(maxBatchesThisBundle))
136+
batches, err := p.batchOrm.GetCommittedBatchesGEIndexGECodecVersion(p.ctx, firstUnbundledBatchIndex, p.minCodecVersion, int(maxBatchesThisBundle))
137137
if err != nil {
138138
return err
139139
}
@@ -167,6 +167,11 @@ func (p *BundleProposer) proposeBundle() error {
167167
}
168168

169169
for i := 1; i < len(batches); i++ {
170+
// Make sure that all batches have been committed.
171+
if len(batches[i].CommitTxHash) == 0 {
172+
return fmt.Errorf("commit tx hash is empty for batch %v %s", batches[0].Index, batches[0].Hash)
173+
}
174+
170175
chunk, err := p.chunkOrm.GetChunkByIndex(p.ctx, batches[i].StartChunkIndex)
171176
if err != nil {
172177
return err
@@ -181,6 +186,12 @@ func (p *BundleProposer) proposeBundle() error {
181186

182187
if uint64(len(batches)) == maxBatchesThisBundle {
183188
log.Info("reached maximum number of batches per bundle", "batch count", len(batches), "start batch index", batches[0].Index, "end batch index", batches[len(batches)-1].Index)
189+
190+
batches, err = p.allBatchesCommittedInSameTXIncluded(batches)
191+
if err != nil {
192+
return fmt.Errorf("failed to include all batches committed in the same tx: %w", err)
193+
}
194+
184195
p.bundleFirstBlockTimeoutReached.Inc()
185196
p.bundleBatchesNum.Set(float64(len(batches)))
186197
return p.updateDBBundleInfo(batches, codecVersion)
@@ -189,6 +200,12 @@ func (p *BundleProposer) proposeBundle() error {
189200
currentTimeSec := uint64(time.Now().Unix())
190201
if firstChunk.StartBlockTime+p.bundleTimeoutSec < currentTimeSec {
191202
log.Info("first block timeout", "batch count", len(batches), "start block number", firstChunk.StartBlockNumber, "start block timestamp", firstChunk.StartBlockTime, "current time", currentTimeSec)
203+
204+
batches, err = p.allBatchesCommittedInSameTXIncluded(batches)
205+
if err != nil {
206+
return fmt.Errorf("failed to include all batches committed in the same tx: %w", err)
207+
}
208+
192209
p.bundleFirstBlockTimeoutReached.Inc()
193210
p.bundleBatchesNum.Set(float64(len(batches)))
194211
return p.updateDBBundleInfo(batches, codecVersion)
@@ -198,3 +215,43 @@ func (p *BundleProposer) proposeBundle() error {
198215
p.bundleBatchesProposeNotEnoughTotal.Inc()
199216
return nil
200217
}
218+
219+
// allBatchesCommittedInSameTXIncluded makes sure that all batches that were committed in the same tx are included in the bundle.
220+
// If the last batch of the input batches was committed in the same tx as other batches but has not the highest index amongst those,
221+
// we need to remove all batches with the same commit tx hash.
222+
// As a result, all batches with the same commit tx hash will always be included in a single bundle.
223+
func (p *BundleProposer) allBatchesCommittedInSameTXIncluded(batches []*orm.Batch) ([]*orm.Batch, error) {
224+
lastBatch := batches[len(batches)-1]
225+
fields := map[string]interface{}{
226+
"commit_tx_hash = ?": lastBatch.CommitTxHash,
227+
}
228+
229+
// get all batches with the same commit tx hash as lastBatch
230+
batchesWithSameCommitTX, err := p.batchOrm.GetBatches(p.ctx, fields, nil, 0)
231+
if err != nil {
232+
return nil, fmt.Errorf("failed to get batches with the same commit tx hash: %w", err)
233+
}
234+
235+
// get the batch with the highest index amongst the batches with the same commit tx hash as lastBatch
236+
lastBatchWithSameCommitTX := batchesWithSameCommitTX[len(batchesWithSameCommitTX)-1]
237+
238+
// check if lastBatchWithSameCommitTX is included in the input batches -> if not, we need to remove all batches with the same commit tx hash
239+
batchIncluded := lastBatch.Index == lastBatchWithSameCommitTX.Index
240+
if !batchIncluded {
241+
// we need to remove all batches with the same commit tx hash
242+
for i := 0; i < len(batches); i++ {
243+
if batches[i].CommitTxHash != lastBatchWithSameCommitTX.CommitTxHash {
244+
continue
245+
}
246+
247+
batches = batches[:i]
248+
break
249+
}
250+
}
251+
252+
if len(batches) == 0 {
253+
return nil, fmt.Errorf("no batches anymore after cleaning up batches with the same commit tx hash %s", lastBatch.CommitTxHash)
254+
}
255+
256+
return batches, nil
257+
}

rollup/internal/orm/batch.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,14 @@ func (o *Batch) GetFirstUnbatchedChunkIndex(ctx context.Context) (uint64, error)
165165
return latestBatch.EndChunkIndex + 1, nil
166166
}
167167

168-
// GetBatchesGEIndexGECodecVersion retrieves batches that have a batch index greater than or equal to the given index and codec version.
168+
// GetCommittedBatchesGEIndexGECodecVersion retrieves batches that have been committed (commit_tx_hash is set) and have a batch index greater than or equal to the given index and codec version.
169169
// The returned batches are sorted in ascending order by their index.
170-
func (o *Batch) GetBatchesGEIndexGECodecVersion(ctx context.Context, index uint64, codecv encoding.CodecVersion, limit int) ([]*Batch, error) {
170+
func (o *Batch) GetCommittedBatchesGEIndexGECodecVersion(ctx context.Context, index uint64, codecv encoding.CodecVersion, limit int) ([]*Batch, error) {
171171
db := o.db.WithContext(ctx)
172172
db = db.Model(&Batch{})
173173
db = db.Where("index >= ?", index)
174174
db = db.Where("codec_version >= ?", codecv)
175+
db = db.Where("commit_tx_hash IS NOT NULL") // only include committed batches
175176
db = db.Order("index ASC")
176177

177178
if limit > 0 {
@@ -180,7 +181,7 @@ func (o *Batch) GetBatchesGEIndexGECodecVersion(ctx context.Context, index uint6
180181

181182
var batches []*Batch
182183
if err := db.Find(&batches).Error; err != nil {
183-
return nil, fmt.Errorf("Batch.GetBatchesGEIndexGECodecVersion error: %w", err)
184+
return nil, fmt.Errorf("Batch.GetCommittedBatchesGEIndexGECodecVersion error: %w", err)
184185
}
185186
return batches, nil
186187
}

rollup/internal/orm/orm_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -327,23 +327,23 @@ func TestBatchOrm(t *testing.T) {
327327
assert.Equal(t, "finalizeTxHash", updatedBatch.FinalizeTxHash)
328328
assert.Equal(t, types.RollupFinalizeFailed, types.RollupStatus(updatedBatch.RollupStatus))
329329

330-
batches, err := batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
330+
batches, err := batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
331331
assert.NoError(t, err)
332332
assert.Equal(t, 2, len(batches))
333333
assert.Equal(t, batchHash1, batches[0].Hash)
334334
assert.Equal(t, batchHash2, batches[1].Hash)
335335

336-
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 1)
336+
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 1)
337337
assert.NoError(t, err)
338338
assert.Equal(t, 1, len(batches))
339339
assert.Equal(t, batchHash1, batches[0].Hash)
340340

341-
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 1, codecVersion, 0)
341+
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 1, codecVersion, 0)
342342
assert.NoError(t, err)
343343
assert.Equal(t, 1, len(batches))
344344
assert.Equal(t, batchHash2, batches[0].Hash)
345345

346-
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion+1, 0)
346+
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion+1, 0)
347347
assert.NoError(t, err)
348348
assert.Equal(t, 0, len(batches))
349349

@@ -356,7 +356,7 @@ func TestBatchOrm(t *testing.T) {
356356
err = batchOrm.UpdateFinalizeTxHashAndRollupStatusByBundleHash(context.Background(), "test hash", "tx hash", types.RollupCommitFailed)
357357
assert.NoError(t, err)
358358

359-
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
359+
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
360360
assert.NoError(t, err)
361361
assert.Equal(t, 2, len(batches))
362362
assert.Equal(t, batchHash1, batches[0].Hash)

rollup/tests/rollup_test.go

Lines changed: 112 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/scroll-tech/go-ethereum/params"
1515
"github.com/stretchr/testify/assert"
1616
"github.com/stretchr/testify/require"
17+
"gorm.io/gorm"
1718

1819
"scroll-tech/common/database"
1920
"scroll-tech/common/types"
@@ -295,41 +296,64 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
295296
}, encoding.CodecV7, chainConfig, db, nil)
296297

297298
bup := watcher.NewBundleProposer(context.Background(), &config.BundleProposerConfig{
298-
MaxBatchNumPerBundle: 1000000,
299+
MaxBatchNumPerBundle: 2,
299300
BundleTimeoutSec: 300,
300301
}, encoding.CodecV7, chainConfig, db, nil)
301302

302303
l2BlockOrm := orm.NewL2Block(db)
303304
batchOrm := orm.NewBatch(db)
304305
bundleOrm := orm.NewBundle(db)
305306

306-
fmt.Println("insert first 5 blocks ------------------------")
307-
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[:5])
308-
require.NoError(t, err)
309-
batch1ExpectedLastL1MessageQueueHash, err := encoding.MessageQueueV2ApplyL1MessagesFromBlocks(common.Hash{}, blocks[:5])
310-
require.NoError(t, err)
307+
var batch1ExpectedLastL1MessageQueueHash common.Hash
308+
{
309+
fmt.Println("insert first 5 blocks ------------------------")
310+
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[:5])
311+
require.NoError(t, err)
312+
batch1ExpectedLastL1MessageQueueHash, err = encoding.MessageQueueV2ApplyL1MessagesFromBlocks(common.Hash{}, blocks[:5])
313+
require.NoError(t, err)
311314

312-
cp.TryProposeChunk()
313-
bap.TryProposeBatch()
315+
cp.TryProposeChunk()
316+
bap.TryProposeBatch()
317+
}
314318

315-
fmt.Println("insert last 5 blocks ------------------------")
316-
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[5:])
317-
require.NoError(t, err)
318-
batch2ExpectedLastL1MessageQueueHash, err := encoding.MessageQueueV2ApplyL1MessagesFromBlocks(batch1ExpectedLastL1MessageQueueHash, blocks[5:])
319-
require.NoError(t, err)
319+
var batch2ExpectedLastL1MessageQueueHash common.Hash
320+
{
321+
fmt.Println("insert next 3 blocks ------------------------")
322+
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[5:8])
323+
for _, block := range blocks[5:8] {
324+
fmt.Println("insert[5:8] block number: ", block.Header.Number, block.Header.Hash())
325+
}
326+
require.NoError(t, err)
327+
batch2ExpectedLastL1MessageQueueHash, err = encoding.MessageQueueV2ApplyL1MessagesFromBlocks(batch1ExpectedLastL1MessageQueueHash, blocks[5:8])
328+
require.NoError(t, err)
320329

321-
cp.TryProposeChunk()
322-
bap.TryProposeBatch()
330+
cp.TryProposeChunk()
331+
bap.TryProposeBatch()
332+
}
323333

324-
bup.TryProposeBundle() // The proposed bundle contains two batches when codec version is codecv3.
334+
var batch3ExpectedLastL1MessageQueueHash common.Hash
335+
{
336+
fmt.Println("insert last 2 blocks ------------------------")
337+
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[8:])
338+
for _, block := range blocks[8:] {
339+
fmt.Println("insert[:8] block number: ", block.Header.Number, block.Header.Hash())
340+
}
341+
require.NoError(t, err)
342+
batch3ExpectedLastL1MessageQueueHash, err = encoding.MessageQueueV2ApplyL1MessagesFromBlocks(batch2ExpectedLastL1MessageQueueHash, blocks[8:])
343+
require.NoError(t, err)
344+
345+
cp.TryProposeChunk()
346+
bap.TryProposeBatch()
347+
}
325348

349+
var batches []*orm.Batch
326350
// make sure that batches are created as expected
327351
require.Eventually(t, func() bool {
328-
batches, getErr := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
329-
if getErr != nil {
352+
batches, err = batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
353+
if err != nil {
330354
return false
331355
}
332-
if len(batches) != 3 {
356+
if len(batches) != 4 {
333357
return false
334358
}
335359

@@ -340,46 +364,91 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
340364
require.Equal(t, batch1ExpectedLastL1MessageQueueHash, common.HexToHash(batches[1].PostL1MessageQueueHash))
341365
require.Equal(t, batch1ExpectedLastL1MessageQueueHash, common.HexToHash(batches[2].PrevL1MessageQueueHash))
342366
require.Equal(t, batch2ExpectedLastL1MessageQueueHash, common.HexToHash(batches[2].PostL1MessageQueueHash))
367+
require.Equal(t, batch2ExpectedLastL1MessageQueueHash, common.HexToHash(batches[3].PrevL1MessageQueueHash))
368+
require.Equal(t, batch3ExpectedLastL1MessageQueueHash, common.HexToHash(batches[3].PostL1MessageQueueHash))
343369

344370
return true
345371
}, 30*time.Second, time.Second)
346372

347-
// simulate proof generation -> all batches and bundle are verified
373+
// Nothing should happen since no batch is committed yet.
348374
{
349-
batchProof := &message.OpenVMBatchProof{}
350-
batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
375+
bup.TryProposeBundle()
376+
bundles, err := bundleOrm.GetBundles(context.Background(), map[string]interface{}{}, nil, 0)
351377
require.NoError(t, err)
352-
batches = batches[1:]
353-
for _, batch := range batches {
354-
err = batchOrm.UpdateProofByHash(context.Background(), batch.Hash, batchProof, 100)
355-
require.NoError(t, err)
356-
err = batchOrm.UpdateProvingStatus(context.Background(), batch.Hash, types.ProvingTaskVerified)
357-
require.NoError(t, err)
378+
require.Len(t, bundles, 0)
379+
}
380+
381+
// simulate batches 2 and 3 being submitted together in a single transaction
382+
err = db.Transaction(func(dbTX *gorm.DB) error {
383+
if err = batchOrm.UpdateCommitTxHashAndRollupStatus(context.Background(), batches[1].Hash, "0xdefdef", types.RollupCommitted, dbTX); err != nil {
384+
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", batches[1].Index, batches[1].Hash, err)
385+
}
386+
387+
for _, batch := range batches[2:] {
388+
if err = batchOrm.UpdateCommitTxHashAndRollupStatus(context.Background(), batch.Hash, "0xabcabc", types.RollupCommitted, dbTX); err != nil {
389+
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", batch.Index, batch.Hash, err)
390+
}
358391
}
359392

360-
bundleProof := &message.OpenVMBundleProof{}
393+
return nil
394+
})
395+
require.NoError(t, err)
396+
397+
// We only allow bundles up to 2 batches. We should have 2 bundles:
398+
// 1. batch 1 -> because it was committed by itself and the next set of batches could not fit the bundle
399+
// 2. batch 2 and 3 -> because they were committed together in a single transaction
400+
{
401+
// need to propose 2 times to get 2 bundles with all batches
402+
bup.TryProposeBundle()
403+
bup.TryProposeBundle()
404+
361405
bundles, err := bundleOrm.GetBundles(context.Background(), map[string]interface{}{}, nil, 0)
362406
require.NoError(t, err)
363-
for _, bundle := range bundles {
364-
err = bundleOrm.UpdateProofAndProvingStatusByHash(context.Background(), bundle.Hash, bundleProof, types.ProvingTaskVerified, 100)
365-
require.NoError(t, err)
366-
}
407+
require.Len(t, bundles, 2)
408+
409+
require.Equal(t, bundles[0].StartBatchIndex, batches[1].Index)
410+
require.Equal(t, bundles[0].EndBatchIndex, batches[1].Index)
411+
require.Equal(t, bundles[0].StartBatchHash, batches[1].Hash)
412+
require.Equal(t, bundles[0].EndBatchHash, batches[1].Hash)
413+
414+
require.Equal(t, bundles[1].StartBatchIndex, batches[2].Index)
415+
require.Equal(t, bundles[1].EndBatchIndex, batches[3].Index)
416+
require.Equal(t, bundles[1].StartBatchHash, batches[2].Hash)
417+
require.Equal(t, bundles[1].EndBatchHash, batches[3].Hash)
367418
}
368419

369-
//return
370-
// TODO: assert that batches have been submitted together in a single transaction after contract ABI is updated
371-
//for _, batch := range batches {
372-
// fmt.Println("batch hash: ", batch.Hash, batch.Index, batch.RollupStatus)
373-
// //if types.RollupCommitted != types.RollupStatus(batch.RollupStatus) {
374-
// // return false
375-
// //}
420+
return
421+
// simulate proof generation -> all batches and bundle are verified
422+
//{
423+
// batchProof := &message.OpenVMBatchProof{}
424+
// batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
425+
// require.NoError(t, err)
426+
// batches = batches[1:]
427+
// for _, batch := range batches {
428+
// err = batchOrm.UpdateProofByHash(context.Background(), batch.Hash, batchProof, 100)
429+
// require.NoError(t, err)
430+
// err = batchOrm.UpdateProvingStatus(context.Background(), batch.Hash, types.ProvingTaskVerified)
431+
// require.NoError(t, err)
432+
// }
433+
//
434+
// bundleProof := &message.OpenVMBundleProof{}
435+
// bundles, err := bundleOrm.GetBundles(context.Background(), map[string]interface{}{}, nil, 0)
436+
// require.NoError(t, err)
437+
// for _, bundle := range bundles {
438+
// err = bundleOrm.UpdateProofAndProvingStatusByHash(context.Background(), bundle.Hash, bundleProof, types.ProvingTaskVerified, 100)
439+
// require.NoError(t, err)
440+
// }
376441
//}
442+
443+
// TODO: assert that batches have been submitted together in a single transaction after contract ABI is updated
444+
377445
//l2Relayer.ProcessPendingBatches()
378-
//
446+
//l2Relayer.ProcessPendingBundles()
447+
379448
//assert.Eventually(t, func() bool {
380449
// l2Relayer.ProcessPendingBundles()
381450
//
382-
// batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
451+
// batches, err = batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
383452
// assert.NoError(t, err)
384453
// assert.Len(t, batches, 3)
385454
// batches = batches[1:]
@@ -415,5 +484,5 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
415484
// }
416485
//
417486
// return true
418-
//}, 30*time.Second, time.Second)
487+
//}, 10*time.Second, time.Second)
419488
}

0 commit comments

Comments
 (0)