Skip to content

Commit d1c3e55

Browse files
authored
[*] rename sources.MonitoredDatabase to SourceConn (#671)
Also move some connection specific functions from reaper to SourceConn methods, e.g. TryDiscoverExecutionEnv(), DoesFunctionExists()
1 parent b6bb121 commit d1c3e55

File tree

11 files changed

+349
-347
lines changed

11 files changed

+349
-347
lines changed

internal/cmdopts/cmdsource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (cmd *SourcePingCommand) Execute(args []string) error {
5454
case sources.SourcePostgresContinuous:
5555
_, e = sources.ResolveDatabasesFromPostgres(s)
5656
default:
57-
mdb := &sources.MonitoredDatabase{Source: s}
57+
mdb := &sources.SourceConn{Source: s}
5858
e = mdb.Ping(context.Background())
5959
}
6060
if e != nil {

internal/metrics/logparse.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, erro
8484
}
8585

8686
// 1. add zero counts for severity levels that didn't have any occurrences in the log
87-
func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.MonitoredDatabase) []MeasurementEnvelope {
87+
func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) []MeasurementEnvelope {
8888
allSeverityCounts := make(Measurement)
8989
for _, s := range PgSeverities {
9090
parsedCount, ok := eventCounts[s]
@@ -110,7 +110,7 @@ func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]i
110110
}}
111111
}
112112

113-
func ParseLogs(ctx context.Context, mdb *sources.MonitoredDatabase, realDbname, metricName string, configMap map[string]float64, storeCh chan<- []MeasurementEnvelope) {
113+
func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname, metricName string, configMap map[string]float64, storeCh chan<- []MeasurementEnvelope) {
114114

115115
var latest, previous, serverMessagesLang string
116116
var latestHandle *os.File

internal/reaper/cache.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/sirupsen/logrus"
1313
)
1414

15-
var monitoredDbCache map[string]*sources.MonitoredDatabase
15+
var monitoredDbCache map[string]*sources.SourceConn
1616
var monitoredDbCacheLock sync.RWMutex
1717
var MonitoredDatabasesSettings = make(map[string]MonitoredDatabaseSettings)
1818
var MonitoredDatabasesSettingsLock = sync.RWMutex{}
@@ -25,8 +25,8 @@ var lastDBSizeMB = make(map[string]int64)
2525
var lastDBSizeFetchTime = make(map[string]time.Time) // cached for DB_SIZE_CACHING_INTERVAL
2626
var lastDBSizeCheckLock sync.RWMutex
2727

28-
var prevLoopMonitoredDBs sources.MonitoredDatabases // to be able to detect DBs removed from config
29-
var undersizedDBs = make(map[string]bool) // DBs below the --min-db-size-mb limit, if set
28+
var prevLoopMonitoredDBs sources.SourceConns // to be able to detect DBs removed from config
29+
var undersizedDBs = make(map[string]bool) // DBs below the --min-db-size-mb limit, if set
3030
var undersizedDBsLock = sync.RWMutex{}
3131
var recoveryIgnoredDBs = make(map[string]bool) // DBs in recovery state and OnlyIfMaster specified in config
3232
var recoveryIgnoredDBsLock = sync.RWMutex{}
@@ -35,16 +35,16 @@ var hostMetricIntervalMap = make(map[string]float64) // [db1_metric] = 30
3535

3636
var lastSQLFetchError sync.Map
3737

