
Commit 29c2e74

[+] decrease memory allocations for metrics.MeasurementEnvelope (#768)
Allocating additional slices increases memory usage. Instead, we can send each metrics.MeasurementEnvelope immediately after it is received; there is no need to collect envelopes beforehand. For 12 Postgres instances monitored with the full preset, this change saves us ~4-9% of allocations.
1 parent 861c882 · commit 29c2e74
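The pattern repeated in every file below is easiest to see in isolation. Here is a minimal, self-contained sketch of the before/after; MeasurementEnvelope is a simplified stand-in for the real struct in internal/metrics, not its actual definition:

package main

import "fmt"

// MeasurementEnvelope stands in for the real internal/metrics struct;
// the fields are illustrative only.
type MeasurementEnvelope struct {
	DBName     string
	MetricName string
}

// Before: every send wrapped the envelope in a freshly allocated
// one-element slice just to match the channel's element type.
func sendWrapped(ch chan<- []MeasurementEnvelope, e MeasurementEnvelope) {
	ch <- []MeasurementEnvelope{e} // one extra slice allocation per send
}

// After: the envelope is sent by value as soon as it is ready.
func sendDirect(ch chan<- MeasurementEnvelope, e MeasurementEnvelope) {
	ch <- e // copied into the channel's preallocated buffer
}

func main() {
	ch := make(chan MeasurementEnvelope, 1)
	sendDirect(ch, MeasurementEnvelope{DBName: "db1", MetricName: "db_stats"})
	fmt.Printf("%+v\n", <-ch)
}

A buffered channel copies sent values into a buffer allocated once at make time, so the direct send costs no per-message heap allocation, which is consistent with the savings reported above.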

12 files changed (+118 -121 lines)

internal/metrics/logparse.go

Lines changed: 5 additions & 6 deletions
@@ -84,7 +84,7 @@ func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, erro
 }

 // 1. add zero counts for severity levels that didn't have any occurrences in the log
-func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) []MeasurementEnvelope {
+func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]int64, mdb *sources.SourceConn) MeasurementEnvelope {
 	allSeverityCounts := NewMeasurement(time.Now().UnixNano())
 	for _, s := range PgSeverities {
 		parsedCount, ok := eventCounts[s]
@@ -100,16 +100,16 @@ func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]i
 			allSeverityCounts[strings.ToLower(s)+"_total"] = 0
 		}
 	}
-	return []MeasurementEnvelope{{
+	return MeasurementEnvelope{
 		DBName:     mdb.Name,
 		SourceType: string(mdb.Kind),
 		MetricName: specialMetricServerLogEventCounts,
 		Data:       Measurements{allSeverityCounts},
 		CustomTags: mdb.CustomTags,
-	}}
+	}
 }

-func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string, interval float64, storeCh chan<- []MeasurementEnvelope) {
+func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string, interval float64, storeCh chan<- MeasurementEnvelope) {

 	var latest, previous, serverMessagesLang string
 	var latestHandle *os.File
@@ -305,8 +305,7 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
 send_to_storage_if_needed:
 	if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-1*time.Second*time.Duration(interval))) {
 		logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
-		metricStoreMessages := eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
-		storeCh <- metricStoreMessages
+		storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
 		zeroEventCounts(eventCounts)
 		zeroEventCounts(eventCountsTotal)
 		lastSendTime = time.Now()

internal/reaper/database.go

Lines changed: 34 additions & 19 deletions
@@ -50,7 +50,7 @@ func QueryMeasurements(ctx context.Context, dbUnique string, sql string, args ..
 		return nil, err
 	}

-func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
+func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
 	detectedChanges := make(metrics.Measurements, 0)
 	var firstRun bool
 	var changeCounts ChangeDetectionResults
@@ -122,13 +122,18 @@ func DetectSprocChanges(ctx context.Context, md *sources.SourceConn, storageCh c
 	}
 	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
 	if len(detectedChanges) > 0 {
-		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
+		storageCh <- metrics.MeasurementEnvelope{
+			DBName:     md.Name,
+			MetricName: "sproc_changes",
+			Data:       detectedChanges,
+			CustomTags: md.CustomTags,
+		}
 	}

 	return changeCounts
 }

-func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
+func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
 	detectedChanges := make(metrics.Measurements, 0)
 	var firstRun bool
 	var changeCounts ChangeDetectionResults
@@ -200,13 +205,18 @@ func DetectTableChanges(ctx context.Context, md *sources.SourceConn, storageCh c

 	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
 	if len(detectedChanges) > 0 {
-		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
+		storageCh <- metrics.MeasurementEnvelope{
+			DBName:     md.Name,
+			MetricName: "table_changes",
+			Data:       detectedChanges,
+			CustomTags: md.CustomTags,
+		}
 	}

 	return changeCounts
 }

-func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
+func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
 	detectedChanges := make(metrics.Measurements, 0)
 	var firstRun bool
 	var changeCounts ChangeDetectionResults
@@ -276,13 +286,18 @@ func DetectIndexChanges(ctx context.Context, md *sources.SourceConn, storageCh c
 	}
 	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
 	if len(detectedChanges) > 0 {
-		storageCh <- []metrics.MeasurementEnvelope{{DBName: md.Name, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
+		storageCh <- metrics.MeasurementEnvelope{
+			DBName:     md.Name,
+			MetricName: "index_changes",
+			Data:       detectedChanges,
+			CustomTags: md.CustomTags,
+		}
 	}

 	return changeCounts
 }

-func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
+func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
 	detectedChanges := make(metrics.Measurements, 0)
 	var firstRun bool
 	var changeCounts ChangeDetectionResults
@@ -346,19 +361,18 @@ func DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn, storage

 	log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", md.Name, specialMetricChangeEvents, len(detectedChanges))
 	if len(detectedChanges) > 0 {
-		storageCh <- []metrics.MeasurementEnvelope{
-			{
-				DBName:     md.Name,
-				MetricName: "privilege_changes",
-				Data:       detectedChanges,
-				CustomTags: md.CustomTags,
-			}}
+		storageCh <- metrics.MeasurementEnvelope{
+			DBName:     md.Name,
+			MetricName: "privilege_changes",
+			Data:       detectedChanges,
+			CustomTags: md.CustomTags,
+		}
 	}

 	return changeCounts
 }

-func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- []metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
+func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, storageCh chan<- metrics.MeasurementEnvelope, hostState map[string]map[string]string) ChangeDetectionResults {
 	detectedChanges := make(metrics.Measurements, 0)
 	var firstRun bool
 	var changeCounts ChangeDetectionResults
@@ -410,12 +424,12 @@ func DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn, sto

 	log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", md.Name, specialMetricChangeEvents, len(detectedChanges))
 	if len(detectedChanges) > 0 {
-		storageCh <- []metrics.MeasurementEnvelope{{
+		storageCh <- metrics.MeasurementEnvelope{
 			DBName:     md.Name,
 			MetricName: "configuration_changes",
 			Data:       detectedChanges,
 			CustomTags: md.CustomTags,
-		}}
+		}
 	}

 	return changeCounts
@@ -455,12 +469,13 @@ func (r *Reaper) CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique s
 		influxEntry["details"] = message
 		detectedChangesSummary = append(detectedChangesSummary, influxEntry)
 		md, _ := GetMonitoredDatabaseByUniqueName(ctx, dbUnique)
-		storageCh <- []metrics.MeasurementEnvelope{{DBName: dbUnique,
+		storageCh <- metrics.MeasurementEnvelope{
+			DBName:     dbUnique,
 			SourceType: string(md.Kind),
 			MetricName: "object_changes",
 			Data:       detectedChangesSummary,
 			CustomTags: md.CustomTags,
-		}}
+		}

 	}
 }

internal/reaper/reaper.go

Lines changed: 12 additions & 18 deletions
@@ -24,7 +24,7 @@ var metricDefs = NewConcurrentMetricDefs()
 type Reaper struct {
 	*cmdopts.Options
 	ready            atomic.Bool
-	measurementCh    chan []metrics.MeasurementEnvelope
+	measurementCh    chan metrics.MeasurementEnvelope
 	measurementCache *InstanceMetricCache
 	logger           log.Logger
 	monitoredSources sources.SourceConns
@@ -36,7 +36,7 @@ type Reaper struct {
 func NewReaper(ctx context.Context, opts *cmdopts.Options) (r *Reaper) {
 	return &Reaper{
 		Options:          opts,
-		measurementCh:    make(chan []metrics.MeasurementEnvelope, 10000),
+		measurementCh:    make(chan metrics.MeasurementEnvelope, 10000),
 		measurementCache: NewInstanceMetricCache(),
 		logger:           log.GetLogger(ctx),
 		monitoredSources: make(sources.SourceConns, 0),
@@ -270,7 +270,6 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
 	var lastErrorNotificationTime time.Time
 	var err error
 	var ok bool
-	var envelopes []metrics.MeasurementEnvelope

 	failedFetches := 0
 	lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
@@ -321,7 +320,7 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
 			lastErrorNotificationTime = time.Now()
 		}
 	} else if metricStoreMessages != nil && len(metricStoreMessages.Data) > 0 {
-		envelopes = []metrics.MeasurementEnvelope{*metricStoreMessages}
+		r.measurementCh <- *metricStoreMessages
 		// pick up "server restarted" events here to avoid doing extra selects from CheckForPGObjectChangesAndStore code
 		if metricName == "db_stats" {
 			postmasterUptimeS, ok := (metricStoreMessages.Data)[0]["postmaster_uptime_s"]
@@ -334,20 +333,18 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC
 					entry := metrics.NewMeasurement(metricStoreMessages.Data.GetEpoch())
 					entry["details"] = message
 					detectedChangesSummary = append(detectedChangesSummary, entry)
-					envelopes = append(envelopes,
-						metrics.MeasurementEnvelope{
-							DBName:     md.Name,
-							SourceType: string(md.Kind),
-							MetricName: "object_changes",
-							Data:       detectedChangesSummary,
-							CustomTags: metricStoreMessages.CustomTags,
-						})
+					r.measurementCh <- metrics.MeasurementEnvelope{
+						DBName:     md.Name,
+						SourceType: string(md.Kind),
+						MetricName: "object_changes",
+						Data:       detectedChangesSummary,
+						CustomTags: metricStoreMessages.CustomTags,
+					}
 				}
 			}
 			lastUptimeS = postmasterUptimeS.(int64)
 		}
 	}
-	r.measurementCh <- envelopes
 }

 select {
@@ -378,9 +375,7 @@ func (r *Reaper) LoadSources() (err error) {
 func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
 	for {
 		if len(monitoredDbCache) > 0 {
-			msms := make([]metrics.MeasurementEnvelope, len(monitoredDbCache))
 			now := time.Now().UnixNano()
-
 			monitoredDbCacheLock.RLock()
 			for _, mdb := range monitoredDbCache {
 				db := metrics.NewMeasurement(now)
@@ -389,14 +384,13 @@ func (r *Reaper) WriteMonitoredSources(ctx context.Context) {
 				for k, v := range mdb.CustomTags {
 					db[metrics.TagPrefix+k] = v
 				}
-				msms = append(msms, metrics.MeasurementEnvelope{
+				r.measurementCh <- metrics.MeasurementEnvelope{
 					DBName:     mdb.Name,
 					MetricName: monitoredDbsDatastoreSyncMetricName,
 					Data:       metrics.Measurements{db},
-				})
+				}
 			}
 			monitoredDbCacheLock.RUnlock()
-			r.measurementCh <- msms
 		}
 		select {
 		case <-time.After(time.Second * monitoredDbsDatastoreSyncIntervalSeconds):
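The receiving side is not part of this diff, but with the new channel type whatever drains measurementCh gets exactly one envelope per receive. A hypothetical sketch of such a drain loop, assuming the log and sinks import paths mirror the metrics one and the writer implements the sinks.Writer interface shown further below:

package reaper

import (
	"context"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
	"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
)

// drainMeasurements is illustrative only; the commit does not add it.
func drainMeasurements(ctx context.Context, ch <-chan metrics.MeasurementEnvelope, w sinks.Writer) {
	for {
		select {
		case <-ctx.Done():
			return
		case env := <-ch: // one envelope per receive, no slice to unpack
			if err := w.Write(env); err != nil {
				log.GetLogger(ctx).Error(err)
			}
		}
	}
}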

internal/sinks/json.go

Lines changed: 13 additions & 13 deletions
@@ -30,28 +30,28 @@ func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error) {
 	return jw, nil
 }

-func (jw *JSONWriter) Write(msgs []metrics.MeasurementEnvelope) error {
+func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error {
 	if jw.ctx.Err() != nil {
 		return jw.ctx.Err()
 	}
-	if len(msgs) == 0 {
+	if len(msg.Data) == 0 {
 		return nil
 	}
 	enc := json.NewEncoder(jw.lw)
 	t1 := time.Now()
 	written := 0
-	for _, msg := range msgs {
-		dataRow := map[string]any{
-			"metric":      msg.MetricName,
-			"data":        msg.Data,
-			"dbname":      msg.DBName,
-			"custom_tags": msg.CustomTags,
-		}
-		if err := enc.Encode(dataRow); err != nil {
-			return err
-		}
-		written += len(msg.Data)
+
+	dataRow := map[string]any{
+		"metric":      msg.MetricName,
+		"data":        msg.Data,
+		"dbname":      msg.DBName,
+		"custom_tags": msg.CustomTags,
+	}
+	if err := enc.Encode(dataRow); err != nil {
+		return err
 	}
+	written += len(msg.Data)
+
 	diff := time.Since(t1)
 	log.GetLogger(jw.ctx).WithField("rows", written).WithField("elapsed", diff).Info("measurements written")
 	return nil

internal/sinks/json_test.go

Lines changed: 9 additions & 7 deletions
@@ -8,10 +8,12 @@ import (

 	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )

 func TestJSONWriter_Write(t *testing.T) {
 	a := assert.New(t)
+	r := require.New(t)
 	// Define test data
 	msg := metrics.MeasurementEnvelope{
 		MetricName: "test_metric",
@@ -25,23 +27,23 @@ func TestJSONWriter_Write(t *testing.T) {
 	tempFile := t.TempDir() + "/test.json"
 	ctx, cancel := context.WithCancel(context.Background())
 	jw, err := NewJSONWriter(ctx, tempFile)
-	a.NoError(err)
+	r.NoError(err)

-	err = jw.Write([]metrics.MeasurementEnvelope{msg})
+	err = jw.Write(msg)
 	a.NoError(err, "write successful")
-	err = jw.Write([]metrics.MeasurementEnvelope{})
-	a.NoError(err, "empty write successful")
+	err = jw.Write(metrics.MeasurementEnvelope{})
+	r.NoError(err, "empty write successful")

 	cancel()
-	err = jw.Write([]metrics.MeasurementEnvelope{msg})
+	err = jw.Write(msg)
 	a.Error(err, "context canceled")

 	// Read the contents of the file
 	var data map[string]any
 	file, err := os.ReadFile(tempFile)
-	a.NoError(err)
+	r.NoError(err)
 	err = json.Unmarshal(file, &data)
-	a.NoError(err)
+	r.NoError(err)
 	a.Equal(msg.MetricName, data["metric"])
 	a.Equal(len(msg.Data), len(data["data"].([]any)))
 	a.Equal(msg.DBName, data["dbname"])

internal/sinks/multiwriter.go

Lines changed: 3 additions & 3 deletions
@@ -13,7 +13,7 @@ import (
 // Writer is an interface that writes metrics values
 type Writer interface {
 	SyncMetric(dbUnique, metricName, op string) error
-	Write(msgs []metrics.MeasurementEnvelope) error
+	Write(msgs metrics.MeasurementEnvelope) error
 }

 // MultiWriter ensures the simultaneous storage of data in several storages.
@@ -69,9 +69,9 @@ func (mw *MultiWriter) SyncMetric(dbUnique, metricName, op string) (err error) {
 	return
 }

-func (mw *MultiWriter) Write(msgs []metrics.MeasurementEnvelope) (err error) {
+func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error) {
 	for _, w := range mw.writers {
-		err = errors.Join(err, w.Write(msgs))
+		err = errors.Join(err, w.Write(msg))
 	}
 	return
 }
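With the slice gone from the interface, a custom sink handles exactly one envelope per call. A minimal sketch of a conforming writer; StdoutWriter is hypothetical and exists only to illustrate the updated signature:

package sinks

import (
	"fmt"

	"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
)

// StdoutWriter is a made-up example sink, not part of the codebase.
type StdoutWriter struct{}

func (w *StdoutWriter) SyncMetric(dbUnique, metricName, op string) error {
	return nil // nothing to synchronize in this sketch
}

func (w *StdoutWriter) Write(msg metrics.MeasurementEnvelope) error {
	// Each call now carries exactly one envelope.
	_, err := fmt.Printf("%s/%s: %d rows\n", msg.DBName, msg.MetricName, len(msg.Data))
	return err
}

Registered via mw.AddWriter(&StdoutWriter{}), it would receive every envelope that MultiWriter fans out, exactly like the MockWriter in the test below.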

internal/sinks/multiwriter_test.go

Lines changed: 2 additions & 2 deletions
@@ -14,7 +14,7 @@ func (mw *MockWriter) SyncMetric(_, _, _ string) error {
 	return nil
 }

-func (mw *MockWriter) Write(_ []metrics.MeasurementEnvelope) error {
+func (mw *MockWriter) Write(_ metrics.MeasurementEnvelope) error {
 	return nil
 }

@@ -82,6 +82,6 @@ func TestWriteMeasurements(t *testing.T) {
 	mw := &MultiWriter{}
 	mockWriter := &MockWriter{}
 	mw.AddWriter(mockWriter)
-	err := mw.Write([]metrics.MeasurementEnvelope{{}})
+	err := mw.Write(metrics.MeasurementEnvelope{})
 	assert.NoError(t, err)
 }
