
Commit 13b63c2

[+] add Reaper.WriteMonitoredSources() and QueryMeasurements() (#678)
* [+] add `Reaper.WriteMonitoredSources()` and `QueryMeasurements()`
* [*] move check for emergency trigger file to `Reaper.LoadSources()`
1 parent f7d3eb4 commit 13b63c2
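
In short: the two read helpers DBExecRead() and DBExecReadByDbUniqueName() are merged into a single QueryMeasurements(), the SyncMonitoredDBsToDatastore() goroutine (together with StoreMetrics() and the lastMonitoredDBsUpdate bookkeeping) is replaced by the long-lived Reaper.WriteMonitoredSources() method, and the emergency-pause check moves into Reaper.LoadSources(). For call sites inside the reaper package the query change is a mechanical rename; a minimal sketch, with dbUnique and sql standing for whatever the call site already passes:

// before this commit:
data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
// after this commit:
data, err := QueryMeasurements(ctx, dbUnique, sql)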

File tree

4 files changed, +80 -95 lines


internal/reaper/cache.go

Lines changed: 5 additions & 0 deletions
@@ -92,6 +92,11 @@ func (r *Reaper) LoadMetrics() (err error) {
 
 // LoadSources loads sources from the reader
 func (r *Reaper) LoadSources() (err error) {
+	if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
+		r.logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
+		monitoredSources = make([]*sources.SourceConn, 0)
+		return nil
+	}
 	if monitoredSources, err = monitoredSources.SyncFromReader(r.SourcesReaderWriter); err != nil {
 		return err
 	}
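
Moving the check into LoadSources() means the emergency pause now takes effect wherever sources are (re)loaded, not only in the Reap() main loop (see the matching deletion in internal/reaper/reaper.go below). DoesEmergencyTriggerfileExist() itself is not part of this diff; a stand-in sketch of what such a helper plausibly does, assuming it is a plain file-existence check:

// Assumed shape only; the real helper lives elsewhere in the package and may differ.
// Requires "os" in the import list.
func DoesEmergencyTriggerfileExist(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

The operator-facing behaviour stays the same: creating the configured trigger file makes source loading return an empty list (pausing monitoring), and removing it lets the next load proceed normally.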

internal/reaper/database.go

Lines changed: 22 additions & 25 deletions
@@ -20,15 +20,7 @@ import (
 	"github.com/jackc/pgx/v5"
 )
 
-func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any) (metrics.Measurements, error) {
-	rows, err := conn.Query(ctx, sql, args...)
-	if err == nil {
-		return pgx.CollectRows(rows, pgx.RowToMap)
-	}
-	return nil, err
-}
-
-func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
+func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
 	var conn db.PgxIface
 	var md *sources.SourceConn
 	var err error
@@ -50,9 +42,13 @@ func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string,
 		if err != nil {
 			return nil, err
 		}
-		return DBExecRead(ctx, tx, sql, args...)
+		conn = tx
+	}
+	rows, err := conn.Query(ctx, sql, args...)
+	if err == nil {
+		return pgx.CollectRows(rows, pgx.RowToMap)
 	}
-	return DBExecRead(ctx, conn, sql, args...)
+	return nil, err
 }
 
 const (
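
The merged function now resolves the connection first (switching to a transaction where needed, per the hunk above) and then collects rows inline with pgx v5 generics. For readers less familiar with that API, a self-contained sketch of the same collection pattern; the DSN and query here are placeholders, not values from this commit:

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://localhost/postgres") // placeholder DSN
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "select 1 as one, now() as ts")
	if err != nil {
		panic(err)
	}
	// pgx.CollectRows + pgx.RowToMap turns the result set into []map[string]any,
	// which is what QueryMeasurements hands back as metrics.Measurements.
	data, err := pgx.CollectRows(rows, pgx.RowToMap)
	if err != nil {
		panic(err)
	}
	fmt.Println(data)
}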
@@ -80,7 +76,7 @@ func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error) {
 	if err != nil || (ver.ExecEnv != execEnvAzureSingle) || (ver.ExecEnv == execEnvAzureSingle && ver.ApproxDBSizeB < 1e12) {
 		log.GetLogger(ctx).Debugf("[%s] determining DB size ...", dbUnique)
 
-		data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlDbSize) // can take some time on ancient FS, use 300s stmt timeout
+		data, err := QueryMeasurements(ctx, dbUnique, sqlDbSize) // can take some time on ancient FS, use 300s stmt timeout
 		if err != nil {
 			log.GetLogger(ctx).Errorf("[%s] failed to determine DB size...cannot apply --min-db-size-mb flag. err: %v ...", dbUnique, err)
 			return 0, err
@@ -217,7 +213,7 @@ FROM
 	}
 
 	l.Debugf("[%s] determining installed extensions info...", md.Name)
-	data, err := DBExecReadByDbUniqueName(ctx, md.Name, sqlExtensions)
+	data, err := QueryMeasurements(ctx, md.Name, sqlExtensions)
 	if err != nil {
 		l.Errorf("[%s] failed to determine installed extensions info: %v", md.Name, err)
 	} else {
@@ -259,7 +255,7 @@ func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatab
 		return changeCounts
 	}
 
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
+	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
 	if err != nil {
 		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err)
 		return changeCounts
@@ -343,7 +339,7 @@ func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatab
 		return changeCounts
 	}
 
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
+	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
 	if err != nil {
 		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err)
 		return changeCounts
@@ -427,7 +423,7 @@ func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatab
 		return changeCounts
 	}
 
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
+	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
 	if err != nil {
 		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err)
 		return changeCounts
@@ -510,7 +506,7 @@ func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredD
 	}
 
 	// returns rows of: object_type, tag_role, tag_object, privilege_type
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
+	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
 	if err != nil {
 		log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, specialMetricChangeEvents, err)
 		return changeCounts
@@ -591,7 +587,7 @@ func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme Monito
 		return changeCounts
 	}
 
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
+	data, err := QueryMeasurements(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
 	if err != nil {
 		log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, specialMetricChangeEvents, err)
 		return changeCounts
@@ -638,7 +634,8 @@ func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme Monito
 	return changeCounts
 }
 
-func CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) {
+func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, hostState map[string]map[string]string) {
+	storageCh := r.measurementCh
 	sprocCounts := DetectSprocChanges(ctx, dbUnique, vme, storageCh, hostState) // TODO some of Detect*() code could be unified...
 	tableCounts := DetectTableChanges(ctx, dbUnique, vme, storageCh, hostState)
 	indexCounts := DetectIndexChanges(ctx, dbUnique, vme, storageCh, hostState)
@@ -691,7 +688,7 @@ func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme Monitore
 
 	for _, sql := range sqlLines {
 		if strings.HasPrefix(sql, "SHOW POOL_NODES") {
-			data, err := DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
+			data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
 			if err != nil {
 				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
 				return data, err
@@ -745,7 +742,7 @@ func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme Monitore
 			continue
 		}
 
-		data, err := DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
+		data, err := QueryMeasurements(ctx, msg.DBUniqueName, sql)
 		if err != nil {
 			log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
 			continue
@@ -783,7 +780,7 @@ func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionN
 	extsCreated := make([]string, 0)
 
 	// For security reasons don't allow to execute random strings but check that it's an existing extension
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlAvailable)
+	data, err := QueryMeasurements(ctx, dbUnique, sqlAvailable)
 	if err != nil {
 		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
 		return extsCreated
@@ -803,7 +800,7 @@ func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionN
 			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
 		} else {
 			sqlCreateExt := `create extension ` + extToCreate
-			_, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlCreateExt)
+			_, err := QueryMeasurements(ctx, dbUnique, sqlCreateExt)
 			if err != nil {
 				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
 			}
@@ -856,12 +853,12 @@ func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn
 func GetGoPsutilDiskPG(ctx context.Context, dbUnique string) (metrics.Measurements, error) {
 	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld, current_setting('server_version_num')::int as pgver`
 	sqlTS := `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])`
-	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
+	data, err := QueryMeasurements(ctx, dbUnique, sql)
 	if err != nil || len(data) == 0 {
 		log.GetLogger(ctx).Errorf("Failed to determine relevant PG disk paths via SQL: %v", err)
 		return nil, err
 	}
-	dataTblsp, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlTS)
+	dataTblsp, err := QueryMeasurements(ctx, dbUnique, sqlTS)
 	if err != nil {
 		log.GetLogger(ctx).Infof("Failed to determine relevant PG tablespace paths via SQL: %v", err)
 	}

internal/reaper/reaper.go

Lines changed: 52 additions & 69 deletions
@@ -3,6 +3,7 @@ package reaper
 import (
 	"context"
 	"fmt"
+	"maps"
 	"slices"
 	"strings"
 	"sync"
@@ -56,6 +57,8 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
 	logger := r.logger
 
 	go r.WriteMeasurements(ctx)
+	go r.WriteMonitoredSources(ctx)
+
 	r.ready.Store(true)
 
 	for { //main loop
@@ -66,18 +69,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
 			logger.WithError(err).Error("could not refresh metric definitions, using last valid cache")
 		}
 
-		if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
-			logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
-			monitoredSources = make([]*sources.SourceConn, 0)
-		}
-
 		UpdateMonitoredDBCache(monitoredSources)
-
-		if lastMonitoredDBsUpdate.IsZero() || lastMonitoredDBsUpdate.Before(time.Now().Add(-1*time.Second*monitoredDbsDatastoreSyncIntervalSeconds)) {
-			go SyncMonitoredDBsToDatastore(ctx, monitoredSources, r.measurementCh)
-			lastMonitoredDBsUpdate = time.Now()
-		}
-
 		hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
 		for _, monitoredSource := range monitoredSources {
 			srcL := logger.WithField("source", monitoredSource.Name)
@@ -362,7 +354,7 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.Source
 	}
 	t1 := time.Now()
 	if metricStoreMessages == nil {
-		metricStoreMessages, err = FetchMetrics(ctx, mfm, hostState, r.measurementCh, r.Options)
+		metricStoreMessages, err = r.FetchMetrics(ctx, mfm, hostState)
 	}
 	t2 := time.Now()
 
@@ -420,64 +412,63 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, mdb *sources.Source
 	}
 }
 
-func StoreMetrics(metrics []metrics.MeasurementEnvelope, storageCh chan<- []metrics.MeasurementEnvelope) (int, error) {
-	if len(metrics) > 0 {
-		storageCh <- metrics
-		return len(metrics), nil
+// WriteMonitoredSources writes actively monitored DBs listing to sinks
+// every monitoredDbsDatastoreSyncIntervalSeconds (default 10min)
+func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
+	if len(monitoredDbCache) == 0 {
+		return
 	}
-	return 0, nil
-}
-
-func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.SourceConn, persistenceChannel chan []metrics.MeasurementEnvelope) {
-	if len(monitoredDbs) > 0 {
-		msms := make([]metrics.MeasurementEnvelope, len(monitoredDbs))
-		now := time.Now()
-
-		for _, mdb := range monitoredDbs {
-			db := metrics.Measurement{
-				"tag_group":   mdb.Group,
-				"master_only": mdb.OnlyIfMaster,
-				"epoch_ns":    now.UnixNano(),
-			}
-			for k, v := range mdb.CustomTags {
-				db[tagPrefix+k] = v
-			}
-			msms = append(msms, metrics.MeasurementEnvelope{
-				DBName:     mdb.Name,
-				MetricName: monitoredDbsDatastoreSyncMetricName,
-				Data:       metrics.Measurements{db},
-			})
+	msms := make([]metrics.MeasurementEnvelope, len(monitoredDbCache))
+	now := time.Now().UnixNano()
+
+	monitoredDbCacheLock.RLock()
+	for _, mdb := range monitoredDbCache {
+		db := metrics.Measurement{
+			"tag_group":   mdb.Group,
+			"master_only": mdb.OnlyIfMaster,
+			"epoch_ns":    now,
 		}
-		select {
-		case persistenceChannel <- msms:
-			//continue
-		case <-ctx.Done():
-			return
+		for k, v := range mdb.CustomTags {
+			db[tagPrefix+k] = v
 		}
+		msms = append(msms, metrics.MeasurementEnvelope{
+			DBName:     mdb.Name,
+			MetricName: monitoredDbsDatastoreSyncMetricName,
+			Data:       metrics.Measurements{db},
+		})
+	}
+	monitoredDbCacheLock.RUnlock()
+
+	select {
+	case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
+		// continue
+	case r.measurementCh <- msms:
+		//continue
+	case <-ctx.Done():
+		return
 	}
 }
 
-func AddDbnameSysinfoIfNotExistsToQueryResultData(data metrics.Measurements, ver MonitoredDatabaseSettings, opts *cmdopts.Options) metrics.Measurements {
+func (r *Reaper) AddSysinfoToMeasurements(data metrics.Measurements, ver MonitoredDatabaseSettings) metrics.Measurements {
 	enrichedData := make(metrics.Measurements, 0)
 	for _, dr := range data {
-		if opts.Sinks.RealDbnameField > "" && ver.RealDbname > "" {
-			old, ok := dr[opts.Sinks.RealDbnameField]
+		if r.Sinks.RealDbnameField > "" && ver.RealDbname > "" {
+			old, ok := dr[r.Sinks.RealDbnameField]
 			if !ok || old == "" {
-				dr[opts.Sinks.RealDbnameField] = ver.RealDbname
+				dr[r.Sinks.RealDbnameField] = ver.RealDbname
 			}
 		}
-		if opts.Sinks.SystemIdentifierField > "" && ver.SystemIdentifier > "" {
-			old, ok := dr[opts.Sinks.SystemIdentifierField]
+		if r.Sinks.SystemIdentifierField > "" && ver.SystemIdentifier > "" {
+			old, ok := dr[r.Sinks.SystemIdentifierField]
 			if !ok || old == "" {
-				dr[opts.Sinks.SystemIdentifierField] = ver.SystemIdentifier
+				dr[r.Sinks.SystemIdentifierField] = ver.SystemIdentifier
 			}
 		}
 		enrichedData = append(enrichedData, dr)
 	}
 	return enrichedData
 }
 
-var lastMonitoredDBsUpdate time.Time
 var instanceMetricCache = make(map[string](metrics.Measurements)) // [dbUnique+metric]lastly_fetched_data
 var instanceMetricCacheLock = sync.RWMutex{}
 var instanceMetricCacheTimestamp = make(map[string]time.Time) // [dbUnique+metric]last_fetch_time
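
As the hunk above shows, WriteMonitoredSources() snapshots monitoredDbCache under a read lock, builds one MeasurementEnvelope per source, and then races the channel send against the sync-interval timeout and context cancellation so a slow or full sink cannot block it indefinitely. A generic, self-contained sketch of that guarded-send pattern; the names and types here are illustrative, not the project's:

package main

import (
	"context"
	"fmt"
	"time"
)

// publish gives up on a send after a deadline or when the context is
// cancelled, mirroring the three-way select in WriteMonitoredSources.
func publish(ctx context.Context, out chan<- []string, batch []string, timeout time.Duration) bool {
	select {
	case out <- batch:
		return true // sink accepted the batch
	case <-time.After(timeout):
		return false // sink too slow, drop this round
	case <-ctx.Done():
		return false // shutting down
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := make(chan []string, 1)
	ok := publish(ctx, ch, []string{"db1", "db2"}, time.Second)
	fmt.Println("sent:", ok, "queued:", <-ch)
}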
@@ -506,23 +497,15 @@ func PutToInstanceCache(msg MetricFetchConfig, data metrics.Measurements) {
 
 func deepCopyMetricData(data metrics.Measurements) metrics.Measurements {
 	newData := make(metrics.Measurements, len(data))
-
 	for i, dr := range data {
-		newRow := make(map[string]any)
-		for k, v := range dr {
-			newRow[k] = v
-		}
-		newData[i] = newRow
+		newData[i] = maps.Clone(dr)
 	}
-
 	return newData
 }
 
-func FetchMetrics(ctx context.Context,
+func (r *Reaper) FetchMetrics(ctx context.Context,
 	msg MetricFetchConfig,
-	hostState map[string]map[string]string,
-	storageCh chan<- []metrics.MeasurementEnvelope,
-	opts *cmdopts.Options) ([]metrics.MeasurementEnvelope, error) {
+	hostState map[string]map[string]string) ([]metrics.MeasurementEnvelope, error) {
 
 	var dbSettings MonitoredDatabaseSettings
 	var dbVersion int
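
Two smaller cleanups sit in this hunk: deepCopyMetricData() now uses maps.Clone from the standard library "maps" package (Go 1.21+), which performs the same shallow per-row copy as the removed hand-written loop, and FetchMetrics() becomes a Reaper method so the storage channel and options no longer travel as parameters. A tiny self-contained illustration of the shallow-copy semantics:

package main

import (
	"fmt"
	"maps"
)

func main() {
	row := map[string]any{"epoch_ns": 1, "tag_group": "default"}
	cp := maps.Clone(row) // shallow copy: keys and values copied, nested references shared
	cp["tag_group"] = "changed"
	fmt.Println(row["tag_group"], cp["tag_group"]) // default changed
}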
@@ -573,8 +556,8 @@ func FetchMetrics(ctx context.Context,
 	}
 
 	isCacheable = IsCacheableMetric(msg, mvp)
-	if isCacheable && opts.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(opts.Metrics.InstanceLevelCacheMaxSeconds) {
-		cachedData = GetFromInstanceCacheIfNotOlderThanSeconds(msg, opts.Metrics.InstanceLevelCacheMaxSeconds)
+	if isCacheable && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(r.Metrics.InstanceLevelCacheMaxSeconds) {
+		cachedData = GetFromInstanceCacheIfNotOlderThanSeconds(msg, r.Metrics.InstanceLevelCacheMaxSeconds)
 		if len(cachedData) > 0 {
 			fromCache = true
 			goto send_to_storageChannel
@@ -595,7 +578,7 @@ func FetchMetrics(ctx context.Context,
 	}
 
 	if msg.MetricName == specialMetricChangeEvents { // special handling, multiple queries + stateful
-		CheckForPGObjectChangesAndStore(ctx, msg.DBUniqueName, dbSettings, storageCh, hostState) // TODO no hostState for Prometheus currently
+		r.CheckForPGObjectChangesAndStore(ctx, msg.DBUniqueName, dbSettings, hostState) // TODO no hostState for Prometheus currently
 	} else if msg.MetricName == recoMetricName {
 		if data, err = GetRecommendations(ctx, msg.DBUniqueName, dbSettings); err != nil {
 			return nil, err
@@ -605,7 +588,7 @@ func FetchMetrics(ctx context.Context,
 			return nil, err
 		}
 	} else {
-		data, err = DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
+		data, err = QueryMeasurements(ctx, msg.DBUniqueName, sql)
 
 		if err != nil {
 			// let's soften errors to "info" from functions that expect the server to be a primary to reduce noise
@@ -636,17 +619,17 @@ func FetchMetrics(ctx context.Context,
 		log.GetLogger(ctx).WithFields(map[string]any{"source": msg.DBUniqueName, "metric": msg.MetricName, "rows": len(data)}).Info("measurements fetched")
 	}
 
-	if isCacheable && opts.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(opts.Metrics.InstanceLevelCacheMaxSeconds) {
+	if isCacheable && r.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(r.Metrics.InstanceLevelCacheMaxSeconds) {
 		PutToInstanceCache(msg, data)
 	}
 
 send_to_storageChannel:
 
-	if (opts.Sinks.RealDbnameField > "" || opts.Sinks.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres {
+	if (r.Sinks.RealDbnameField > "" || r.Sinks.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres {
 		MonitoredDatabasesSettingsLock.RLock()
 		ver := MonitoredDatabasesSettings[msg.DBUniqueName]
 		MonitoredDatabasesSettingsLock.RUnlock()
-		data = AddDbnameSysinfoIfNotExistsToQueryResultData(data, ver, opts)
+		data = r.AddSysinfoToMeasurements(data, ver)
 	}
 
 	if mvp.StorageName != "" {
