Skip to content
This repository was archived by the owner on Feb 17, 2025. It is now read-only.

Commit eb155ae

Browse files
authored
Do fatal when datastream channel is full (workaround to fix datastream blocking issue) (#3650)
* Do fatal when datastream channel is full (this will restart sequencer automatically) * update datastream library (more ds-debug logs)
1 parent 1b091ec commit eb155ae

File tree

8 files changed

+37
-43
lines changed

8 files changed

+37
-43
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
33
go 1.21
44

55
require (
6-
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1
6+
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
77
github.com/didip/tollbooth/v6 v6.1.2
88
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
99
github.com/ethereum/go-ethereum v1.13.11

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
3939
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
4040
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
4141
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
42-
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc=
43-
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
42+
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
43+
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
4444
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
4545
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
4646
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=

sequencer/batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat
335335
}
336336

337337
// Send batch bookmark to the datastream
338-
f.DSSendBatchBookmark(batchNumber)
338+
f.DSSendBatchBookmark(ctx, batchNumber)
339339

340340
// Check if synchronizer is up-to-date
341341
//TODO: review if this is needed

sequencer/datastreamer.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package sequencer
22

33
import (
4-
"github.com/0xPolygonHermez/zkevm-node/log"
4+
"context"
5+
"fmt"
6+
57
"github.com/0xPolygonHermez/zkevm-node/state"
68
)
79

8-
func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
10+
func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
911
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)
1012

1113
// Send data to streamer
@@ -43,23 +45,36 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce
4345
l2Transactions = append(l2Transactions, l2Transaction)
4446
}
4547

46-
log.Infof("[ds-debug] sending l2block %d to datastream channel", blockResponse.BlockNumber)
48+
f.checkDSBufferIsFull(ctx)
49+
4750
f.dataToStream <- state.DSL2FullBlock{
4851
DSL2Block: l2Block,
4952
Txs: l2Transactions,
5053
}
54+
55+
f.dataToStreamCount.Add(1)
5156
}
5257

5358
return nil
5459
}
5560

56-
func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
61+
func (f *finalizer) DSSendBatchBookmark(ctx context.Context, batchNumber uint64) {
5762
// Check if stream server enabled
5863
if f.streamServer != nil {
64+
f.checkDSBufferIsFull(ctx)
65+
5966
// Send batch bookmark to the streamer
6067
f.dataToStream <- state.DSBookMark{
6168
Type: state.BookMarkTypeBatch,
6269
Value: batchNumber,
6370
}
71+
72+
f.dataToStreamCount.Add(1)
73+
}
74+
}
75+
76+
func (f *finalizer) checkDSBufferIsFull(ctx context.Context) {
77+
if f.dataToStreamCount.Load() == datastreamChannelBufferSize {
78+
f.Halt(ctx, fmt.Errorf("datastream channel buffer full"), true)
6479
}
6580
}

sequencer/finalizer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ type finalizer struct {
8383
// interval metrics
8484
metrics *intervalMetrics
8585
// stream server
86-
streamServer *datastreamer.StreamServer
87-
dataToStream chan interface{}
86+
streamServer *datastreamer.StreamServer
87+
dataToStream chan interface{}
88+
dataToStreamCount atomic.Int32
8889
}
8990

9091
// newFinalizer returns a new instance of Finalizer.
@@ -885,6 +886,11 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
885886
counters.Binaries, counters.Sha256Hashes_V2, counters.Steps)
886887
}
887888

889+
// Decrease datastreamChannelCount variable
890+
func (f *finalizer) DatastreamChannelCountAdd(ct int32) {
891+
f.dataToStreamCount.Add(ct)
892+
}
893+
888894
// Halt halts the finalizer
889895
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
890896
f.haltFinalizer.Store(true)

sequencer/forcedbatch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
198198
}
199199

200200
// Send L2 block to data streamer
201-
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
201+
err = f.DSSendL2Block(ctx, newBatchNumber, forcedL2BlockResponse, 0)
202202
if err != nil {
203203
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
204204
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)

sequencer/l2block.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -479,9 +479,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
479479
return err
480480
}
481481

482-
//TODO: remove this Log
483-
log.Infof("[ds-debug] l2 block %d [%d] stored in statedb", blockResponse.BlockNumber, l2Block.trackingNum)
484-
485482
// Update txs status in the pool
486483
for _, txResponse := range blockResponse.TransactionResponses {
487484
// Change Tx status to selected
@@ -491,19 +488,13 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
491488
}
492489
}
493490

