Skip to content

Commit ba468d3

Browse files
authored
[+] move sources sync to the reaper (#769)
Since the reaper is responsible for connections and metric syncing, it's better to move source synchronization to the reaper itself. Also get rid of `monitoredDbCache` and use `reaper.monitoredSources` directly. Now we can call `QueryMeasurements` directly with a connection as an argument, instead of passing a name to look up in a cache.
1 parent 29c2e74 commit ba468d3

File tree

11 files changed

+67
-84
lines changed

11 files changed

+67
-84
lines changed

cmd/pgwatch/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ var (
4343
var Exit = os.Exit
4444

4545
func main() {
46+
47+
// Uncomment the following lines to enable the pprof HTTP server for debugging
48+
// go func() {
49+
// panic(http.ListenAndServe(":6060", nil))
50+
// }()
51+
4652
exitCode.Store(cmdopts.ExitCodeOK)
4753
defer func() {
4854
if err := recover(); err != nil {

internal/reaper/cache.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,14 @@
11
package reaper
22

33
import (
4-
"context"
5-
"fmt"
64
"sync"
75
"time"
86

97
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
10-
"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
118
)
129

13-
var monitoredDbCache map[string]*sources.SourceConn
14-
var monitoredDbCacheLock sync.RWMutex
15-
1610
var lastSQLFetchError sync.Map
1711

18-
func UpdateMonitoredDBCache(data sources.SourceConns) {
19-
monitoredDbCacheNew := make(map[string]*sources.SourceConn)
20-
for _, row := range data {
21-
monitoredDbCacheNew[row.Name] = row
22-
}
23-
monitoredDbCacheLock.Lock()
24-
monitoredDbCache = monitoredDbCacheNew
25-
monitoredDbCacheLock.Unlock()
26-
}
27-
28-
func GetMonitoredDatabaseByUniqueName(ctx context.Context, name string) (*sources.SourceConn, error) {
29-
if ctx.Err() != nil {
30-
return nil, ctx.Err()
31-
}
32-
monitoredDbCacheLock.RLock()
33-
defer monitoredDbCacheLock.RUnlock()
34-
md, exists := monitoredDbCache[name]
35-
if !exists || md == nil || md.Conn == nil {
36-
return nil, fmt.Errorf("database %s not found in cache", name)
37-
}
38-
return md, nil
39-
}
40-
4112
type InstanceMetricCache struct {
4213
cache map[string](metrics.Measurements) // [dbUnique+metric]lastly_fetched_data
4314
sync.RWMutex

internal/reaper/database.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,14 @@ import (
1515
"github.com/jackc/pgx/v5"
1616
)
1717

18-
func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
19-
// TODO: move to sources package and use direct pgx connection
18+
func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
2019
var conn db.PgxIface
21-
var md *sources.SourceConn
2220
var err error
2321
var tx pgx.Tx
2422
if strings.TrimSpace(sql) == "" {
2523
return nil, errors.New("empty SQL")
2624
}
27-
if md, err = GetMonitoredDatabaseByUniqueName(ctx, dbUnique); err != nil {
28-
return nil, err
29-
}
25+
3026
conn = md.Conn
3127
if md.IsPostgresSource() {
3228
// we don't want transaction for non-postgres sources, e.g. pgbouncer
@@ -67,7 +63,7 @@ func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh c
6763
return changeCounts
6864
}
6965

70-
data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
66+
data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
7167
if err != nil {
7268
log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", md.Name, ", err:", err)
7369
return changeCounts
@@ -150,7 +146,7 @@ func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh c
150146
return changeCounts
151147
}
152148

153-
data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
149+
data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
154150
if err != nil {
155151
log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", md.Name, ", err:", err)
156152
return changeCounts
@@ -233,7 +229,7 @@ func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh c
233229
return changeCounts
234230
}
235231

236-
data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
232+
data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
237233
if err != nil {
238234
log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", md.Name, ", err:", err)
239235
return changeCounts
@@ -315,7 +311,7 @@ func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storage
315311
}
316312

317313
// returns rows of: object_type, tag_role, tag_object, privilege_type
318-
data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
314+
data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
319315
if err != nil {
320316
log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", md.Name, specialMetricChangeEvents, err)
321317
return changeCounts
@@ -389,7 +385,7 @@ func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, sto
389385
return changeCounts
390386
}
391387

392-
data, err := QueryMeasurements(ctx, md.Name, mvp.GetSQL(int(md.Version)))
388+
data, err := QueryMeasurements(ctx, md, mvp.GetSQL(int(md.Version)))
393389
if err != nil {
394390
log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", md.Name, specialMetricChangeEvents, err)
395391
return changeCounts
@@ -468,7 +464,6 @@ func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique s
468464
influxEntry := metrics.NewMeasurement(time.Now().UnixNano())
469465
influxEntry["details"] = message
470466
detectedChangesSummary = append(detectedChangesSummary, influxEntry)
471-
md, _ := GetMonitoredDatabaseByUniqueName(ctx, dbUnique)
472467
storageCh <- metrics.MeasurementEnvelope{
473468
DBName: dbUnique,
474469
SourceType: string(md.Kind),
@@ -483,15 +478,15 @@ func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique s
483478
// Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
484479
// With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
485480
// whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
486-
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
481+
func TryCreateMissingExtensions(ctx context.Context, md *sources.SourceConn, extensionNames []string, existingExtensions map[string]int) []string {
487482
// TODO: move to sources package and use direct pgx connection
488483
sqlAvailable := `select name::text from pg_available_extensions`
489484
extsCreated := make([]string, 0)
490485

491486
// For security reasons don't allow to execute random strings but check that it's an existing extension
492-
data, err := QueryMeasurements(ctx, dbUnique, sqlAvailable)
487+
data, err := QueryMeasurements(ctx, md, sqlAvailable)
493488
if err != nil {
494-
log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
489+
log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", md, err)
495490
return extsCreated
496491
}
497492

@@ -506,12 +501,12 @@ func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionN
506501
}
507502
_, ok := availableExts[extToCreate]
508503
if !ok {
509-
log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
504+
log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", md, extToCreate)
510505
} else {
511506
sqlCreateExt := `create extension ` + extToCreate
512-
_, err := QueryMeasurements(ctx, dbUnique, sqlCreateExt)
507+
_, err := QueryMeasurements(ctx, md, sqlCreateExt)
513508
if err != nil {
514-
log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
509+
log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", md, extToCreate, err)
515510
}
516511
extsCreated = append(extsCreated, extToCreate)
517512
}

internal/reaper/file.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.Sourc
5151
case metricPsutilCPU:
5252
data, err = GetGoPsutilCPU(md.GetMetricInterval(metricName))
5353
case metricPsutilDisk:
54-
if dataDirs, err = QueryMeasurements(ctx, md.Name, sqlPgDirs); err != nil {
54+
if dataDirs, err = QueryMeasurements(ctx, md, sqlPgDirs); err != nil {
5555
return nil, err
5656
}
57-
if dataTblspDirs, err = QueryMeasurements(ctx, md.Name, sqlTsDirs); err != nil {
57+
if dataTblspDirs, err = QueryMeasurements(ctx, md, sqlTsDirs); err != nil {
5858
return nil, err
5959
}
6060
data, err = GetGoPsutilDiskPG(dataDirs, dataTblspDirs)

internal/reaper/metric_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
88
"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
9+
"github.com/pashagolub/pgxmock/v4"
910
"github.com/stretchr/testify/assert"
1011
)
1112

@@ -28,7 +29,8 @@ var (
2829
func TestReaper_FetchStatsDirectlyFromOS(t *testing.T) {
2930
a := assert.New(t)
3031
r := &Reaper{}
31-
md := &sources.SourceConn{}
32+
conn, _ := pgxmock.NewPool()
33+
md := &sources.SourceConn{Conn: conn}
3234
for _, m := range directlyFetchableOSMetrics {
3335
a.True(IsDirectlyFetchableMetric(m), "Expected %s to be directly fetchable", m)
3436
a.NotPanics(func() {

internal/reaper/reaper.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (r *Reaper) Reap(ctx context.Context) {
7171
logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
7272
}
7373

74-
UpdateMonitoredDBCache(r.monitoredSources)
74+
// UpdateMonitoredDBCache(r.monitoredSources)
7575
hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
7676
for _, monitoredSource := range r.monitoredSources {
7777
srcL := logger.WithField("source", monitoredSource.Name)
@@ -202,7 +202,7 @@ func (r *Reaper) CreateSourceHelpers(ctx context.Context, srcL log.Logger, monit
202202
if r.Sources.TryCreateListedExtsIfMissing > "" {
203203
srcL.Info("trying to create extensions if missing")
204204
extsToCreate := strings.Split(r.Sources.TryCreateListedExtsIfMissing, ",")
205-
extsCreated := TryCreateMissingExtensions(ctx, monitoredSource.Name, extsToCreate, monitoredSource.Extensions)
205+
extsCreated := TryCreateMissingExtensions(ctx, monitoredSource, extsToCreate, monitoredSource.Extensions)
206206
srcL.Infof("%d/%d extensions created based on --try-create-listed-exts-if-missing input %v", len(extsCreated), len(extsToCreate), extsCreated)
207207
}
208208

@@ -223,18 +223,16 @@ func (r *Reaper) ShutdownOldWorkers(ctx context.Context, hostsToShutDownDueToRol
223223
for dbMetric, cancelFunc := range r.cancelFuncs {
224224
var currentMetricConfig map[string]float64
225225
var md *sources.SourceConn
226-
var ok, dbRemovedFromConfig bool
226+
var dbRemovedFromConfig bool
227227
singleMetricDisabled := false
228228
splits := strings.Split(dbMetric, dbMetricJoinStr)
229229
db := splits[0]
230230
metric := splits[1]
231231

232232
_, wholeDbShutDownDueToRoleChange := hostsToShutDownDueToRoleChange[db]
233233
if !wholeDbShutDownDueToRoleChange {
234-
monitoredDbCacheLock.RLock()
235-
md, ok = monitoredDbCache[db]
236-
monitoredDbCacheLock.RUnlock()
237-
if !ok { // normal removing of DB from config
234+
md = r.monitoredSources.GetMonitoredDatabase(db)
235+
if md == nil { // normal removing of DB from config
238236
dbRemovedFromConfig = true
239237
logger.Debugf("DB %s removed from config, shutting down all metric worker processes...", db)
240238
}
@@ -363,9 +361,22 @@ func (r *Reaper) LoadSources() (err error) {
363361
r.monitoredSources = make([]*sources.SourceConn, 0)
364362
return nil
365363
}
366-
if r.monitoredSources, err = r.monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
364+
var newSrcs sources.SourceConns
365+
if newSrcs, err = r.monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
367366
return err
368367
}
368+
for i, newMD := range newSrcs {
369+
md := r.monitoredSources.GetMonitoredDatabase(newMD.Name)
370+
if md == nil {
371+
continue
372+
}
373+
if md.Equal(newMD.Source) {
374+
// replace with the existing connection if the source is the same
375+
newSrcs[i] = md
376+
continue
377+
}
378+
}
379+
r.monitoredSources = newSrcs
369380
r.logger.WithField("sources", len(r.monitoredSources)).Info("sources refreshed")
370381
return nil
371382
}
@@ -374,10 +385,9 @@ func (r *Reaper) LoadSources() (err error) {
374385
// every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
375386
func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
376387
for {
377-
if len(monitoredDbCache) > 0 {
388+
if len(r.monitoredSources) > 0 {
378389
now := time.Now().UnixNano()
379-
monitoredDbCacheLock.RLock()
380-
for _, mdb := range monitoredDbCache {
390+
for _, mdb := range r.monitoredSources {
381391
db := metrics.NewMeasurement(now)
382392
db["tag_group"] = mdb.Group
383393
db["master_only"] = mdb.OnlyIfMaster
@@ -390,7 +400,6 @@ func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
390400
Data: metrics.Measurements{db},
391401
}
392402
}
393-
monitoredDbCacheLock.RUnlock()
394403
}
395404
select {
396405
case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
@@ -464,11 +473,11 @@ func (r *Reaper) FetchMetric(ctx context.Context, md *sources.SourceConn, metric
464473
r.CheckForPGObjectChangesAndStore(ctx, md.Name, md, hostState) // TODO no hostState for Prometheus currently
465474
return nil, nil
466475
case recoMetricName:
467-
if data, err = GetRecommendations(ctx, md.Name, md); err != nil {
476+
if data, err = GetRecommendations(ctx, md); err != nil {
468477
return nil, err
469478
}
470479
default:
471-
if data, err = QueryMeasurements(ctx, md.Name, sql); err != nil {
480+
if data, err = QueryMeasurements(ctx, md, sql); err != nil {
472481
// let's soften errors to "info" from functions that expect the server to be a primary to reduce noise
473482
if strings.Contains(err.Error(), "recovery is in progress") && md.IsInRecovery {
474483
l.Debugf("[%s:%s] failed to fetch metrics: %s", md.Name, metricName, err)

internal/reaper/recommendations.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func GetAllRecoMetricsForVersion() (metrics.MetricDefs, error) {
3333
return mvpMap, nil
3434
}
3535

36-
func GetRecommendations(ctx context.Context, dbUnique string, md *sources.SourceConn) (metrics.Measurements, error) {
36+
func GetRecommendations(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) {
3737
retData := make(metrics.Measurements, 0)
3838
startTimeEpochNs := time.Now().UnixNano()
3939

@@ -42,7 +42,7 @@ func GetRecommendations(ctx context.Context, dbUnique string, md *sources.Source
4242
return nil, err
4343
}
4444
for _, mvp := range recoMetrics {
45-
data, e := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(md.Version))
45+
data, e := QueryMeasurements(ctx, md, mvp.GetSQL(md.Version))
4646
if err != nil {
4747
err = errors.Join(err, e)
4848
continue

internal/sources/conn.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"math"
7-
"reflect"
87
"regexp"
98
"strconv"
109
"time"
@@ -275,17 +274,5 @@ func (mds SourceConns) SyncFromReader(r Reader) (newmds SourceConns, err error)
275274
if err != nil {
276275
return nil, err
277276
}
278-
newmds, err = srcs.ResolveDatabases()
279-
for i, newMD := range newmds {
280-
md := mds.GetMonitoredDatabase(newMD.Name)
281-
if md == nil {
282-
continue
283-
}
284-
if reflect.DeepEqual(md.Source, newMD.Source) {
285-
// replace with the existing connection if the source is the same
286-
newmds[i] = md
287-
continue
288-
}
289-
}
290-
return newmds, err
277+
return srcs.ResolveDatabases()
291278
}

internal/sources/conn_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,13 +238,10 @@ func TestMonitoredDatabases_SyncFromReader(t *testing.T) {
238238
// then resolve the databases
239239
mdbs, _ := mds.ResolveDatabases()
240240
assert.NotNil(t, mdbs, "ResolveDatabases() = nil, want not nil")
241-
// pretend that we have a connection
242-
mdbs[0].Conn = db
243241
// sync the databases and make sure they are the same
244242
newmdbs, _ := mdbs.SyncFromReader(reader)
245243
assert.NotNil(t, newmdbs)
246244
assert.Equal(t, mdbs[0].ConnStr, newmdbs[0].ConnStr)
247-
assert.Equal(t, db, newmdbs[0].Conn)
248245
// change the connection string and check if databases are updated
249246
reader.Sources[0].ConnStr = "postgres://user:password@localhost:5432/anotherdatabase"
250247
newmdbs, _ = mdbs.SyncFromReader(reader)

internal/sources/resolver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (s Source) ResolveDatabases() (SourceConns, error) {
4646
case SourcePostgresContinuous:
4747
return ResolveDatabasesFromPostgres(s)
4848
}
49-
return SourceConns{&SourceConn{Source: *(&s).Clone()}}, nil
49+
return SourceConns{&SourceConn{Source: s}}, nil
5050
}
5151

5252
type PatroniClusterMember struct {

0 commit comments

Comments
 (0)