Skip to content

Commit ed902e3

Browse files
authored
Merge pull request #2027 from 0xPolygon/fix_race_conditions
Fix race conditions detected in make test-race
2 parents 0caee33 + 3c1c05e commit ed902e3

File tree

6 files changed

+118
-37
lines changed

6 files changed

+118
-37
lines changed

.github/workflows/nightly-race.yml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
name: Nightly Race Tests
2+
3+
on:
4+
schedule:
5+
# Run at 2 AM UTC every day
6+
- cron: '0 2 * * *'
7+
workflow_dispatch: # Allow manual trigger
8+
9+
jobs:
10+
race-tests:
11+
runs-on: ubuntu-latest
12+
timeout-minutes: 60
13+
14+
steps:
15+
- name: Checkout code
16+
uses: actions/checkout@v4
17+
18+
- name: Set up Go
19+
uses: actions/setup-go@v5
20+
with:
21+
go-version-file: 'go.mod'
22+
cache: true
23+
24+
- name: Run race tests
25+
run: make test-race
26+
27+
- name: Notify on failure
28+
if: failure()
29+
uses: slackapi/slack-github-action@v1
30+
with:
31+
payload: |
32+
{
33+
"text": ":warning: Nightly race tests failed on ${{ github.repository }}\n<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Run>"
34+
}
35+
env:
36+
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

