Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/txpool/blobpool/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ func (q *conversionQueue) loop() {

case <-done:
done, interrupt = nil, nil
if len(txTasks) > 0 {
done, interrupt = make(chan struct{}), new(atomic.Int32)
tasks := slices.Clone(txTasks)
txTasks = txTasks[:0]
go q.run(tasks, done, interrupt)
}

case fn := <-q.startBilly:
q.billyQueue = append(q.billyQueue, fn)
Expand Down
70 changes: 70 additions & 0 deletions core/txpool/blobpool/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package blobpool
import (
"crypto/ecdsa"
"crypto/sha256"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -99,3 +101,71 @@ func TestConversionQueueDoubleClose(t *testing.T) {
queue.close()
queue.close() // Should not panic
}

func TestConversionQueueAutoRestartBatch(t *testing.T) {
queue := newConversionQueue()
defer queue.close()

key, _ := crypto.GenerateKey()

// Create a heavy transaction to ensure the first batch runs long enough
// for subsequent tasks to be queued while it is active.
heavy := makeMultiBlobTx(0, 1, 1, 1, int(params.BlobTxMaxBlobs), 0, key, types.BlobSidecarVersion0)

var wg sync.WaitGroup
wg.Add(1)
heavyDone := make(chan error, 1)
go func() {
defer wg.Done()
heavyDone <- queue.convert(heavy)
}()

// Give the conversion worker a head start so that the following tasks are
// enqueued while the first batch is running.
time.Sleep(200 * time.Millisecond)

tx1 := makeTx(1, 1, 1, 1, key)
tx2 := makeTx(2, 1, 1, 1, key)

wg.Add(2)
done1 := make(chan error, 1)
done2 := make(chan error, 1)
go func() { defer wg.Done(); done1 <- queue.convert(tx1) }()
go func() { defer wg.Done(); done2 <- queue.convert(tx2) }()

select {
case err := <-done1:
if err != nil {
t.Fatalf("tx1 conversion error: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for tx1 conversion")
}

select {
case err := <-done2:
if err != nil {
t.Fatalf("tx2 conversion error: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for tx2 conversion")
}

select {
case err := <-heavyDone:
if err != nil {
t.Fatalf("heavy conversion error: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for heavy conversion")
}

wg.Wait()

if tx1.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
t.Fatalf("tx1 sidecar version mismatch: have %d, want %d", tx1.BlobTxSidecar().Version, types.BlobSidecarVersion1)
}
if tx2.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
t.Fatalf("tx2 sidecar version mismatch: have %d, want %d", tx2.BlobTxSidecar().Version, types.BlobSidecarVersion1)
}
}