Skip to content

Commit 9aee0d6

Browse files
committed
feat: add blob uploader service
1 parent 5204ad5 commit 9aee0d6

File tree

17 files changed

+658
-1
lines changed

17 files changed

+658
-1
lines changed

common/testdata/blobdata.txt

Whitespace-only changes.

common/types/db.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,3 +326,53 @@ func (s TxStatus) String() string {
326326
return fmt.Sprintf("Unknown TxStatus (%d)", int32(s))
327327
}
328328
}
329+
330+
// BlobUploadStatus represents the status of a blob upload
331+
type BlobUploadStatus int
332+
333+
const (
334+
// BlobUploadStatusUndefined indicates an undefined status
335+
BlobUploadStatusUndefined BlobUploadStatus = iota
336+
// BlobUploadStatusPending indicates a pending upload status
337+
BlobUploadStatusPending
338+
// BlobUploadStatusUploaded indicates a successful upload status
339+
BlobUploadStatusUploaded
340+
// BlobUploadStatusFailed indicates a failed upload status
341+
BlobUploadStatusFailed
342+
)
343+
344+
func (s BlobUploadStatus) String() string {
345+
switch s {
346+
case BlobUploadStatusPending:
347+
return "BlobUploadStatusPending"
348+
case BlobUploadStatusUploaded:
349+
return "BlobUploadStatusUploaded"
350+
case BlobUploadStatusFailed:
351+
return "BlobUploadStatusFailed"
352+
default:
353+
return fmt.Sprintf("Unknown BlobUploadStatus (%d)", int32(s))
354+
}
355+
}
356+
357+
// BlobStoragePlatform represents the platform a blob upload to
358+
type BlobStoragePlatform int
359+
360+
const (
361+
// BlobStoragePlatformUndefined indicates an undefined platform
362+
BlobStoragePlatformUndefined BlobStoragePlatform = iota
363+
// BlobStoragePlatformS3 represents AWS S3
364+
BlobStoragePlatformS3
365+
// BlobUploadStatusUploaded represents storage blockchain Arweave
366+
BlobStoragePlatformArweave
367+
)
368+
369+
func (s BlobStoragePlatform) String() string {
370+
switch s {
371+
case BlobStoragePlatformS3:
372+
return "BlobStoragePlatformS3"
373+
case BlobStoragePlatformArweave:
374+
return "BlobStoragePlatformArweave"
375+
default:
376+
return fmt.Sprintf("Unknown BlobStoragePlatform (%d)", int32(s))
377+
}
378+
}

common/utils/ethereum.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package utils
2+
3+
import "crypto/sha256"
4+
5+
// CalculateVersionedBlobHash computes the versioned hash for blob data
6+
// Following Ethereum's approach where:
7+
// version = 0x01
8+
// hash = sha256(blob)
9+
// versionedHash = version + hash[1:]
10+
func CalculateVersionedBlobHash(blobData []byte) [32]byte {
11+
// Step 1: Compute SHA-256 hash of the blob data
12+
hash := sha256.Sum256(blobData)
13+
14+
// Step 2: Create versioned hash (version byte + hash[1:])
15+
var versionedHash [32]byte
16+
versionedHash[0] = 0x01 // Version byte
17+
copy(versionedHash[1:], hash[1:])
18+
19+
return versionedHash
20+
}

