Skip to content

Commit 030742c

Browse files
committed
feat: support multi codec version
1 parent 03410b7 commit 030742c

File tree

2 files changed

+65
-34
lines changed

2 files changed

+65
-34
lines changed

rollup/internal/controller/blob_uploader/blob_uploader.go

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,15 @@ func (b *BlobUploader) UploadBlobToS3() {
7575
}
7676

7777
// construct blob
78-
var blob *kzg4844.Blob
7978
codecVersion := encoding.CodecVersion(dbBatch.CodecVersion)
80-
switch codecVersion {
81-
case encoding.CodecV7:
82-
blob, err = b.constructBlobCodecV7(dbBatch)
83-
if err != nil {
84-
log.Error("failed to construct constructBlobCodecV7 payload for V7", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
85-
return
86-
}
87-
default:
88-
log.Error("unsupported codec version in UploadBlobToS3", "codecVersion", codecVersion, "batch index", dbBatch.Index)
79+
blob, err := b.constructBlobCodec(dbBatch)
80+
if err != nil {
81+
log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
82+
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
8983
return
9084
}
9185

86+
9287
// calculate versioned blob hash
9388
versionedBlobHash, err := utils.CalculateVersionedBlobHash(*blob)
9489
if err != nil {
@@ -101,6 +96,7 @@ func (b *BlobUploader) UploadBlobToS3() {
10196
err = b.s3Uploader.UploadData(b.ctx, blob[:], key)
10297
if err != nil {
10398
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
99+
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
104100
// Update status to failed
105101
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); err != nil {
106102
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", err)
@@ -111,47 +107,77 @@ func (b *BlobUploader) UploadBlobToS3() {
111107
// Update status to uploaded
112108
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
113109
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
110+
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
114111
return
115112
}
116113

117-
b.metrics.rollupBlobUploaderUploadToS3Total.Inc()
114+
b.metrics.rollupBlobUploaderUploadToS3SuccessTotal.Inc()
118115
log.Info("Successfully uploaded blob to S3", "batch index", dbBatch.Index, "versioned blob hash", key)
119116
}
120117

121-
func (b *BlobUploader) constructBlobCodecV7(dbBatch *orm.Batch) (*kzg4844.Blob, error) {
118+
func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, error) {
122119
var dbChunks []*orm.Chunk
123120

124-
// Verify batches compatibility
125121
dbChunks, err := b.chunkOrm.GetChunksInRange(b.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex)
126122
if err != nil {
127123
return nil, fmt.Errorf("failed to get chunks in range: %v", err)
128124
}
129125

130126
// check codec version
131-
var batchBlocks []*encoding.Block
132127
for _, dbChunk := range dbChunks {
133128
if dbBatch.CodecVersion != dbChunk.CodecVersion {
134129
return nil, fmt.Errorf("batch codec version is different from chunk codec version, batch index: %d, chunk index: %d, batch codec version: %d, chunk codec version: %d", dbBatch.Index, dbChunk.Index, dbBatch.CodecVersion, dbChunk.CodecVersion)
135130
}
131+
}
136132

137-
blocks, err := b.l2BlockOrm.GetL2BlocksInRange(b.ctx, dbChunk.StartBlockNumber, dbChunk.EndBlockNumber)
138-
if err != nil {
139-
return nil, fmt.Errorf("failed to get blocks in range for batch %d: %w", dbBatch.Index, err)
133+
var encodingBatch *encoding.Batch
134+
codecVersion := encoding.CodecVersion(dbBatch.CodecVersion)
135+
switch codecVersion {
136+
case encoding.CodecV0, encoding.CodecV1, encoding.CodecV2, encoding.CodecV3, encoding.CodecV4, encoding.CodecV5, encoding.CodecV6:
137+
chunks := make([]*encoding.Chunk, len(dbChunks))
138+
for i, c := range dbChunks {
139+
blocks, getErr := b.l2BlockOrm.GetL2BlocksInRange(b.ctx, c.StartBlockNumber, c.EndBlockNumber)
140+
if getErr != nil {
141+
return nil, fmt.Errorf("failed to get blocks in range for batch %d: %w", dbBatch.Index, err)
142+
}
143+
chunks[i] = &encoding.Chunk{Blocks: blocks}
144+
}
145+
146+
encodingBatch = &encoding.Batch{
147+
Index: dbBatch.Index,
148+
TotalL1MessagePoppedBefore: dbChunks[0].TotalL1MessagesPoppedBefore,
149+
ParentBatchHash: common.HexToHash(dbBatch.ParentBatchHash),
150+
Chunks: chunks,
140151
}
141152

142-
batchBlocks = append(batchBlocks, blocks...)
143-
}
144-
145-
encodingBatch := &encoding.Batch{
146-
Index: dbBatch.Index,
147-
ParentBatchHash: common.HexToHash(dbBatch.ParentBatchHash),
148-
PrevL1MessageQueueHash: common.HexToHash(dbBatch.PrevL1MessageQueueHash),
149-
PostL1MessageQueueHash: common.HexToHash(dbBatch.PostL1MessageQueueHash),
150-
Blocks: batchBlocks,
153+
case encoding.CodecV7:
154+
var batchBlocks []*encoding.Block
155+
for _, dbChunk := range dbChunks {
156+
if dbBatch.CodecVersion != dbChunk.CodecVersion {
157+
return nil, fmt.Errorf("batch codec version is different from chunk codec version, batch index: %d, chunk index: %d, batch codec version: %d, chunk codec version: %d", dbBatch.Index, dbChunk.Index, dbBatch.CodecVersion, dbChunk.CodecVersion)
158+
}
159+
160+
blocks, err := b.l2BlockOrm.GetL2BlocksInRange(b.ctx, dbChunk.StartBlockNumber, dbChunk.EndBlockNumber)
161+
if err != nil {
162+
return nil, fmt.Errorf("failed to get blocks in range for batch %d: %w", dbBatch.Index, err)
163+
}
164+
165+
batchBlocks = append(batchBlocks, blocks...)
166+
}
167+
168+
encodingBatch = &encoding.Batch{
169+
Index: dbBatch.Index,
170+
ParentBatchHash: common.HexToHash(dbBatch.ParentBatchHash),
171+
PrevL1MessageQueueHash: common.HexToHash(dbBatch.PrevL1MessageQueueHash),
172+
PostL1MessageQueueHash: common.HexToHash(dbBatch.PostL1MessageQueueHash),
173+
Blocks: batchBlocks,
174+
}
175+
default:
176+
log.Error("unsupported codec version in UploadBlobToS3", "codecVersion", codecVersion, "batch index", dbBatch.Index)
177+
return nil, fmt.Errorf("unsupported codec version, batch index: %d, batch codec version: %d, %w", dbBatch.Index, codecVersion, err)
151178
}
152-
153-
version := encoding.CodecVersion(dbBatch.CodecVersion)
154-
codec, err := encoding.CodecFromVersion(version)
179+
180+
codec, err := encoding.CodecFromVersion(codecVersion)
155181
if err != nil {
156182
return nil, fmt.Errorf("failed to get codec from version %d, err: %w", dbBatch.CodecVersion, err)
157183
}
@@ -163,4 +189,4 @@ func (b *BlobUploader) constructBlobCodecV7(dbBatch *orm.Batch) (*kzg4844.Blob,
163189

164190
return daBatch.Blob(), nil
165191

166-
}
192+
}

rollup/internal/controller/blob_uploader/blob_uploader_metrics.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
)
99

1010
type blobUploaderMetrics struct {
11-
rollupBlobUploaderUploadToS3Total prometheus.Counter
11+
rollupBlobUploaderUploadToS3SuccessTotal prometheus.Counter
12+
rollupBlobUploaderUploadToS3FailedTotal prometheus.Counter
1213
}
1314

1415
var (
@@ -19,9 +20,13 @@ var (
1920
func initblobUploaderMetrics(reg prometheus.Registerer) *blobUploaderMetrics {
2021
initBlobUploaderMetricsOnce.Do(func() {
2122
blobUploaderMetric = &blobUploaderMetrics{
22-
rollupBlobUploaderUploadToS3Total: promauto.With(reg).NewCounter(prometheus.CounterOpts{
23-
Name: "rollup_blob_uploader_upload_to_s3_total",
24-
Help: "The total number of upload blob to S3 run total",
23+
rollupBlobUploaderUploadToS3SuccessTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
24+
Name: "rollup_blob_uploader_upload_to_s3_success_total",
25+
Help: "The total number of upload blob to S3 run success total",
26+
}),
27+
rollupBlobUploaderUploadToS3FailedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
28+
Name: "rollup_blob_uploader_upload_to_s3_failed_total",
29+
Help: "The total number of upload blob to S3 run failed total",
2530
}),
2631
}
2732
})

0 commit comments

Comments
 (0)