Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/nightly-race.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Nightly scheduled run of the test suite under the Go race detector.
name: Nightly Race Tests

on:
  schedule:
    # Run at 2 AM UTC every day
    - cron: '0 2 * * *'
  workflow_dispatch: # Allow manual trigger

jobs:
  race-tests:
    runs-on: ubuntu-latest
    # Race-instrumented tests are slow; cap the job so hangs don't burn runners.
    timeout-minutes: 60

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          # Pin the toolchain to the version declared in go.mod.
          go-version-file: 'go.mod'
          cache: true

      - name: Run race tests
        run: make test-race

      - name: Notify on failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": ":warning: Nightly race tests failed on ${{ github.repository }}\n<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Run>"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
47 changes: 35 additions & 12 deletions consensus/bor/heimdall/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,42 @@ import (
// requests to the mock heimdall server for specific functions. Add more handlers
// according to requirements.
type HttpHandlerFake struct {
mu sync.RWMutex
handleFetchCheckpoint http.HandlerFunc
handleFetchMilestone http.HandlerFunc
}

func (h *HttpHandlerFake) SetCheckpointHandler(handler http.HandlerFunc) {
h.mu.Lock()
defer h.mu.Unlock()
h.handleFetchCheckpoint = handler
}

func (h *HttpHandlerFake) SetMilestoneHandler(handler http.HandlerFunc) {
h.mu.Lock()
defer h.mu.Unlock()
h.handleFetchMilestone = handler
}

func (h *HttpHandlerFake) GetCheckpointHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
h.handleFetchCheckpoint.ServeHTTP(w, r)
h.mu.RLock()
handler := h.handleFetchCheckpoint
h.mu.RUnlock()
if handler != nil {
handler.ServeHTTP(w, r)
}
}
}

func (h *HttpHandlerFake) GetMilestoneHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
h.handleFetchMilestone.ServeHTTP(w, r)
h.mu.RLock()
handler := h.handleFetchMilestone
h.mu.RUnlock()
if handler != nil {
handler.ServeHTTP(w, r)
}
}
}

Expand Down Expand Up @@ -97,7 +120,7 @@ func TestFetchCheckpointFromMockHeimdall(t *testing.T) {
}