common/utils/ethereum_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package blob_uploader
2+
3+
import "testing"
4+
5+
// testCalculateVersionedBlobHash test function CalculateVersionedBlobHash
6+
func testCalculateVersionedBlobHash(t *testing.T) {
7+
8+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
CREATE TABLE blob_upload (
5+
batch_index BIGINT NOT NULL,
6+
7+
platform TEXT NOT NULL,
8+
status SMALLINT NOT NULL,
9+
updated_at TIMESTAMP NOT NULL DEFAULT now(),
10+
11+
PRIMARY KEY (batch_index, platform),
12+
FOREIGN KEY (batch_index) REFERENCES batch(index)
13+
);
14+
15+
COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';
16+
17+
-- +goose StatementEnd
18+
19+
-- +goose Down
20+
-- +goose StatementBegin
21+
DROP TABLE blob_upload;
22+
-- +goose StatementEnd
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
4+
-- Add index on status for faster filtering by status
5+
CREATE INDEX idx_blob_upload_status ON blob_upload(status);
6+
7+
-- Add index on updated_at for faster sorting and filtering by time
8+
CREATE INDEX idx_blob_upload_updated_at ON blob_upload(updated_at);
9+
10+
-- Add index on (batch_index, status) for faster filtering by both fields
11+
CREATE INDEX idx_blob_upload_batch_index_status ON blob_upload(batch_index, status);
12+
13+
-- +goose StatementEnd
14+
15+
-- +goose Down
16+
-- +goose StatementBegin
17+
DROP INDEX IF EXISTS idx_blob_upload_status;
18+
DROP INDEX IF EXISTS idx_blob_upload_updated_at;
19+
DROP INDEX IF EXISTS idx_blob_upload_batch_index_status;
20+
-- +goose StatementEnd

go.work.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,8 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.30.6/go.mod h1:PudwVKUTApfm0nYaPutOXa
646646
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1/go.mod h1:SuZJxklHxLAXgLTc1iFXbEWkXs7QRTQpCLGaKIprQW0=
647647
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1/go.mod h1:Wi0EBZwiz/K44YliU0EKxqTCJGUfYTWXrrBwkq736bM=
648648
github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw=
649+
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
650+
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
649651
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
650652
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
651653
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@@ -1357,7 +1359,6 @@ github.com/scroll-tech/da-codec v0.1.1-0.20241014152913-2703f226fb0b h1:5H6V6yba
13571359
github.com/scroll-tech/da-codec v0.1.1-0.20241014152913-2703f226fb0b/go.mod h1:48uxaqVgpD8ulH8p+nrBtfeLHZ9tX82bVVdPNkW3rPE=
13581360
github.com/scroll-tech/da-codec v0.1.3-0.20250227072756-a1482833595f h1:YYbhuUwjowqI4oyXtECRofck7Fyj18e1tcRjuQlZpJE=
13591361
github.com/scroll-tech/da-codec v0.1.3-0.20250227072756-a1482833595f/go.mod h1:xECEHZLVzbdUn+tNbRJhRIjLGTOTmnFQuTgUTeVLX58=
1360-
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1/go.mod h1:yhTS9OVC0xQGhg7DN5iV5KZJvnSIlFWAxDdp+6jxQtY=
13611362
github.com/scroll-tech/go-ethereum v1.10.14-0.20240607130425-e2becce6a1a4/go.mod h1:byf/mZ8jLYUCnUePTicjJWn+RvKdxDn7buS6glTnMwQ=
13621363
github.com/scroll-tech/go-ethereum v1.10.14-0.20240821074444-b3fa00861e5e/go.mod h1:swB5NSp8pKNDuYsTxfR08bHS6L56i119PBx8fxvV8Cs=
13631364
github.com/scroll-tech/go-ethereum v1.10.14-0.20241010064814-3d88e870ae22/go.mod h1:r9FwtxCtybMkTbWYCyBuevT9TW3zHmOTHqD082Uh+Oo=
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package app
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"time"
9+
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/scroll-tech/da-codec/encoding"
12+
"github.com/scroll-tech/go-ethereum/ethclient"
13+
"github.com/scroll-tech/go-ethereum/log"
14+
"github.com/urfave/cli/v2"
15+
16+
"scroll-tech/common/database"
17+
"scroll-tech/common/observability"
18+
"scroll-tech/common/utils"
19+
"scroll-tech/common/version"
20+
21+
"scroll-tech/rollup/internal/config"
22+
"scroll-tech/rollup/internal/controller/relayer"
23+
"scroll-tech/rollup/internal/controller/watcher"
24+
rutils "scroll-tech/rollup/internal/utils"
25+
)
26+
27+
var app *cli.App
28+
29+
func init() {
30+
// Set up blob-uploader app info.
31+
app = cli.NewApp()
32+
app.Action = action
33+
app.Name = "blob-uploader"
34+
app.Usage = "The Scroll Blob Uploader"
35+
app.Version = version.Version
36+
app.Flags = append(app.Flags, utils.CommonFlags...)
37+
app.Flags = append(app.Flags, utils.RollupRelayerFlags...)
38+
app.Commands = []*cli.Command{}
39+
app.Before = func(ctx *cli.Context) error {
40+
return utils.LogSetup(ctx)
41+
}
42+
// Register `rollup-relayer-test` app for integration-test.
43+
utils.RegisterSimulation(app, utils.RollupRelayerApp)
44+
}
45+
46+
func action(ctx *cli.Context) error {
47+
// Load config file.
48+
cfgFile := ctx.String(utils.ConfigFileFlag.Name)
49+
cfg, err := config.NewConfig(cfgFile)
50+
if err != nil {
51+
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
52+
}
53+
54+
subCtx, cancel := context.WithCancel(ctx.Context)
55+
// Init db connection
56+
db, err := database.InitDB(cfg.DBConfig)
57+
if err != nil {
58+
log.Crit("failed to init db connection", "err", err)
59+
}
60+
defer func() {
61+
cancel()
62+
if err = database.CloseDB(db); err != nil {
63+
log.Crit("failed to close db connection", "error", err)
64+
}
65+
}()
66+
67+
registry := prometheus.DefaultRegisterer
68+
observability.Server(ctx, db)
69+
70+
// Init l2geth connection
71+
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
72+
if err != nil {
73+
log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err)
74+
}
75+
76+
genesisPath := ctx.String(utils.Genesis.Name)
77+
genesis, err := utils.ReadGenesis(genesisPath)
78+
if err != nil {
79+
log.Crit("failed to read genesis", "genesis file", genesisPath, "error", err)
80+
}
81+
82+
// sanity check config
83+
if cfg.L2Config.RelayerConfig.BatchSubmission == nil {
84+
log.Crit("cfg.L2Config.RelayerConfig.BatchSubmission must not be nil")
85+
}
86+
if cfg.L2Config.RelayerConfig.BatchSubmission.MinBatches < 1 {
87+
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MinBatches must be at least 1")
88+
}
89+
if cfg.L2Config.RelayerConfig.BatchSubmission.MaxBatches < 1 {
90+
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MaxBatches must be at least 1")
91+
}
92+
if cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch <= 0 {
93+
log.Crit("cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch must be greater than 0")
94+
}
95+
if cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk <= 0 {
96+
log.Crit("cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk must be greater than 0")
97+
}
98+
99+
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, genesis.Config, relayer.ServiceTypeL2RollupRelayer, registry)
100+
if err != nil {
101+
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
102+
}
103+
104+
minCodecVersion := encoding.CodecVersion(ctx.Uint(utils.MinCodecVersionFlag.Name))
105+
if minCodecVersion < encoding.CodecV7 {
106+
log.Crit("min codec version must be greater than or equal to CodecV7", "minCodecVersion", minCodecVersion)
107+
}
108+
109+
chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, minCodecVersion, genesis.Config, db, registry)
110+
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, minCodecVersion, genesis.Config, db, registry)
111+
bundleProposer := watcher.NewBundleProposer(subCtx, cfg.L2Config.BundleProposerConfig, minCodecVersion, genesis.Config, db, registry)
112+
113+
l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, genesis.Config, db, registry)
114+
115+
// Watcher loop to fetch missing blocks
116+
go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
117+
number, loopErr := rutils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations)
118+
if loopErr != nil {
119+
log.Error("failed to get block number", "err", loopErr)
120+
return
121+
}
122+
l2watcher.TryFetchRunningMissingBlocks(number)
123+
})
124+
125+
go utils.Loop(subCtx, time.Duration(cfg.L2Config.ChunkProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, chunkProposer.TryProposeChunk)
126+
127+
go utils.Loop(subCtx, time.Duration(cfg.L2Config.BatchProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, batchProposer.TryProposeBatch)
128+
129+
go utils.Loop(subCtx, 10*time.Second, bundleProposer.TryProposeBundle)
130+
131+
go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches)
132+
133+
go utils.Loop(subCtx, 15*time.Second, l2relayer.ProcessPendingBundles)
134+
135+
// Finish start all blob-uploader functions.
136+
log.Info("Start blob-uploader successfully", "version", version.Version)
137+
138+
// Catch CTRL-C to ensure a graceful shutdown.
139+
interrupt := make(chan os.Signal, 1)
140+
signal.Notify(interrupt, os.Interrupt)
141+
142+
// Wait until the interrupt signal is received from an OS signal.
143+
<-interrupt
144+
145+
return nil
146+
}
147+
148+
// Run rollup relayer cmd instance.
149+
func Run() {
150+
if err := app.Run(os.Args); err != nil {
151+
_, _ = fmt.Fprintln(os.Stderr, err)
152+
os.Exit(1)
153+
}
154+
}

rollup/cmd/blob_uploader/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package main
2+
3+
import "scroll-tech/rollup/cmd/rollup_relayer/app"
4+
5+
func main() {
6+
app.Run()
7+
}

rollup/go.mod

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,24 @@ require (
2222

2323
require (
2424
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
25+
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
26+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
27+
github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect
28+
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
29+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
30+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
31+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
32+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
33+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect
34+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
35+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.2 // indirect
36+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
37+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
38+
github.com/aws/aws-sdk-go-v2/service/s3 v1.80.0 // indirect
39+
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect
40+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
41+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
42+
github.com/aws/smithy-go v1.22.2 // indirect
2543
github.com/beorn7/perks v1.0.1 // indirect
2644
github.com/bits-and-blooms/bitset v1.20.0 // indirect
2745
github.com/btcsuite/btcd v0.20.1-beta // indirect

0 commit comments

Comments
 (0)