38-
func InitPGVersionInfoFetchingLockIfNil(md *sources.MonitoredDatabase) {
38+
func InitPGVersionInfoFetchingLockIfNil(md *sources.SourceConn) {
3939
MonitoredDatabasesSettingsLock.Lock()
4040
if _, ok := MonitoredDatabasesSettingsGetLock[md.Name]; !ok {
4141
MonitoredDatabasesSettingsGetLock[md.Name] = &sync.RWMutex{}
4242
}
4343
MonitoredDatabasesSettingsLock.Unlock()
4444
}
4545

46-
func UpdateMonitoredDBCache(data sources.MonitoredDatabases) {
47-
monitoredDbCacheNew := make(map[string]*sources.MonitoredDatabase)
46+
func UpdateMonitoredDBCache(data sources.SourceConns) {
47+
monitoredDbCacheNew := make(map[string]*sources.SourceConn)
4848
for _, row := range data {
4949
monitoredDbCacheNew[row.Name] = row
5050
}
@@ -53,7 +53,7 @@ func UpdateMonitoredDBCache(data sources.MonitoredDatabases) {
5353
monitoredDbCacheLock.Unlock()
5454
}
5555

56-
func GetMonitoredDatabaseByUniqueName(name string) (*sources.MonitoredDatabase, error) {
56+
func GetMonitoredDatabaseByUniqueName(name string) (*sources.SourceConn, error) {
5757
monitoredDbCacheLock.RLock()
5858
defer monitoredDbCacheLock.RUnlock()
5959
md, exists := monitoredDbCache[name]

internal/reaper/database.go

Lines changed: 6 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,9 @@ func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any)
2828
return nil, err
2929
}
3030

31-
func GetConnByUniqueName(dbUnique string) db.PgxIface {
32-
if md, err := GetMonitoredDatabaseByUniqueName(dbUnique); err == nil {
33-
return md.Conn
34-
}
35-
return nil
36-
}
37-
3831
func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
3932
var conn db.PgxIface
40-
var md *sources.MonitoredDatabase
33+
var md *sources.SourceConn
4134
var err error
4235
var tx pgx.Tx
4336
if strings.TrimSpace(sql) == "" {
@@ -112,35 +105,6 @@ func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error) {
112105
return lastDBSize, nil
113106
}
114107

115-
func TryDiscoverExecutionEnv(ctx context.Context, dbUnique string) (execEnv string) {
116-
sql := `select /* pgwatch_generated */
117-
case
118-
when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by Visual C' then 'AZURE_SINGLE'
119-
when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by gcc' then 'AZURE_FLEXIBLE'
120-
when exists (select * from pg_settings where name = 'cloudsql.supported_extensions') then 'GOOGLE'
121-
else
122-
'UNKNOWN'
123-
end as exec_env`
124-
_ = GetConnByUniqueName(dbUnique).QueryRow(ctx, sql).Scan(&execEnv)
125-
return
126-
}
127-
128-
func GetDBTotalApproxSize(ctx context.Context, dbUnique string) (int64, error) {
129-
sqlApproxDBSize := `
130-
select /* pgwatch_generated */
131-
current_setting('block_size')::int8 * sum(relpages) as db_size_approx
132-
from
133-
pg_class c
134-
where /* works only for v9.1+*/
135-
c.relpersistence != 't';
136-
`
137-
data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlApproxDBSize)
138-
if err != nil {
139-
return 0, err
140-
}
141-
return data[0]["db_size_approx"].(int64), nil
142-
}
143-
144108
// VersionToInt parses a given version and returns an integer or
145109
// an error if unable to parse the version. Only parses valid semantic versions.
146110
// Performs checking that can find errors within the version.
@@ -158,7 +122,7 @@ func VersionToInt(version string) (v int) {
158122

159123
var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`) // extract $major.minor from "4.1.2 (karasukiboshi)" or "PgBouncer 1.12.0"
160124

161-
func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.MonitoredDatabase, noCache bool) (MonitoredDatabaseSettings, error) {
125+
func GetMonitoredDatabaseSettings(ctx context.Context, md *sources.SourceConn, noCache bool) (MonitoredDatabaseSettings, error) {
162126
var dbSettings MonitoredDatabaseSettings
163127
var dbNewSettings MonitoredDatabaseSettings
164128
var ok bool
@@ -233,12 +197,12 @@ FROM
233197
dbNewSettings.ExecEnv = dbSettings.ExecEnv // carry over as not likely to change ever
234198
} else {
235199
l.Debugf("determining the execution env...")
236-
dbNewSettings.ExecEnv = TryDiscoverExecutionEnv(ctx, md.Name)
200+
dbNewSettings.ExecEnv = md.DiscoverPlatform(ctx)
237201
}
238202

239203
// to work around poor Azure Single Server FS functions performance for some metrics + the --min-db-size-mb filter
240204
if dbNewSettings.ExecEnv == execEnvAzureSingle {
241-
if approxSize, err := GetDBTotalApproxSize(ctx, md.Name); err == nil {
205+
if approxSize, err := md.GetApproxSize(ctx); err == nil {
242206
dbNewSettings.ApproxDBSizeB = approxSize
243207
} else {
244208
dbNewSettings.ApproxDBSizeB = dbSettings.ApproxDBSizeB
@@ -811,21 +775,6 @@ func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme Monitore
811775
return retData, nil
812776
}
813777

814-
func DoesFunctionExists(ctx context.Context, dbUnique, functionName string) bool {
815-
log.GetLogger(ctx).Debug("Checking for function existence", dbUnique, functionName)
816-
sql := fmt.Sprintf("select /* pgwatch_generated */ 1 from pg_proc join pg_namespace n on pronamespace = n.oid where proname = '%s' and n.nspname = 'public'", functionName)
817-
data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
818-
if err != nil {
819-
log.GetLogger(ctx).Error("Failed to check for function existence", dbUnique, functionName, err)
820-
return false
821-
}
822-
if len(data) > 0 {
823-
log.GetLogger(ctx).Debugf("Function %s exists on %s", functionName, dbUnique)
824-
return true
825-
}
826-
return false
827-
}
828-
829778
// Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
830779
// With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
831780
// whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
@@ -866,7 +815,7 @@ func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionN
866815
}
867816

868817
// Called once on daemon startup to try to create "metric fething helper" functions automatically
869-
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.MonitoredDatabase) (err error) {
818+
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.SourceConn) (err error) {
870819
metricConfig := func() map[string]float64 {
871820
if len(md.Metrics) > 0 {
872821
return md.Metrics
@@ -919,7 +868,7 @@ func GetGoPsutilDiskPG(ctx context.Context, dbUnique string) (metrics.Measuremen
919868
return psutil.GetGoPsutilDiskPG(data, dataTblsp)
920869
}
921870

922-
func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.MonitoredDatabases, shutDownDueToRoleChange map[string]bool) {
871+
func CloseResourcesForRemovedMonitoredDBs(metricsWriter sinks.Writer, currentDBs, prevLoopDBs sources.SourceConns, shutDownDueToRoleChange map[string]bool) {
923872
var curDBsMap = make(map[string]bool)
924873

925874
for _, curDB := range currentDBs {

internal/reaper/reaper.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/cybertec-postgresql/pgwatch/v3/internal/sources"
1717
)
1818

19-
var monitoredSources = make(sources.MonitoredDatabases, 0)
19+
var monitoredSources = make(sources.SourceConns, 0)
2020
var hostLastKnownStatusInRecovery = make(map[string]bool) // isInRecovery
2121
var metricConfig map[string]float64 // set to host.Metrics or host.MetricsStandby (in case optional config defined and in recovery state
2222
var metricDefinitionMap *metrics.Metrics = &metrics.Metrics{}
@@ -68,7 +68,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
6868

6969
if DoesEmergencyTriggerfileExist(r.Metrics.EmergencyPauseTriggerfile) {
7070
logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", r.Metrics.EmergencyPauseTriggerfile)
71-
monitoredSources = make([]*sources.MonitoredDatabase, 0)
71+
monitoredSources = make([]*sources.SourceConn, 0)
7272
}
7373

7474
UpdateMonitoredDBCache(monitoredSources)
@@ -250,7 +250,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
250250
logger.Debug("checking if any workers need to be shut down...")
251251
for dbMetric, cancelFunc := range cancelFuncs {
252252
var currentMetricConfig map[string]float64
253-
var dbInfo *sources.MonitoredDatabase
253+
var dbInfo *sources.SourceConn
254254
var ok, dbRemovedFromConfig bool
255255
singleMetricDisabled := false
256256
splits := strings.Split(dbMetric, dbMetricJoinStr)
@@ -322,7 +322,7 @@ func (r *Reaper) Reap(ctx context.Context) (err error) {
322322

323323
// metrics.ControlMessage notifies of shutdown + interval change
324324
func (r *Reaper) reapMetricMeasurements(ctx context.Context,
325-
mdb *sources.MonitoredDatabase,
325+
mdb *sources.SourceConn,
326326
metricName string,
327327
configMap map[string]float64) {
328328

@@ -445,7 +445,7 @@ func StoreMetrics(metrics []metrics.MeasurementEnvelope, storageCh chan<- []metr
445445
return 0, nil
446446
}
447447

448-
func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.MonitoredDatabase, persistenceChannel chan []metrics.MeasurementEnvelope) {
448+
func SyncMonitoredDBsToDatastore(ctx context.Context, monitoredDbs []*sources.SourceConn, persistenceChannel chan []metrics.MeasurementEnvelope) {
449449
if len(monitoredDbs) > 0 {
450450
msms := make([]metrics.MeasurementEnvelope, len(monitoredDbs))
451451
now := time.Now()
@@ -547,7 +547,7 @@ func FetchMetrics(ctx context.Context,
547547
var err error
548548
var sql string
549549
var data, cachedData metrics.Measurements
550-
var md *sources.MonitoredDatabase
550+
var md *sources.SourceConn
551551
var fromCache, isCacheable bool
552552

553553
md, err = GetMonitoredDatabaseByUniqueName(msg.DBUniqueName)

0 commit comments

Comments
 (0)