494-
//TODO: remove this log
495-
log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum)
496-
497491
// Send L2 block to data streamer
498-
err = f.DSSendL2Block(l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
492+
err = f.DSSendL2Block(ctx, l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
499493
if err != nil {
500494
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
501495
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
502496
}
503497

504-
//TODO: remove this log
505-
log.Infof("[ds-debug] l2 block %d [%d] sent to datastream", blockResponse.BlockNumber, l2Block.trackingNum)
506-
507498
for _, tx := range l2Block.transactions {
508499
// Delete the tx from the pending list in the worker (addrQueue)
509500
f.workerIntf.DeleteTxPendingToStore(tx.Hash, tx.From)

sequencer/sequencer.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
const (
18-
datastreamChannelMultiplier = 2
18+
datastreamChannelBufferSize = 10
1919
)
2020

2121
// Sequencer represents a sequencer
@@ -59,9 +59,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP
5959
eventLog: eventLog,
6060
}
6161

62-
// TODO: Make configurable
63-
channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd
64-
sequencer.dataToStream = make(chan interface{}, channelBufferSize)
62+
sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize)
6563

6664
return sequencer, nil
6765
}
@@ -270,8 +268,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
270268
case state.DSL2FullBlock:
271269
l2Block := data
272270

273-
//TODO: remove this log
274-
log.Infof("[ds-debug] start atomic op for l2block %d", l2Block.L2BlockNumber)
275271
err = s.streamServer.StartAtomicOp()
276272
if err != nil {
277273
log.Errorf("failed to start atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
@@ -283,8 +279,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
283279
Value: l2Block.L2BlockNumber,
284280
}
285281

286-
//TODO: remove this log
287-
log.Infof("[ds-debug] add stream bookmark for l2block %d", l2Block.L2BlockNumber)
288282
_, err = s.streamServer.AddStreamBookmark(bookMark.Encode())
289283
if err != nil {
290284
log.Errorf("failed to add stream bookmark for l2block %d, error: %v", l2Block.L2BlockNumber, err)
@@ -299,8 +293,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
299293
Value: l2Block.L2BlockNumber - 1,
300294
}
301295

302-
//TODO: remove this log
303-
log.Infof("[ds-debug] get previous l2block %d", l2Block.L2BlockNumber-1)
304296
previousL2BlockEntry, err := s.streamServer.GetFirstEventAfterBookmark(bookMark.Encode())
305297
if err != nil {
306298
log.Errorf("failed to get previous l2block %d, error: %v", l2Block.L2BlockNumber-1, err)
@@ -323,16 +315,12 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
323315
ChainID: uint32(chainID),
324316
}
325317

326-
//TODO: remove this log
327-
log.Infof("[ds-debug] add l2blockStart stream entry for l2block %d", l2Block.L2BlockNumber)
328318
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode())
329319
if err != nil {
330320
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
331321
continue
332322
}
333323

334-
//TODO: remove this log
335-
log.Infof("[ds-debug] adding l2tx stream entries for l2block %d", l2Block.L2BlockNumber)
336324
for _, l2Transaction := range l2Block.Txs {
337325
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
338326
if err != nil {
@@ -347,25 +335,17 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
347335
StateRoot: l2Block.StateRoot,
348336
}
349337

350-
//TODO: remove this log
351-
log.Infof("[ds-debug] add l2blockEnd stream entry for l2block %d", l2Block.L2BlockNumber)
352338
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode())
353339
if err != nil {
354340
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
355341
continue
356342
}
357343

358-
//TODO: remove this log
359-
log.Infof("[ds-debug] commit atomic op for l2block %d", l2Block.L2BlockNumber)
360344
err = s.streamServer.CommitAtomicOp()
361345
if err != nil {
362346
log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
363347
continue
364348
}
365-
366-
//TODO: remove this log
367-
log.Infof("[ds-debug] l2block %d sent to datastream", l2Block.L2BlockNumber)
368-
369349
// Stream a bookmark
370350
case state.DSBookMark:
371351
bookmark := data
@@ -392,6 +372,8 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
392372
log.Errorf("invalid stream message type received")
393373
}
394374
}
375+
376+
s.finalizer.DatastreamChannelCountAdd(-1)
395377
}
396378
}
397379

0 commit comments

Comments
 (0)