diff --git a/core/txpool/blobpool/conversion.go b/core/txpool/blobpool/conversion.go index 80b97af5d7a..afdc10554f6 100644 --- a/core/txpool/blobpool/conversion.go +++ b/core/txpool/blobpool/conversion.go @@ -161,6 +161,12 @@ func (q *conversionQueue) loop() { case <-done: done, interrupt = nil, nil + if len(txTasks) > 0 { + done, interrupt = make(chan struct{}), new(atomic.Int32) + tasks := slices.Clone(txTasks) + txTasks = txTasks[:0] + go q.run(tasks, done, interrupt) + } case fn := <-q.startBilly: q.billyQueue = append(q.billyQueue, fn) diff --git a/core/txpool/blobpool/conversion_test.go b/core/txpool/blobpool/conversion_test.go index a9fd26dbaf6..7ffffb2e4d3 100644 --- a/core/txpool/blobpool/conversion_test.go +++ b/core/txpool/blobpool/conversion_test.go @@ -19,7 +19,9 @@ package blobpool import ( "crypto/ecdsa" "crypto/sha256" + "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -99,3 +101,71 @@ func TestConversionQueueDoubleClose(t *testing.T) { queue.close() queue.close() // Should not panic } + +func TestConversionQueueAutoRestartBatch(t *testing.T) { + queue := newConversionQueue() + defer queue.close() + + key, _ := crypto.GenerateKey() + + // Create a heavy transaction to ensure the first batch runs long enough + // for subsequent tasks to be queued while it is active. + heavy := makeMultiBlobTx(0, 1, 1, 1, int(params.BlobTxMaxBlobs), 0, key, types.BlobSidecarVersion0) + + var wg sync.WaitGroup + wg.Add(1) + heavyDone := make(chan error, 1) + go func() { + defer wg.Done() + heavyDone <- queue.convert(heavy) + }() + + // Give the conversion worker a head start so that the following tasks are + // enqueued while the first batch is running. + time.Sleep(200 * time.Millisecond) + + tx1 := makeTx(1, 1, 1, 1, key) + tx2 := makeTx(2, 1, 1, 1, key) + + wg.Add(2) + done1 := make(chan error, 1) + done2 := make(chan error, 1) + go func() { defer wg.Done(); done1 <- queue.convert(tx1) }() + go func() { defer wg.Done(); done2 <- queue.convert(tx2) }() + + select { + case err := <-done1: + if err != nil { + t.Fatalf("tx1 conversion error: %v", err) + } + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for tx1 conversion") + } + + select { + case err := <-done2: + if err != nil { + t.Fatalf("tx2 conversion error: %v", err) + } + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for tx2 conversion") + } + + select { + case err := <-heavyDone: + if err != nil { + t.Fatalf("heavy conversion error: %v", err) + } + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for heavy conversion") + } + + wg.Wait() + + if tx1.BlobTxSidecar().Version != types.BlobSidecarVersion1 { + t.Fatalf("tx1 sidecar version mismatch: have %d, want %d", tx1.BlobTxSidecar().Version, types.BlobSidecarVersion1) + } + if tx2.BlobTxSidecar().Version != types.BlobSidecarVersion1 { + t.Fatalf("tx2 sidecar version mismatch: have %d, want %d", tx2.BlobTxSidecar().Version, types.BlobSidecarVersion1) + } +}