consensus/bor/heimdall/client_test.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,42 @@ import (
2222
// requests to the mock heimdall server for specific functions. Add more handlers
2323
// according to requirements.
2424
type HttpHandlerFake struct {
25+
mu sync.RWMutex
2526
handleFetchCheckpoint http.HandlerFunc
2627
handleFetchMilestone http.HandlerFunc
2728
}
2829

30+
func (h *HttpHandlerFake) SetCheckpointHandler(handler http.HandlerFunc) {
31+
h.mu.Lock()
32+
defer h.mu.Unlock()
33+
h.handleFetchCheckpoint = handler
34+
}
35+
36+
func (h *HttpHandlerFake) SetMilestoneHandler(handler http.HandlerFunc) {
37+
h.mu.Lock()
38+
defer h.mu.Unlock()
39+
h.handleFetchMilestone = handler
40+
}
41+
2942
func (h *HttpHandlerFake) GetCheckpointHandler() http.HandlerFunc {
3043
return func(w http.ResponseWriter, r *http.Request) {
31-
h.handleFetchCheckpoint.ServeHTTP(w, r)
44+
h.mu.RLock()
45+
handler := h.handleFetchCheckpoint
46+
h.mu.RUnlock()
47+
if handler != nil {
48+
handler.ServeHTTP(w, r)
49+
}
3250
}
3351
}
3452

3553
func (h *HttpHandlerFake) GetMilestoneHandler() http.HandlerFunc {
3654
return func(w http.ResponseWriter, r *http.Request) {
37-
h.handleFetchMilestone.ServeHTTP(w, r)
55+
h.mu.RLock()
56+
handler := h.handleFetchMilestone
57+
h.mu.RUnlock()
58+
if handler != nil {
59+
handler.ServeHTTP(w, r)
60+
}
3861
}
3962
}
4063

@@ -97,7 +120,7 @@ func TestFetchCheckpointFromMockHeimdall(t *testing.T) {
97120
}
98121

99122
handler := &HttpHandlerFake{}
100-
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
123+
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
101124
err := json.NewEncoder(w).Encode(gRPCGatewayCheckpointResponseV2{
102125
Result: gRPCGatewayCheckpointV2{
103126
Proposer: common.Address{},
@@ -112,7 +135,7 @@ func TestFetchCheckpointFromMockHeimdall(t *testing.T) {
112135
if err != nil {
113136
w.WriteHeader(500) // Return 500 Internal Server Error.
114137
}
115-
}
138+
})
116139

117140
// Fetch available port
118141
port, listener, err := network.FindAvailablePort()
@@ -165,7 +188,7 @@ func TestFetchMilestoneFromMockHeimdall(t *testing.T) {
165188
}
166189

167190
handler := &HttpHandlerFake{}
168-
handler.handleFetchMilestone = func(w http.ResponseWriter, _ *http.Request) {
191+
handler.SetMilestoneHandler(func(w http.ResponseWriter, _ *http.Request) {
169192
err := json.NewEncoder(w).Encode(gRPCGatewayMilestoneResponseV2{
170193
Result: gRPCGatewayMilestoneV2{
171194
Proposer: common.Address{},
@@ -181,7 +204,7 @@ func TestFetchMilestoneFromMockHeimdall(t *testing.T) {
181204
if err != nil {
182205
w.WriteHeader(500) // Return 500 Internal Server Error.
183206
}
184-
}
207+
})
185208

186209
// Fetch available port
187210
port, listener, err := network.FindAvailablePort()
@@ -222,7 +245,7 @@ func TestFetchShutdown(t *testing.T) {
222245
// Case1 - Testing context timeout: Create delay in serving requests for simulating timeout. Add delay slightly
223246
// greater than `retryDelay`. This should cause the request to timeout and trigger shutdown
224247
// due to `ctx.Done()`. Expect context timeout error.
225-
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
248+
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
226249
time.Sleep(100 * time.Millisecond)
227250

228251
err := json.NewEncoder(w).Encode(checkpoint.CheckpointResponse{
@@ -239,7 +262,7 @@ func TestFetchShutdown(t *testing.T) {
239262
if err != nil {
240263
w.WriteHeader(500) // Return 500 Internal Server Error.
241264
}
242-
}
265+
})
243266

244267
// Fetch available port
245268
port, listener, err := network.FindAvailablePort()
@@ -264,10 +287,10 @@ func TestFetchShutdown(t *testing.T) {
264287
// Case2 - Testing context cancellation. Pass a context with timeout to the request and
265288
// cancel it before the timeout. This should cause the request to be cancelled and trigger shutdown
266289
// due to `ctx.Done()`. Expect context cancellation error.
267-
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
290+
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
268291
time.Sleep(10 * time.Millisecond)
269292
w.WriteHeader(500) // Return 500 Internal Server Error.
270-
}
293+
})
271294

272295
ctx, cancel = context.WithTimeout(t.Context(), 50*time.Millisecond) // Use some high value for timeout
273296

@@ -285,9 +308,9 @@ func TestFetchShutdown(t *testing.T) {
285308
// Case3 - Testing interrupt: Closing the `closeCh` in heimdall client simulating interrupt. This
286309
// should cause the request to fail and throw an error due to `<-closeCh` in fetchWithRetry.
287310
// Expect shutdown detected error.
288-
handler.handleFetchCheckpoint = func(w http.ResponseWriter, _ *http.Request) {
311+
handler.SetCheckpointHandler(func(w http.ResponseWriter, _ *http.Request) {
289312
w.WriteHeader(500) // Return 500 Internal Server Error.
290-
}
313+
})
291314

292315
// Close the channel after a delay until we make request
293316
go func() {

consensus/bor/span_store_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,12 +1372,12 @@ func TestSpanStore_ConcurrentAccess(t *testing.T) {
13721372
// TimeoutHeimdallClient simulates a heimdall client that times out or hangs on requests
13731373
type TimeoutHeimdallClient struct {
13741374
timeout time.Duration
1375-
shouldTimeout bool
1376-
shouldHangSpan bool
1375+
shouldTimeout atomic.Bool
1376+
shouldHangSpan atomic.Bool
13771377
}
13781378

13791379
func (h *TimeoutHeimdallClient) GetSpan(ctx context.Context, spanID uint64) (*types.Span, error) {
1380-
if h.shouldTimeout {
1380+
if h.shouldTimeout.Load() {
13811381
select {
13821382
case <-ctx.Done():
13831383
return nil, ctx.Err()
@@ -1418,7 +1418,7 @@ func (h *TimeoutHeimdallClient) GetSpan(ctx context.Context, spanID uint64) (*ty
14181418
}
14191419

14201420
func (h *TimeoutHeimdallClient) GetLatestSpan(ctx context.Context) (*types.Span, error) {
1421-
if h.shouldHangSpan {
1421+
if h.shouldHangSpan.Load() {
14221422
// Simulate a hanging request that would block indefinitely
14231423
select {
14241424
case <-ctx.Done():
@@ -1431,7 +1431,7 @@ func (h *TimeoutHeimdallClient) GetLatestSpan(ctx context.Context) (*types.Span,
14311431
}
14321432

14331433
func (h *TimeoutHeimdallClient) FetchStatus(ctx context.Context) (*ctypes.SyncInfo, error) {
1434-
if h.shouldTimeout {
1434+
if h.shouldTimeout.Load() {
14351435
select {
14361436
case <-ctx.Done():
14371437
return nil, ctx.Err()
@@ -1464,9 +1464,9 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
14641464
t.Run("heimdallStatus set to nil on FetchStatus error", func(t *testing.T) {
14651465
// Create a client that will fail on FetchStatus
14661466
mockClient := &TimeoutHeimdallClient{
1467-
shouldTimeout: true,
1468-
timeout: 10 * time.Millisecond, // Quick failure
1467+
timeout: 10 * time.Millisecond, // Quick failure
14691468
}
1469+
mockClient.shouldTimeout.Store(true)
14701470

14711471
spanStore := NewSpanStore(mockClient, nil, "1337")
14721472
defer spanStore.Close()
@@ -1491,9 +1491,9 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
14911491
t.Run("background goroutine sets heimdallStatus to nil on persistent errors", func(t *testing.T) {
14921492
// Create a client that starts working then fails
14931493
mockClient := &TimeoutHeimdallClient{
1494-
shouldTimeout: false,
1495-
timeout: 10 * time.Millisecond,
1494+
timeout: 10 * time.Millisecond,
14961495
}
1496+
mockClient.shouldTimeout.Store(false)
14971497

14981498
spanStore := NewSpanStore(mockClient, nil, "1337")
14991499
defer spanStore.Close()
@@ -1506,7 +1506,7 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
15061506
require.NotNil(t, status, "Status should be set initially")
15071507

15081508
// Now make FetchStatus fail
1509-
mockClient.shouldTimeout = true
1509+
mockClient.shouldTimeout.Store(true)
15101510

15111511
// Wait for the background goroutine to encounter the error
15121512
time.Sleep(500 * time.Millisecond)
@@ -1516,7 +1516,7 @@ func TestSpanStore_HeimdallDownTimeout(t *testing.T) {
15161516
require.Nil(t, status, "heimdallStatus should be nil after FetchStatus starts failing")
15171517

15181518
// Now make it work again
1519-
mockClient.shouldTimeout = false
1519+
mockClient.shouldTimeout.Store(false)
15201520

15211521
// Wait for recovery
15221522
time.Sleep(500 * time.Millisecond)

core/blockchain_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"path"
2929
"reflect"
3030
"sync"
31+
"sync/atomic"
3132
"testing"
3233
"time"
3334

@@ -4784,8 +4785,8 @@ func (m *mockChainValidator) GetMilestoneIDsList() []string { return nil }
47844785
type mockFailingEngine struct {
47854786
*ethash.Ethash
47864787
shouldFailHeader map[uint64]bool
4787-
allowInitialInsertion bool // Allow initial insertion to succeed
4788-
insertionComplete bool // Track when insertion is complete
4788+
allowInitialInsertion bool // Allow initial insertion to succeed
4789+
insertionComplete atomic.Bool // Track when insertion is complete
47894790
}
47904791

47914792
func (m *mockFailingEngine) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*types.Header) (chan<- struct{}, <-chan error) {
@@ -4800,7 +4801,7 @@ func (m *mockFailingEngine) VerifyHeaders(chain consensus.ChainHeaderReader, hea
48004801
return
48014802
default:
48024803
// If we're allowing initial insertion and it's not complete yet, succeed
4803-
if m.allowInitialInsertion && !m.insertionComplete {
4804+
if m.allowInitialInsertion && !m.insertionComplete.Load() {
48044805
results <- nil
48054806
} else if m.shouldFailHeader != nil && m.shouldFailHeader[header.Number.Uint64()] {
48064807
results <- errors.New("mock header verification failure")
@@ -4815,7 +4816,7 @@ func (m *mockFailingEngine) VerifyHeaders(chain consensus.ChainHeaderReader, hea
48154816
}
48164817

48174818
func (m *mockFailingEngine) markInsertionComplete() {
4818-
m.insertionComplete = true
4819+
m.insertionComplete.Store(true)
48194820
}
48204821

48214822
// TestHeaderVerificationLoop tests the background header verification functionality

core/txpool/locals/journal.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io"
2222
"io/fs"
2323
"os"
24+
"sync"
2425

2526
"github.com/ethereum/go-ethereum/common"
2627
"github.com/ethereum/go-ethereum/core/types"
@@ -46,6 +47,7 @@ func (*devNull) Close() error { return nil }
4647
type journal struct {
4748
path string // Filesystem path to store the transactions at
4849
writer io.WriteCloser // Output stream to write new transactions into
50+
mu sync.Mutex // Protects writer for concurrent access
4951
}
5052

5153
// newTxJournal creates a new transaction journal to
@@ -55,6 +57,12 @@ func newTxJournal(path string) *journal {
5557
}
5658
}
5759

60+
func (journal *journal) setWriter(w io.WriteCloser) {
61+
journal.mu.Lock()
62+
defer journal.mu.Unlock()
63+
journal.writer = w
64+
}
65+
5866
// load parses a transaction journal dump from disk, loading its contents into
5967
// the specified pool.
6068
func (journal *journal) load(add func([]*types.Transaction) []error) error {
@@ -72,8 +80,8 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
7280
defer input.Close()
7381

7482
// Temporarily discard any journal additions (don't double add on load)
75-
journal.writer = new(devNull)
76-
defer func() { journal.writer = nil }()
83+
journal.setWriter(new(devNull))
84+
defer journal.setWriter(nil)
7785

7886
// Inject all transactions from the journal into the pool
7987
stream := rlp.NewStream(input, 0)
@@ -125,6 +133,9 @@ func (journal *journal) load(add func([]*types.Transaction) []error) error {
125133
}
126134

127135
func (journal *journal) setupWriter() error {
136+
journal.mu.Lock()
137+
defer journal.mu.Unlock()
138+
128139
if journal.writer != nil {
129140
if err := journal.writer.Close(); err != nil {
130141
return err
@@ -145,6 +156,9 @@ func (journal *journal) setupWriter() error {
145156

146157
// insert adds the specified transaction to the local disk journal.
147158
func (journal *journal) insert(tx *types.Transaction) error {
159+
journal.mu.Lock()
160+
defer journal.mu.Unlock()
161+
148162
if journal.writer == nil {
149163
return errNoActiveJournal
150164
}
@@ -159,6 +173,9 @@ func (journal *journal) insert(tx *types.Transaction) error {
159173
// rotate regenerates the transaction journal based on the current contents of
160174
// the transaction pool.
161175
func (journal *journal) rotate(all map[common.Address]types.Transactions) error {
176+
journal.mu.Lock()
177+
defer journal.mu.Unlock()
178+
162179
// Close the current journal (if any is open)
163180
if journal.writer != nil {
164181
if err := journal.writer.Close(); err != nil {
@@ -211,6 +228,9 @@ func (journal *journal) rotate(all map[common.Address]types.Transactions) error
211228

212229
// close flushes the transaction journal contents to disk and closes the file.
213230
func (journal *journal) close() error {
231+
journal.mu.Lock()
232+
defer journal.mu.Unlock()
233+
214234
var err error
215235
if journal.writer != nil {
216236
err = journal.writer.Close()

triedb/database.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package triedb
1818

1919
import (
2020
"errors"
21+
"sync/atomic"
2122

2223
"github.com/ethereum/go-ethereum/common"
2324
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -84,10 +85,10 @@ type backend interface {
8485
// relevant with trie nodes and node preimages.
8586
type Database struct {
8687
disk ethdb.Database
87-
config *Config // Configuration for trie database
88-
preimages *preimageStore // The store for caching preimages
89-
backend backend // The backend for managing trie nodes
90-
readBackend backend
88+
config *Config // Configuration for trie database
89+
preimages *preimageStore // The store for caching preimages
90+
backend backend // The backend for managing trie nodes
91+
readBackend atomic.Pointer[backend] // Lock-free for concurrent reads, supports nil
9192
}
9293

9394
// NewDatabase initializes the trie database with default settings, note
@@ -117,15 +118,15 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
117118
return db
118119
}
119120

120-
func (db *Database) SetReadBackend(backend backend) {
121-
db.readBackend = backend
121+
func (db *Database) SetReadBackend(b backend) {
122+
db.readBackend.Store(&b)
122123
}
123124

124125
// NodeReader returns a reader for accessing all trie nodes with provided state root.
125126
// An error will be returned if the requested state is not available.
126127
func (db *Database) NodeReader(blockRoot common.Hash) (database.NodeReader, error) {
127-
if db.readBackend != nil {
128-
return db.readBackend.NodeReader(blockRoot)
128+
if rbPtr := db.readBackend.Load(); rbPtr != nil && *rbPtr != nil {
129+
return (*rbPtr).NodeReader(blockRoot)
129130
}
130131
return db.backend.NodeReader(blockRoot)
131132
}

0 commit comments

Comments
 (0)