Skip to content

Commit d7b58d1

Browse files
authored
[+] use new InstanceMetricCache implementation in the Reaper (#716)
1 parent 80fcc33 commit d7b58d1

File tree

12 files changed

+319
-153
lines changed

12 files changed

+319
-153
lines changed

internal/db/bootstrap_test.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/testcontainers/testcontainers-go/wait"
1818
)
1919

20+
const ImageName = "docker.io/postgres:17-alpine"
21+
2022
var ctx = context.Background()
2123

2224
func TestPing(t *testing.T) {
@@ -73,7 +75,7 @@ func initTestContainer() (*postgres.PostgresContainer, error) {
7375
dbPassword := "pgwatchadmin"
7476

7577
return postgres.Run(ctx,
76-
"docker.io/postgres:16-alpine",
78+
ImageName,
7779
postgres.WithDatabase(dbName),
7880
postgres.WithUsername(dbUser),
7981
postgres.WithPassword(dbPassword),
@@ -83,9 +85,10 @@ func initTestContainer() (*postgres.PostgresContainer, error) {
8385
WithStartupTimeout(5*time.Second)),
8486
)
8587
}
88+
8689
func TestNew(t *testing.T) {
8790
pg, err := initTestContainer()
88-
assert.NoError(t, err)
91+
require.NoError(t, err)
8992
defer func() { assert.NoError(t, pg.Terminate(ctx)) }()
9093
connStr, err := pg.ConnectionString(ctx)
9194
t.Log(connStr)

internal/metrics/cmdopts.go

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,21 @@
11
package metrics
22

3+
import (
4+
"time"
5+
)
6+
37
// CmdOpts specifies metric command-line options
48
type CmdOpts struct {
59
Metrics string `short:"m" long:"metrics" mapstructure:"metrics" description:"File or folder of YAML files with metrics definitions" env:"PW_METRICS"`
610
CreateHelpers bool `long:"create-helpers" mapstructure:"create-helpers" description:"Create helper database objects from metric definitions" env:"PW_CREATE_HELPERS"`
711
DirectOSStats bool `long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW_DIRECT_OS_STATS"`
8-
InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Affects 'continuous' host types only. Set to 0 to disable" env:"PW_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"`
12+
InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Set to 0 to disable" env:"PW_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"`
913
EmergencyPauseTriggerfile string `long:"emergency-pause-triggerfile" mapstructure:"emergency-pause-triggerfile" description:"When the file exists no metrics will be temporarily fetched / scraped" env:"PW_EMERGENCY_PAUSE_TRIGGERFILE" default:"/tmp/pgwatch-emergency-pause"`
1014
}
15+
16+
func (c CmdOpts) CacheAge() time.Duration {
17+
if c.InstanceLevelCacheMaxSeconds < 0 {
18+
c.InstanceLevelCacheMaxSeconds = 0
19+
}
20+
return time.Duration(c.InstanceLevelCacheMaxSeconds) * time.Second
21+
}

internal/metrics/cmdopts_test.go

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,38 @@
1+
package metrics
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestCacheAge(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
opts CmdOpts
14+
expected time.Duration
15+
}{
16+
{
17+
name: "Cache enabled with positive value",
18+
opts: CmdOpts{InstanceLevelCacheMaxSeconds: 30},
19+
expected: 30 * time.Second,
20+
},
21+
{
22+
name: "Cache disabled with zero value",
23+
opts: CmdOpts{InstanceLevelCacheMaxSeconds: 0},
24+
expected: 0,
25+
},
26+
{
27+
name: "Cache disable with incorrect value",
28+
opts: CmdOpts{InstanceLevelCacheMaxSeconds: -30},
29+
expected: 0,
30+
},
31+
}
32+
33+
for _, tt := range tests {
34+
t.Run(tt.name, func(t *testing.T) {
35+
assert.Equal(t, tt.expected, tt.opts.CacheAge())
36+
})
37+
}
38+
}

internal/metrics/types.go

Lines changed: 28 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,9 @@
11
package metrics
22

3-
import "time"
3+
import (
4+
"maps"
5+
"time"
6+
)
47

58
type (
69
ExtensionInfo struct {
@@ -98,6 +101,30 @@ func (m Measurements) GetEpoch() int64 {
98101
return Measurement(m[0]).GetEpoch()
99102
}
100103

104+
func (m Measurements) IsEpochSet() bool {
105+
if len(m) == 0 {
106+
return false
107+
}
108+
_, ok := m[0][EpochColumnName]
109+
return ok
110+
}
111+
112+
func (m Measurements) DeepCopy() Measurements {
113+
newData := make(Measurements, len(m))
114+
for i, dr := range m {
115+
newData[i] = maps.Clone(dr)
116+
}
117+
return newData
118+
}
119+
120+
// Touch sets the epoch column of every measurement to the current time in nanoseconds
121+
func (m Measurements) Touch() {
122+
ns := time.Now().UnixNano()
123+
for _, measurement := range m {
124+
measurement[EpochColumnName] = ns
125+
}
126+
}
127+
101128
type MeasurementEnvelope struct {
102129
DBName string
103130
SourceType string

internal/metrics/types_test.go

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package metrics
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/assert"
78
)
@@ -38,3 +39,24 @@ func TestPrimaryOnly(t *testing.T) {
3839
assert.False(t, m.PrimaryOnly())
3940
assert.True(t, m.StandbyOnly())
4041
}
42+
43+
func TestMeasurement(t *testing.T) {
44+
m := NewMeasurement(1234567890)
45+
assert.Equal(t, int64(1234567890), m.GetEpoch(), "epoch should be equal")
46+
m[EpochColumnName] = "wrong type"
47+
assert.True(t, time.Now().UnixNano()-m.GetEpoch() < int64(time.Second), "epoch should be close to now")
48+
}
49+
50+
func TestMeasurements(t *testing.T) {
51+
m := Measurements{}
52+
assert.False(t, m.IsEpochSet(), "epoch should not be set")
53+
assert.True(t, time.Now().UnixNano()-m.GetEpoch() < 100, "epoch should be close to now")
54+
m = append(m, NewMeasurement(1234567890))
55+
assert.True(t, m.IsEpochSet(), "epoch should be set")
56+
assert.Equal(t, int64(1234567890), m.GetEpoch(), "epoch should be equal")
57+
m1 := m.DeepCopy()
58+
assert.Equal(t, m, m1, "deep copy should be equal")
59+
m1.Touch()
60+
assert.NotEqual(t, m, m1, "deep copy should be different")
61+
assert.True(t, time.Now().UnixNano()-m1.GetEpoch() < int64(time.Second), "epoch should be close to now")
62+
}

internal/reaper/cache.go

Lines changed: 28 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@ package reaper
22

33
import (
44
"fmt"
5-
"maps"
65
"sync"
76
"time"
87

@@ -52,59 +51,44 @@ func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error)
5251
return md, nil
5352
}
5453

55-
var instanceMetricCache = make(map[string](metrics.Measurements)) // [dbUnique+metric]lastly_fetched_data
56-
var instanceMetricCacheLock = sync.RWMutex{}
57-
var instanceMetricCacheTimestamp = make(map[string]time.Time) // [dbUnique+metric]last_fetch_time
58-
var instanceMetricCacheTimestampLock = sync.RWMutex{}
54+
type InstanceMetricCache struct {
55+
cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
56+
sync.RWMutex
57+
}
5958

60-
func GetFromInstanceCacheIfNotOlderThanSeconds(msg MetricFetchConfig, maxAgeSeconds int64) metrics.Measurements {
61-
var clonedData metrics.Measurements
62-
instanceMetricCacheTimestampLock.RLock()
63-
instanceMetricTS, ok := instanceMetricCacheTimestamp[msg.DBUniqueNameOrig+msg.MetricName]
64-
instanceMetricCacheTimestampLock.RUnlock()
65-
if !ok || time.Now().Unix()-instanceMetricTS.Unix() > maxAgeSeconds {
66-
return nil
59+
func NewInstanceMetricCache() *InstanceMetricCache {
60+
return &InstanceMetricCache{
61+
cache: make(map[string](metrics.Measurements)),
6762
}
63+
}
6864

69-
instanceMetricCacheLock.RLock()
70-
instanceMetricData, ok := instanceMetricCache[msg.DBUniqueNameOrig+msg.MetricName]
71-
if !ok {
72-
instanceMetricCacheLock.RUnlock()
65+
func (imc *InstanceMetricCache) Get(key string, age time.Duration) metrics.Measurements {
66+
if key == "" {
7367
return nil
7468
}
75-
clonedData = deepCopyMetricData(instanceMetricData)
76-
instanceMetricCacheLock.RUnlock()
69+
imc.RLock()
70+
defer imc.RUnlock()
71+
instanceMetricEpochNs := (imc.cache[key]).GetEpoch()
7772

78-
return clonedData
79-
}
80-
81-
func IsCacheableMetric(msg MetricFetchConfig, mvp metrics.Metric) bool {
82-
switch msg.Source {
83-
case sources.SourcePostgresContinuous, sources.SourcePatroniContinuous:
84-
return false
85-
default:
86-
return mvp.IsInstanceLevel
73+
if time.Now().UnixNano()-instanceMetricEpochNs > age.Nanoseconds() {
74+
return nil
8775
}
76+
instanceMetricData, ok := imc.cache[key]
77+
if !ok {
78+
return nil
79+
}
80+
return instanceMetricData.DeepCopy()
8881
}
8982

90-
func PutToInstanceCache(msg MetricFetchConfig, data metrics.Measurements) {
91-
if len(data) == 0 {
83+
func (imc *InstanceMetricCache) Put(key string, data metrics.Measurements) {
84+
if len(data) == 0 || key == "" {
9285
return
9386
}
94-
dataCopy := deepCopyMetricData(data)
95-
instanceMetricCacheLock.Lock()
96-
instanceMetricCache[msg.DBUniqueNameOrig+msg.MetricName] = dataCopy
97-
instanceMetricCacheLock.Unlock()
98-
99-
instanceMetricCacheTimestampLock.Lock()
100-
instanceMetricCacheTimestamp[msg.DBUniqueNameOrig+msg.MetricName] = time.Now()
101-
instanceMetricCacheTimestampLock.Unlock()
102-
}
103-
104-
func deepCopyMetricData(data metrics.Measurements) metrics.Measurements {
105-
newData := make(metrics.Measurements, len(data))
106-
for i, dr := range data {
107-
newData[i] = maps.Clone(dr)
87+
imc.Lock()
88+
defer imc.Unlock()
89+
m := data.DeepCopy()
90+
if !m.IsEpochSet() {
91+
m.Touch()
10892
}
109-
return newData
93+
imc.cache[key] = m
11094
}

internal/reaper/cache_test.go

Lines changed: 82 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,82 @@
1+
package reaper
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestInstanceMetricCache_PutAndGet(t *testing.T) {
12+
cache := NewInstanceMetricCache()
13+
14+
// Define test data
15+
key := "test_key"
16+
data := metrics.Measurements{metrics.Measurement{"test_key": 42}}
17+
18+
// Test Put
19+
cache.Put(key, data)
20+
data.Touch() // Update the epoch time
21+
22+
// Test Get with valid key and age
23+
retrievedData := cache.Get(key, time.Second)
24+
assert.NotNil(t, retrievedData, "Expected data to be retrieved")
25+
assert.LessOrEqual(t, data.GetEpoch()-retrievedData.GetEpoch(), int64(time.Second), "Epoch times should be close")
26+
27+
// Test Get with invalid key
28+
invalidKey := "invalid_key"
29+
retrievedData = cache.Get(invalidKey, time.Second)
30+
assert.Nil(t, retrievedData, "Expected nil for invalid key")
31+
32+
// Test Get with expired age
33+
retrievedData = cache.Get(key, -time.Second) // Negative age to simulate expiration
34+
assert.Nil(t, retrievedData, "Expected nil for expired data")
35+
36+
// Test Get with empty key
37+
retrievedData = cache.Get("", time.Second)
38+
assert.Nil(t, retrievedData, "Expected nil for empty key")
39+
}
40+
41+
func TestInstanceMetricCache_PutEmptyData(t *testing.T) {
42+
cache := NewInstanceMetricCache()
43+
44+
// Test Put with empty data
45+
cache.Put("test_key", metrics.Measurements{})
46+
retrievedData := cache.Get("test_key", time.Second)
47+
assert.Nil(t, retrievedData, "Expected nil for empty data")
48+
49+
data := metrics.Measurements{metrics.Measurement{}}
50+
// Test Put with empty key
51+
cache.Put("", data)
52+
retrievedData = cache.Get("", time.Second)
53+
assert.Nil(t, retrievedData, "Expected nil for empty key")
54+
}
55+
56+
func TestInstanceMetricCache_Concurrency(t *testing.T) {
57+
cache := NewInstanceMetricCache()
58+
59+
// Define test data
60+
key := "test_key"
61+
data := metrics.Measurements{metrics.Measurement{}}
62+
data.Touch()
63+
64+
// Use goroutines to test concurrent access
65+
done := make(chan bool)
66+
go func() {
67+
cache.Put(key, data)
68+
done <- true
69+
}()
70+
go func() {
71+
_ = cache.Get(key, time.Second)
72+
done <- true
73+
}()
74+
75+
// Wait for goroutines to finish
76+
<-done
77+
<-done
78+
79+
// Verify data is still accessible
80+
retrievedData := cache.Get(key, time.Second)
81+
assert.NotNil(t, retrievedData, "Expected data to be retrieved after concurrent access")
82+
}

internal/reaper/file.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ func IsDirectlyFetchableMetric(metric string) bool {
4646
return ok
4747
}
4848

49-
func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) ([]metrics.MeasurementEnvelope, error) {
49+
func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (*metrics.MeasurementEnvelope, error) {
5050
var data, dataDirs, dataTblspDirs metrics.Measurements
5151
var err error
5252

@@ -76,7 +76,7 @@ func FetchStatsDirectlyFromOS(ctx context.Context, msg MetricFetchConfig, vme Mo
7676
if err != nil {
7777
return nil, err
7878
}
79-
return []metrics.MeasurementEnvelope{msm}, nil
79+
return &msm, nil
8080
}
8181

8282
// data + custom tags + counters

0 commit comments

Comments
 (0)