
Commit 19f2f56

dgiagio and zoltanbedi authored
fix: Add mutex protection to prevent data races in datasource cache (#433)
### Problem

The `CachedDatasource` implementation contained a data race: multiple goroutines could concurrently access and modify the shared `cache` map without proper synchronization. This could lead to unpredictable behavior, potential crashes, or data corruption in high-concurrency environments. The bug was reproduced by adding a unit test for this scenario and running the tests with the `-race` option of the `go test` command, as shown below:

```sh
❯ go test -race ./pkg/...
?       github.com/grafana/github-datasource/pkg          [no test files]
?       github.com/grafana/github-datasource/pkg/dfutil   [no test files]
?       github.com/grafana/github-datasource/pkg/errors   [no test files]
ok      github.com/grafana/github-datasource/pkg/github            1.372s
ok      github.com/grafana/github-datasource/pkg/github/client     (cached)
ok      github.com/grafana/github-datasource/pkg/github/projects   1.572s
?       github.com/grafana/github-datasource/pkg/httputil [no test files]
?       github.com/grafana/github-datasource/pkg/models   [no test files]
==================
WARNING: DATA RACE
Write at 0x00c00039a7b0 by goroutine 33:
  runtime.mapaccess2()
      /opt/homebrew/Cellar/go/1.24.1/libexec/src/internal/runtime/maps/runtime_swiss.go:117 +0x2dc
  github.com/grafana/github-datasource/pkg/plugin.(*CachedDatasource).saveCache()
      /Users/dgiagio/Devel/github-datasource/pkg/plugin/datasource_caching.go:89 +0x148
  github.com/grafana/github-datasource/pkg/plugin.TestWithCaching.func2()
      /Users/dgiagio/Devel/github-datasource/pkg/plugin/datasource_caching_test.go:32 +0xc4
  testing.tRunner()
      /opt/homebrew/Cellar/go/1.24.1/libexec/src/testing/testing.go:1792 +0x180
  testing.(*T).Run.gowrap1()
      /opt/homebrew/Cellar/go/1.24.1/libexec/src/testing/testing.go:1851 +0x40

Previous write at 0x00c00039a7b0 by goroutine 32:
  runtime.mapaccess2()
      /opt/homebrew/Cellar/go/1.24.1/libexec/src/internal/runtime/maps/runtime_swiss.go:117 +0x2dc
  github.com/grafana/github-datasource/pkg/plugin.(*CachedDatasource).saveCache()
      /Users/dgiagio/Devel/github-datasource/pkg/plugin/datasource_caching.go:89 +0x148
  github.com/grafana/github-datasource/pkg/plugin.TestWithCaching.func1()
      /Users/dgiagio/Devel/github-datasource/pkg/plugin/datasource_caching_test.go:24 +0xc4
  testing.tRunner()
      /opt/homebrew/Cellar/go/1.24.1/libexec/src/testing/testing.go:1792 +0x180
  testing.(*T).Run.gowrap1()
      /opt/homebrew/Cellar/go/1.24.1/libexec/src/testing/testing.go:1851 +0x40
```

### Changes

This PR adds proper concurrency handling to the `CachedDatasource` struct:

- Added a read-write mutex (`sync.RWMutex`) field to protect concurrent access to the cache map
- Implemented appropriate locking in the following methods:
  - `getCache`: uses a read lock (`RLock`/`RUnlock`) for read-only access
  - `saveCache`: uses a write lock (`Lock`/`Unlock`) when modifying the cache
  - `Cleanup`: uses a write lock when removing expired entries

### Testing

Added unit tests in `datasource_caching_test.go` that verify concurrent access safety:

- Concurrent reads from an empty cache
- Concurrent writes to and reads from the cache
- Multiple concurrent reads from the cache

These tests ensure the cache operations remain thread-safe under concurrent access.

### Impact

This change improves the stability and reliability of the GitHub datasource plugin by eliminating a race condition, making it more robust in high-concurrency environments.

---------

Co-authored-by: Zoltán Bedi <[email protected]>
1 parent 8852825 commit 19f2f56
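
The fix is the standard Go pattern of guarding a shared map with a `sync.RWMutex`: readers take the read lock, writers and the cleanup pass take the write lock. Below is a minimal, self-contained sketch of that pattern. The names (`safeCache`, `cachedValue`) are simplified and hypothetical, not the plugin's real types; the actual change is in the diffs further down. Running it with `go run -race` should keep the race detector quiet.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedValue pairs a stored value with an expiry, loosely mirroring the
// CachedResult idea from the diff (names here are illustrative only).
type cachedValue struct {
	value     string
	expiresAt time.Time
}

// safeCache guards a map with a sync.RWMutex, the same pattern the PR
// applies to CachedDatasource.
type safeCache struct {
	mu    sync.RWMutex
	items map[string]cachedValue
}

func newSafeCache() *safeCache {
	return &safeCache{items: make(map[string]cachedValue)}
}

// get takes the read lock, so multiple readers can proceed in parallel.
func (c *safeCache) get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[key]
	if !ok || v.expiresAt.Before(time.Now()) {
		return "", false
	}
	return v.value, true
}

// set takes the write lock, serializing writers against readers and each other.
func (c *safeCache) set(key, value string, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = cachedValue{value: value, expiresAt: time.Now().Add(ttl)}
}

// cleanup removes expired entries under the write lock, as Cleanup does in the diff.
func (c *safeCache) cleanup() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, v := range c.items {
		if v.expiresAt.Before(time.Now()) {
			delete(c.items, k)
		}
	}
}

func main() {
	cache := newSafeCache()
	var wg sync.WaitGroup
	// Hammer the cache from several goroutines; without the mutex this
	// would trip the race detector just like the report above.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			key := fmt.Sprintf("key-%d", n%3)
			cache.set(key, "value", time.Minute)
			cache.get(key)
		}(i)
	}
	wg.Wait()
	cache.cleanup()
	fmt.Println("done")
}
```

The same split, `RLock` for reads and `Lock` for writes, is what the diff applies to `getCache`, `saveCache`, and `Cleanup`.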

File tree

3 files changed: +150, -1 lines changed

.changeset/lazy-crews-explain.md

Lines changed: 5 additions & 0 deletions

```diff
@@ -0,0 +1,5 @@
+---
+'grafana-github-datasource': patch
+---
+
+fix: Add mutex protection to prevent data races in datasource cache
```

pkg/plugin/datasource_caching.go

Lines changed: 13 additions & 1 deletion

```diff
@@ -6,6 +6,7 @@ import (
     "crypto/sha256"
     "encoding/hex"
     "encoding/json"
+    "sync"
     "time"

     "github.com/grafana/github-datasource/pkg/dfutil"
@@ -46,7 +47,9 @@ var (
 // If there is no cached data to respond with, the CachedDatasource forwards the request to the Datasource
 type CachedDatasource struct {
     datasource Datasource
-    cache      map[string]CachedResult
+
+    mu    sync.RWMutex // protects the cache map against concurrent access
+    cache map[string]CachedResult
 }

 func (c *CachedDatasource) getCache(req backend.DataQuery) (dfutil.Framer, error) {
@@ -55,6 +58,9 @@ func (c *CachedDatasource) getCache(req backend.DataQuery) (dfutil.Framer, error
         return nil, err
     }

+    c.mu.RLock()
+    defer c.mu.RUnlock()
+
     // Return cached value if it's there and it's not expired
     res, ok := c.cache[key]
     if ok {
@@ -77,6 +83,9 @@ func (c *CachedDatasource) saveCache(req backend.DataQuery, f dfutil.Framer, err
         return nil, err
     }

+    c.mu.Lock()
+    defer c.mu.Unlock()
+
     c.cache[key] = newCachedResult(f)
     return f, err
 }
@@ -277,6 +286,9 @@ func getCacheKey(req backend.DataQuery) (string, error) {

 // Cleanup removes old cache keys
 func (c *CachedDatasource) Cleanup() {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
     for k, v := range c.cache {
         // If it's an expired value, then delete it
         if v.ExpiresAt.Before(time.Now()) {
```
pkg/plugin/datasource_caching_test.go

Lines changed: 132 additions & 0 deletions

```diff
@@ -0,0 +1,132 @@
+package plugin
+
+import (
+    "encoding/json"
+    "github.com/grafana/grafana-plugin-sdk-go/backend"
+    "github.com/grafana/grafana-plugin-sdk-go/data"
+    "github.com/stretchr/testify/assert"
+    "sync"
+    "testing"
+)
+
+// mockFramer is a struct implementing the Framer interface that returns predefined frames for testing purposes
+type mockFramer struct {
+    frames data.Frames
+}
+
+func (m mockFramer) Frames() data.Frames {
+    return m.frames
+}
+
+// Fixture for the test cases
+var dataQueryA = backend.DataQuery{JSON: json.RawMessage(`{"query": "A"}`)}
+var framesA = data.Frames{data.NewFrame("A", nil)}
+var dataQueryB = backend.DataQuery{JSON: json.RawMessage(`{"query": "B"}`)}
+var framesB = data.Frames{data.NewFrame("B", nil)}
+
+func TestWithCaching(t *testing.T) {
+    cachedDS := WithCaching(nil)
+
+    t.Run("read from empty cache concurrently", func(t *testing.T) {
+        var wg sync.WaitGroup
+
+        // Read goroutine 1
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.getCache(dataQueryA)
+            assert.Nil(t, f)
+            assert.ErrorIs(t, err, ErrNoValue)
+        }()
+
+        // Read goroutine 2
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.getCache(dataQueryA)
+            assert.Nil(t, f)
+            assert.ErrorIs(t, err, ErrNoValue)
+        }()
+
+        wg.Wait()
+    })
+
+    t.Run("write to and read from cache concurrently", func(t *testing.T) {
+        var wg sync.WaitGroup
+
+        // Write goroutine 1
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.saveCache(dataQueryA, mockFramer{frames: framesA}, nil)
+            assert.NoError(t, err)
+            assert.Equal(t, framesA, f.Frames())
+        }()
+
+        // Write goroutine 2
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.saveCache(dataQueryB, mockFramer{frames: framesB}, nil)
+            assert.NoError(t, err)
+            assert.Equal(t, framesB, f.Frames())
+        }()
+
+        // Wait for writing goroutines
+        wg.Wait()
+
+        // Read goroutine 1
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.getCache(dataQueryA)
+            assert.NoError(t, err)
+            assert.Equal(t, framesA, f.Frames())
+        }()
+
+        // Read goroutine 2
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.getCache(dataQueryB)
+            assert.NoError(t, err)
+            assert.Equal(t, framesB, f.Frames())
+        }()
+
+        // Wait for reading goroutines
+        wg.Wait()
+    })
+
+    t.Run("read from the cache concurrently", func(t *testing.T) {
+        var wg sync.WaitGroup
+
+        // Read goroutine 1
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.getCache(dataQueryA)
+            assert.NoError(t, err)
+            assert.Equal(t, framesA, f.Frames())
+        }()
+
+        // Read goroutine 2
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            f, err := cachedDS.getCache(dataQueryB)
+            assert.NoError(t, err)
+            assert.Equal(t, framesB, f.Frames())
+        }()
+
+        // Wait for reading goroutines
+        wg.Wait()
+    })
+}
```
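
To exercise just these tests under the race detector, an invocation along the following lines should work (assuming the `pkg/plugin` package path shown in the stack traces above):

```sh
go test -race -run TestWithCaching ./pkg/plugin/...
```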