handler := &HttpHandlerFake{}
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
err := json.NewEncoder(w).Encode(gRPCGatewayCheckpointResponseV2{
Result: gRPCGatewayCheckpointV2{
Proposer: common.Address{},
Expand All @@ -112,7 +135,7 @@ func TestFetchCheckpointFromMockHeimdall(t *testing.T) {
if err != nil {
w.WriteHeader(500) // Return 500 Internal Server Error.
}
}
})

// Fetch available port
port, listener, err := network.FindAvailablePort()
Expand Down Expand Up @@ -165,7 +188,7 @@ func TestFetchMilestoneFromMockHeimdall(t *testing.T) {
}

handler := &HttpHandlerFake{}
handler.handleFetchMilestone = func(w http.ResponseWriter, _ *http.Request) {
handler.SetMilestoneHandler(func(w http.ResponseWriter, _ *http.Request) {
err := json.NewEncoder(w).Encode(gRPCGatewayMilestoneResponseV2{
Result: gRPCGatewayMilestoneV2{
Proposer: common.Address{},
Expand All @@ -181,7 +204,7 @@ func TestFetchMilestoneFromMockHeimdall(t *testing.T) {
if err != nil {
w.WriteHeader(500) // Return 500 Internal Server Error.
}
}
})

// Fetch available port
port, listener, err := network.FindAvailablePort()
Expand Down Expand Up @@ -222,7 +245,7 @@ func TestFetchShutdown(t *testing.T) {
// Case1 - Testing context timeout: Create delay in serving requests for simulating timeout. Add delay slightly
// greater than `retryDelay`. This should cause the request to timeout and trigger shutdown
// due to `ctx.Done()`. Expect context timeout error.
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
time.Sleep(100 * time.Millisecond)

err := json.NewEncoder(w).Encode(checkpoint.CheckpointResponse{
Expand All @@ -239,7 +262,7 @@ func TestFetchShutdown(t *testing.T) {
if err != nil {
w.WriteHeader(500) // Return 500 Internal Server Error.
}
}
})

// Fetch available port
port, listener, err := network.FindAvailablePort()
Expand All @@ -264,10 +287,10 @@ func TestFetchShutdown(t *testing.T) {
// Case2 - Testing context cancellation. Pass a context with timeout to the request and
// cancel it before timeout. This should cause the request to timeout and trigger shutdown
// due to `ctx.Done()`. Expect context cancellation error.
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
time.Sleep(10 * time.Millisecond)
w.WriteHeader(500) // Return 500 Internal Server Error.
}
})

ctx, cancel = context.WithTimeout(t.Context(), 50*time.Millisecond) // Use some high value for timeout

Expand All @@ -285,9 +308,9 @@ func TestFetchShutdown(t *testing.T) {
// Case3 - Testing interrupt: Closing the `closeCh` in heimdall client simulating interrupt. This
// should cause the request to fail and throw an error due to `<-closeCh` in fetchWithRetry.
// Expect shutdown detected error.
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(500) // Return 500 Internal Server Error.
}
})

// Close the channel after a delay until we make request
go func() {
Expand Down
22 changes: 11 additions & 11 deletions consensus/bor/span_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,12 +1372,12 @@ func TestSpanStore_ConcurrentAccess(t *testing.T) {
// TimeoutHeimdallClient simulates a heimdall client that times out or hangs on requests
type TimeoutHeimdallClient struct {
timeout time.Duration
shouldTimeout bool
shouldHangSpan bool
shouldTimeout atomic.Bool
shouldHangSpan atomic.Bool
}

func (h *TimeoutHeimdallClient) GetSpan(ctx context.Context, spanID uint64) (*types.Span, error) {
if h.shouldTimeout {
if h.shouldTimeout.Load() {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -1418,7 +1418,7 @@ func (h *TimeoutHeimdallClient) GetSpan(ctx context.Context, spanID uint64) (*ty
}

func (h *TimeoutHeimdallClient) GetLatestSpan(ctx context.Context) (*types.Span, error) {
if h.shouldHangSpan {
if h.shouldHangSpan.Load() {
// Simulate a hanging request that would block indefinitely
select {
case <-ctx.Done():
Expand All @@ -1431,7 +1431,7 @@ func (h *TimeoutHeimdallClient) GetLatestSpan(ctx context.Context) (*types.Span,
}

func (h *TimeoutHeimdallClient) FetchStatus(ctx context.Context) (*ctypes.SyncInfo, error) {
if h.shouldTimeout {
if h.shouldTimeout.Load() {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -1464,9 +1464,9 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
t.Run("heimdallStatus set to nil on FetchStatus error", func(t *testing.T) {
// Create a client that will fail on FetchStatus
mockClient := &TimeoutHeimdallClient{
shouldTimeout: true,
timeout: 10 * time.Millisecond, // Quick failure
timeout: 10 * time.Millisecond, // Quick failure
}
mockClient.shouldTimeout.Store(true)

spanStore := NewSpanStore(mockClient, nil, "1337")
defer spanStore.Close()
Expand All @@ -1491,9 +1491,9 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
t.Run("background goroutine sets heimdallStatus to nil on persistent errors", func(t *testing.T) {
// Create a client that starts working then fails
mockClient := &TimeoutHeimdallClient{
shouldTimeout: false,
timeout: 10 * time.Millisecond,
timeout: 10 * time.Millisecond,
}
mockClient.shouldTimeout.Store(false)

spanStore := NewSpanStore(mockClient, nil, "1337")
defer spanStore.Close()
Expand All @@ -1506,7 +1506,7 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
require.NotNil(t, status, "Status should be set initially")

// Now make FetchStatus fail
mockClient.shouldTimeout = true
mockClient.shouldTimeout.Store(true)

// Wait for the background goroutine to encounter the error
time.Sleep(500 * time.Millisecond)
Expand All @@ -1516,7 +1516,7 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
require.Nil(t, status, "heimdallStatus should be nil after FetchStatus starts failing")

// Now make it work again
mockClient.shouldTimeout = false
mockClient.shouldTimeout.Store(false)

// Wait for recovery
time.Sleep(500 * time.Millisecond)
Expand Down
9 changes: 5 additions & 4 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -4784,8 +4785,8 @@ func (m *mockChainValidator) GetMilestoneIDsList() []string { return nil }
type mockFailingEngine struct {
*ethash.Ethash
shouldFailHeader map[uint64]bool
allowInitialInsertion bool // Allow initial insertion to succeed
insertionComplete bool // Track when insertion is complete
allowInitialInsertion bool // Allow initial insertion to succeed
insertionComplete atomic.Bool // Track when insertion is complete
}

func (m *mockFailingEngine) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*types.Header) (chan<- struct{}, <-chan error) {
Expand All @@ -4800,7 +4801,7 @@ func (m *mockFailingEngine) VerifyHeaders(chain consensus.ChainHeaderReader, hea
return
default:
// If we're allowing initial insertion and it's not complete yet, succeed
if m.allowInitialInsertion && !m.insertionComplete {
if m.allowInitialInsertion && !m.insertionComplete.Load() {
results <- nil
} else if m.shouldFailHeader != nil && m.shouldFailHeader[header.Number.Uint64()] {
results <- errors.New("mock header verification failure")
Expand All @@ -4815,7 +4816,7 @@ func (m *mockFailingEngine) VerifyHeaders(chain consensus.ChainHeaderReader, hea
}

func (m *mockFailingEngine) markInsertionComplete() {
m.insertionComplete = true
m.insertionComplete.Store(true)
}

// TestHeaderVerificationLoop tests the background header verification functionality
Expand Down
24 changes: 22 additions & 2 deletions core/txpool/locals/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"io/fs"
"os"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -46,6 +47,7 @@ func (*devNull) Close() error { return nil }
type journal struct {
	path   string         // Filesystem path to store the transactions at
	writer io.WriteCloser // Output stream to write new transactions into
	mu     sync.Mutex     // Protects writer: it is swapped/closed concurrently by load, setupWriter, rotate and close
}

// newTxJournal creates a new transaction journal to
Expand All @@ -55,6 +57,12 @@ func newTxJournal(path string) *journal {
}
}

// setWriter swaps the journal's output stream while holding the mutex, so a
// writer installed during load cannot race with concurrent insert/rotate.
func (journal *journal) setWriter(w io.WriteCloser) {
	journal.mu.Lock()
	journal.writer = w
	journal.mu.Unlock()
}

// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
func (journal *journal) load(add func([]*types.Transaction) []error) error {
Expand All @@ -72,8 +80,8 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
defer input.Close()

// Temporarily discard any journal additions (don't double add on load)
journal.writer = new(devNull)
defer func() { journal.writer = nil }()
journal.setWriter(new(devNull))
defer journal.setWriter(nil)

// Inject all transactions from the journal into the pool
stream := rlp.NewStream(input, 0)
Expand Down Expand Up @@ -125,6 +133,9 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
}

func (journal *journal) setupWriter() error {
journal.mu.Lock()
defer journal.mu.Unlock()

if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
Expand All @@ -145,6 +156,9 @@ func (journal *journal) setupWriter() error {

// insert adds the specified transaction to the local disk journal.
func (journal *journal) insert(tx *types.Transaction) error {
journal.mu.Lock()
defer journal.mu.Unlock()

if journal.writer == nil {
return errNoActiveJournal
}
Expand All @@ -159,6 +173,9 @@ func (journal *journal) insert(tx *types.Transaction) error {
// rotate regenerates the transaction journal based on the current contents of
// the transaction pool.
func (journal *journal) rotate(all map[common.Address]types.Transactions) error {
journal.mu.Lock()
defer journal.mu.Unlock()

// Close the current journal (if any is open)
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
Expand Down Expand Up @@ -211,6 +228,9 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error

// close flushes the transaction journal contents to disk and closes the file.
func (journal *journal) close() error {
journal.mu.Lock()
defer journal.mu.Unlock()

var err error
if journal.writer != nil {
err = journal.writer.Close()
Expand Down
17 changes: 9 additions & 8 deletions triedb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package triedb

import (
"errors"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -84,10 +85,10 @@ type backend interface {
// relevant with trie nodes and node preimages.
type Database struct {
disk ethdb.Database
config *Config // Configuration for trie database
preimages *preimageStore // The store for caching preimages
backend backend // The backend for managing trie nodes
readBackend backend
config *Config // Configuration for trie database
preimages *preimageStore // The store for caching preimages
backend backend // The backend for managing trie nodes
readBackend atomic.Pointer[backend] // Lock-free for concurrent reads, supports nil
}

// NewDatabase initializes the trie database with default settings, note
Expand Down Expand Up @@ -117,15 +118,15 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
return db
}

func (db *Database) SetReadBackend(backend backend) {
db.readBackend = backend
func (db *Database) SetReadBackend(b backend) {
db.readBackend.Store(&b)
}

// Reader returns a reader for accessing all trie nodes with provided state root.
// An error will be returned if the requested state is not available.
func (db *Database) NodeReader(blockRoot common.Hash) (database.NodeReader, error) {
if db.readBackend != nil {
return db.readBackend.NodeReader(blockRoot)
if rbPtr := db.readBackend.Load(); rbPtr != nil && *rbPtr != nil {
return (*rbPtr).NodeReader(blockRoot)
}
return db.backend.NodeReader(blockRoot)
}
Expand Down
Loading