Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/utils/flags_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ var (
Usage: "Whether the sequencer is paused (true) or active (false). Can be controlled via Apollo config.",
Value: false,
}
DataStreamBatchOptimizationEnabled = cli.BoolFlag{
Name: "zkevm.data-stream-batch-optimization-enabled",
Usage: "Enable data stream batch optimization",
Value: false,
}
)

func setGPOXLayer(ctx *cli.Context, cfg *gaspricecfg.Config) {
Expand Down
2 changes: 2 additions & 0 deletions eth/ethconfig/config_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type XLayerConfig struct {
SyncSeqLogs bool

SequencerPaused bool

DataStreamBatchOptimizationEnabled bool
}

var DefaultXLayerConfig = XLayerConfig{}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (

replace github.com/ledgerwatch/erigon-lib => ./erigon-lib

replace github.com/0xPolygonHermez/zkevm-data-streamer => github.com/okx/xlayer-data-streamer v0.4.1-rc2

require (
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c h1:alCfDKmPC0EC0KGlZWrNF0hilVWBkzMz+aAYTJ/2hY4=
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRBuIXj0FRBFX9YLg+2SoL3w8Ww19uZO9yNE=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8 h1:0Sc91seArqR3BQs49SGwGXWjf0MQYW98sEUYrao7duU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9 h1:9F73F/hDSwG7tF85TVdu9t47/qZ6KHXc4WZhsirCSxw=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/99designs/gqlgen v0.17.40 h1:/l8JcEVQ93wqIfmH9VS1jsAkwm6eAF1NwQn3N+SDqBY=
github.com/99designs/gqlgen v0.17.40/go.mod h1:b62q1USk82GYIVjC60h02YguAZLqYZtvWml8KkhJps4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -853,6 +849,8 @@ github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6 h1:iZ5rEHU561k2tdi/at
github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6/go.mod h1:A+9rV4WFp4DKg1Ym1v6YtCrJ2vvlt1ZA/iml0CNuu2A=
github.com/okx/poseidongold v0.0.3 h1:bb6cN2J2WVJDcqOUvIQI9wkLaEwRdqhB9eCEm/Cpb4E=
github.com/okx/poseidongold v0.0.3/go.mod h1:5ZSHtmCUvyWfMhQH+9wi6b41z4JJwgO06baYZdBPRXw=
github.com/okx/xlayer-data-streamer v0.4.1-rc2 h1:tK3qk6Y2B3EsnyUCIUdbPNokcyfNXfwwZI4ikBhK1Eg=
github.com/okx/xlayer-data-streamer v0.4.1-rc2/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
Expand Down
3 changes: 2 additions & 1 deletion test/config/test.erigon.rpc.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,5 @@ realtime.kafka-sync-block-topic: xlayer-header
realtime.kafka-sync-error-topic: xlayer-error
realtime.kafka-sync-client-id: "xlayer-consumer"
realtime.kafka-sync-group-id: "xlayer-consumer-1"
realtime.cache-dump-path: /home/erigon/data/cache
realtime.cache-dump-path: /home/erigon/data/cache
zkevm.data-stream-batch-optimization-enabled: true
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,5 @@ var DefaultFlags = []cli.Flag{
&utils.SyncSeqLogs,

&utils.SequencerPaused,
&utils.DataStreamBatchOptimizationEnabled,
}
1 change: 1 addition & 0 deletions turbo/cli/flags_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func ApplyFlagsForEthXLayerConfig(ctx *cli.Context, cfg *ethconfig.Config) {
DataStreamUnwindToBlock: ctx.Uint64(utils.DataStreamUnwindToBlock.Name),
SyncSeqLogs: ctx.Bool(utils.SyncSeqLogs.Name),
SequencerPaused: ctx.Bool(utils.SequencerPaused.Name),
DataStreamBatchOptimizationEnabled: ctx.Bool(utils.DataStreamBatchOptimizationEnabled.Name),
}
if cfg.XLayer.BlockInfoConcurrent {
blockinfo.SetUseBlockInfoTree(true)
Expand Down
28 changes: 28 additions & 0 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ const (
CmdBookmark // CmdBookmark for the get bookmark TCP client command
)

const (
// Custom X Layer commands - use 1000+ range to avoid conflicts with upstream
CmdLatestL2Block Command = 1001 // CmdLatestL2Block for the optimized get latest L2Block command
CmdStartBookmarkBatch Command = 1002 // CmdStartBookmarkBatch for the optimized batch streaming from bookmark
)

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendHeaderCmd() error {
return c.sendCommand(CmdHeader)
Expand Down Expand Up @@ -61,6 +67,28 @@ func (c *StreamClient) sendEntryCmd(entryNum uint64) error {
return c.writeToConn(entryNum)
}

// sendLatestL2BlockCmd sends the optimized get latest L2Block command to the server.
func (c *StreamClient) sendLatestL2BlockCmd() error {
return c.sendCommand(CmdLatestL2Block)
}

// sendBookmarkBatchCmd sends the optimized batch streaming command for the provided bookmark value.
// This replaces the need for separate stopStreaming + GetHeader + initiateDownloadBookmark calls
func (c *StreamClient) sendBookmarkBatchCmd(bookmark []byte) error {
// Send the CmdStartBookmarkBatch command
if err := c.sendCommand(CmdStartBookmarkBatch); err != nil {
return err
}

// Send bookmark length
if err := c.writeToConn(uint32(len(bookmark))); err != nil {
return err
}

// Send the bookmark to retrieve
return c.writeToConn(bookmark)
}

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendStopCmd() error {
return c.sendCommand(CmdStop)
Expand Down
Loading
Loading