Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (

replace github.com/ledgerwatch/erigon-lib => ./erigon-lib

replace github.com/0xPolygonHermez/zkevm-data-streamer => github.com/okx/xlayer-data-streamer v0.4.1-rc1

require (
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c h1:alCfDKmPC0EC0KGlZWrNF0hilVWBkzMz+aAYTJ/2hY4=
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRBuIXj0FRBFX9YLg+2SoL3w8Ww19uZO9yNE=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8 h1:0Sc91seArqR3BQs49SGwGXWjf0MQYW98sEUYrao7duU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.8/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9 h1:9F73F/hDSwG7tF85TVdu9t47/qZ6KHXc4WZhsirCSxw=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.9/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/99designs/gqlgen v0.17.40 h1:/l8JcEVQ93wqIfmH9VS1jsAkwm6eAF1NwQn3N+SDqBY=
github.com/99designs/gqlgen v0.17.40/go.mod h1:b62q1USk82GYIVjC60h02YguAZLqYZtvWml8KkhJps4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -853,6 +849,8 @@ github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6 h1:iZ5rEHU561k2tdi/at
github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6/go.mod h1:A+9rV4WFp4DKg1Ym1v6YtCrJ2vvlt1ZA/iml0CNuu2A=
github.com/okx/poseidongold v0.0.3 h1:bb6cN2J2WVJDcqOUvIQI9wkLaEwRdqhB9eCEm/Cpb4E=
github.com/okx/poseidongold v0.0.3/go.mod h1:5ZSHtmCUvyWfMhQH+9wi6b41z4JJwgO06baYZdBPRXw=
github.com/okx/xlayer-data-streamer v0.4.1-rc1 h1:kXcyDMWKOq12TgzjY+I9Hs++M+KgwzVPmpEi90wacVY=
github.com/okx/xlayer-data-streamer v0.4.1-rc1/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
Expand Down
10 changes: 10 additions & 0 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const (
CmdBookmark // CmdBookmark for the get bookmark TCP client command
)

const (
// Custom X Layer commands - use 1000+ range to avoid conflicts with upstream
CmdLatestL2Block Command = 1001 // CmdLatestL2Block for the optimized get latest L2Block command
)

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendHeaderCmd() error {
return c.sendCommand(CmdHeader)
Expand Down Expand Up @@ -61,6 +66,11 @@ func (c *StreamClient) sendEntryCmd(entryNum uint64) error {
return c.writeToConn(entryNum)
}

// sendLatestL2BlockCmd sends the optimized get latest L2Block command to the server.
func (c *StreamClient) sendLatestL2BlockCmd() error {
return c.sendCommand(CmdLatestL2Block)
}

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendStopCmd() error {
return c.sendCommand(CmdStop)
Expand Down
63 changes: 63 additions & 0 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type StreamClient struct {

useTLS bool
tlsConfig *tls.Config

// X Layer optimization: track last used API type
lastUsedOptimizedAPI atomic.Bool
}

const (
Expand Down Expand Up @@ -258,6 +261,66 @@ func (c *StreamClient) stopStreaming() error {
}

func (c *StreamClient) getLatestL2Block() (l2Block *types.FullL2Block, err error) {
// Try optimized API first (X Layer enhancement)
l2Block, err = c.getLatestL2BlockOptimized()
if err == nil {
c.lastUsedOptimizedAPI.Store(true)
log.Debug("[Datastream client] getLatestL2Block using optimized API succeeded")
return l2Block, nil
}

// Optimized API failed, fall back to legacy method
c.lastUsedOptimizedAPI.Store(false)
log.Debug("[Datastream client] getLatestL2Block using optimized API failed, using legacy method", "error", err)
return c.getLatestL2BlockLegacy()
}

// LastUsedOptimizedAPI returns whether the last GetLatestL2Block call used the optimized API
func (c *StreamClient) LastUsedOptimizedAPI() bool {
return c.lastUsedOptimizedAPI.Load()
}

// getLatestL2BlockOptimized tries to use the new CmdLatestL2Block command
func (c *StreamClient) getLatestL2BlockOptimized() (l2Block *types.FullL2Block, err error) {
if err := c.sendLatestL2BlockCmd(); err != nil {
return nil, fmt.Errorf("sendLatestL2BlockCmd: %w", err)
}

// Read packet
packet, err := c.readBuffer(1)
if err != nil {
return nil, fmt.Errorf("readBuffer: %w", err)
}

// Check packet type
if packet[0] != PtResult {
return nil, fmt.Errorf("expecting result packet type %d and received %d", PtResult, packet[0])
}

// Read server result entry for the command
if _, err := c.readResultEntry(packet); err != nil {
return nil, fmt.Errorf("readResultEntry: %w", err)
}

// Read the L2Block entry directly
entry, err := c.NextFileEntry()
if err != nil {
return nil, fmt.Errorf("NextFileEntry: %w", err)
}

if entry.EntryType != types.EntryTypeL2Block {
return nil, fmt.Errorf("expected L2Block entry type %d but got %d", types.EntryTypeL2Block, entry.EntryType)
}

if l2Block, err = types.UnmarshalL2Block(entry.Data); err != nil {
return nil, fmt.Errorf("UnmarshalL2Block: %w", err)
}

return l2Block, nil
}

// getLatestL2BlockLegacy uses the original backward search method
func (c *StreamClient) getLatestL2BlockLegacy() (l2Block *types.FullL2Block, err error) {
h, err := c.GetHeader()
if err != nil {
return nil, fmt.Errorf("GetHeader: %w", err)
Expand Down
49 changes: 39 additions & 10 deletions zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,23 +304,51 @@ func TestStreamClientGetLatestL2Block(t *testing.T) {
go func() {
defer wg.Done()

// Read the Command
// First, try to read CmdLatestL2Block (optimized API)
// Read the Command - expect CmdLatestL2Block first
if err := readAndValidateUint(t, serverConn, uint64(CmdLatestL2Block), "command"); err != nil {
errCh <- fmt.Errorf("failed to read CmdLatestL2Block: %w", err)
return
}

// Read the StreamType
if err := readAndValidateUint(t, serverConn, uint64(StSequencer), streamTypeFieldName); err != nil {
errCh <- fmt.Errorf("failed to read stream type for optimized API: %w", err)
return
}

// Simulate optimized API failure by sending an error result
errorRe := &types.ResultEntry{
PacketType: PtResult,
ErrorNum: 1, // Non-zero indicates error
Length: types.ResultEntryMinSize + 20,
ErrorStr: []byte("optimized API not supported"),
}
_, err = serverConn.Write(errorRe.Encode())
if err != nil {
errCh <- fmt.Errorf("failed to write error result entry: %w", err)
return
}

// Now handle the fallback to legacy method
// Read the Command - expect CmdHeader
if err := readAndValidateUint(t, serverConn, uint64(CmdHeader), "command"); err != nil {
errCh <- err
errCh <- fmt.Errorf("failed to read CmdHeader: %w", err)
return
}

// Read the StreamType
if err := readAndValidateUint(t, serverConn, uint64(StSequencer), streamTypeFieldName); err != nil {
errCh <- err
errCh <- fmt.Errorf("failed to read stream type for header: %w", err)
return
}

// Write ResultEntry
// Write ResultEntry for header command
re := createResultEntry(t)
_, err = serverConn.Write(re.Encode())
if err != nil {
errCh <- fmt.Errorf("failed to write result entry to the connection: %w", err)
return
}

// Write HeaderEntry
Expand All @@ -335,30 +363,31 @@ func TestStreamClientGetLatestL2Block(t *testing.T) {
_, err = serverConn.Write(he.Encode())
if err != nil {
errCh <- fmt.Errorf("failed to write header entry to the connection: %w", err)
return
}

// Read the Command
// Read the Command - expect CmdEntry
if err := readAndValidateUint(t, serverConn, uint64(CmdEntry), "command"); err != nil {
errCh <- err
errCh <- fmt.Errorf("failed to read CmdEntry: %w", err)
return
}

// Read the StreamType
if err := readAndValidateUint(t, serverConn, uint64(StSequencer), streamTypeFieldName); err != nil {
errCh <- err
errCh <- fmt.Errorf("failed to read stream type for entry: %w", err)
return
}

// Read the EntryNumber
if err := readAndValidateUint(t, serverConn, he.TotalEntries-1, "entry number"); err != nil {
errCh <- err
errCh <- fmt.Errorf("failed to read entry number: %w", err)
return
}

// Write the ResultEntry
// Write the ResultEntry for entry command
_, err = serverConn.Write(re.Encode())
if err != nil {
errCh <- fmt.Errorf("failed to write result entry to the connection: %w", err)
errCh <- fmt.Errorf("failed to write result entry for entry command: %w", err)
return
}

Expand Down
44 changes: 12 additions & 32 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type DatastreamClient interface {
GetEntryChan() *chan interface{}
GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, error)
GetLatestL2Block() (*types.FullL2Block, error)
LastUsedOptimizedAPI() bool
GetProgressAtomic() *atomic.Uint64
Start() error
Stop() error
Expand Down Expand Up @@ -238,7 +239,7 @@ func SpawnStageBatches(
}
}
// For X Layer
time.Sleep(5 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
getHighestDSL2BlockCost := time.Since(getHighestDSL2Blockstart)

Expand Down Expand Up @@ -854,60 +855,39 @@ func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (
}

type getHighestDSL2BlockStats struct {
getSeqCost time.Duration
getSeqCounter int
dsStart time.Duration
dsStartCounter int
dsGetBlockCost time.Duration
dsGetBlockCounter int
dsStopCost time.Duration
dsStopCounter int
dsUseOptimizedAPI bool
}

func (stats getHighestDSL2BlockStats) toString() string {
return fmt.Sprintf("getHighestDSL2BlockStats {getSeqCost: %v, getSeqCounter: %d, dsStart: %v, dsStartCounter: %d, dsGetBlockCost: %v, dsGetBlockCounter: %d, dsStopCost: %v, dsStopCounter: %d}",
stats.getSeqCost, stats.getSeqCounter, stats.dsStart, stats.dsStartCounter, stats.dsGetBlockCost, stats.dsGetBlockCounter, stats.dsStopCost, stats.dsStopCounter)
return fmt.Sprintf("getHighestDSL2BlockStats {dsStart: %v, dsStartCounter: %d, dsGetBlockCost: %v, dsGetBlockCounter: %d, dsStopCost: %v, dsStopCounter: %d, dsUseOptimizedAPI: %t}",
stats.dsStart, stats.dsStartCounter, stats.dsGetBlockCost, stats.dsGetBlockCounter, stats.dsStopCost, stats.dsStopCounter, stats.dsUseOptimizedAPI)
}

func getHighestDSL2Block(logPrefix string, ctx context.Context, batchCfg BatchesCfg, latestFork uint16, stats *getHighestDSL2BlockStats) (uint64, error) {
cfg := batchCfg.zkCfg

// first try the sequencer rpc endpoint, it might not have been upgraded to the
// latest version yet so if we get an error back from this call we can try the older
// method of calling the datastream directly
getSeqStart := time.Now()
highestBlock, err := GetSequencerHighestDataStreamBlock(cfg.L2RpcUrl)
stats.getSeqCost += time.Since(getSeqStart)
stats.getSeqCounter += 1
if err == nil {
return highestBlock, nil
}

// so something went wrong with the rpc call, let's try the older method,
// but we're going to open a new connection rather than use the one for syncing blocks.
// This is so we can keep the logic simple and just dispose of the connection when we're done
// greatly simplifying state juggling of the connection if it errors
// Get or create reusable client (X Layer optimization)
getDSStart := time.Now()
dsClient := buildNewStreamClient(ctx, batchCfg, latestFork)
err = dsClient.Start()
dsClient, err := getOrCreateQueryClient(ctx, batchCfg, latestFork)
stats.dsStart += time.Since(getDSStart)
stats.dsStartCounter += 1
if err != nil {
return 0, err
}
defer func() {
dsStopStart := time.Now()
if err := dsClient.Stop(); err != nil {
log.Error("problem stopping datastream client looking up latest ds l2 block", "err", err)
}
stats.dsStopCost += time.Since(dsStopStart)
stats.dsStopCounter += 1
}()

// Query latest L2Block using reused connection
dsGetlockStart := time.Now()
fullBlock, err := dsClient.GetLatestL2Block()
stats.dsGetBlockCost += time.Since(dsGetlockStart)
stats.dsGetBlockCounter += 1
stats.dsUseOptimizedAPI = dsClient.LastUsedOptimizedAPI()
if err != nil {
// Mark client as failed for next call to recreate
markQueryClientError(err)
return 0, err
}

Expand Down
Loading
Loading