Skip to content

Commit f3b99e2

Browse files
authored
[-] fix PostgresWriter.EnsureMetricTime(), fixes #622 (#624)
1 parent 6665503 commit f3b99e2

File tree

3 files changed

+55
-29
lines changed

3 files changed

+55
-29
lines changed

internal/sinks/postgres.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,10 @@ const (
150150
const specialMetricPgbouncer = "^pgbouncer_(stats|pools)$"
151151

152152
var (
153-
regexIsPgbouncerMetrics = regexp.MustCompile(specialMetricPgbouncer)
154-
forceRecreatePGMetricPartitions = false // to signal override PG metrics storage cache
155-
partitionMapMetric = make(map[string]ExistingPartitionInfo) // metric = min/max bounds
156-
partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
153+
regexIsPgbouncerMetrics = regexp.MustCompile(specialMetricPgbouncer)
154+
forceRecreatePartitions = false // to signal override PG metrics storage cache
155+
partitionMapMetric = make(map[string]ExistingPartitionInfo) // metric = min/max bounds
156+
partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
157157
)
158158

159159
// SyncMetric ensures that tables exist for newly added metrics and/or sources
@@ -333,15 +333,16 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
333333
}
334334
}
335335

336-
if pgw.metricSchema == DbStorageSchemaPostgres {
337-
err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePGMetricPartitions)
338-
} else if pgw.metricSchema == DbStorageSchemaTimescale {
339-
err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePGMetricPartitions)
340-
} else {
336+
switch pgw.metricSchema {
337+
case DbStorageSchemaPostgres:
338+
err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
339+
case DbStorageSchemaTimescale:
340+
err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePartitions)
341+
default:
341342
logger.Fatal("unknown storage schema...")
342343
}
343-
if forceRecreatePGMetricPartitions {
344-
forceRecreatePGMetricPartitions = false
344+
if forceRecreatePartitions {
345+
forceRecreatePartitions = false
345346
}
346347
if err != nil {
347348
pgw.lastError <- err
@@ -385,8 +386,8 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
385386

386387
if _, err = pgw.sinkDb.CopyFrom(context.Background(), getTargetTable(), getTargetColumns(), pgx.CopyFromRows(rows)); err != nil {
387388
l.Error(err)
388-
forceRecreatePGMetricPartitions = strings.Contains(err.Error(), "no partition")
389-
if forceRecreatePGMetricPartitions {
389+
forceRecreatePartitions = strings.Contains(err.Error(), "no partition")
390+
if forceRecreatePartitions {
390391
logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
391392
}
392393
}
@@ -404,7 +405,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
404405
// EnsureMetricTime creates special partitions if Timescale used for realtime metrics
405406
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
406407
logger := log.GetLogger(pgw.ctx)
407-
sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)`
408+
sqlEnsure := `select part_available_from, part_available_to from admin.ensure_partition_metric_time($1, $2)`
408409
for metric, pb := range pgPartBounds {
409410
if !strings.HasSuffix(metric, "_realtime") {
410411
continue
@@ -415,17 +416,18 @@ func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPart
415416

416417
partInfo, ok := partitionMapMetric[metric]
417418
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
418-
err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
419+
err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).
420+
Scan(&partInfo.StartTime, &partInfo.EndTime)
419421
if err != nil {
420-
logger.Error("Failed to create partition on 'metrics':", err)
422+
logger.Error("Failed to create partition on 'metrics': ", err)
421423
return err
422424
}
423425
partitionMapMetric[metric] = partInfo
424426
}
425427
if pb.EndTime.After(partInfo.EndTime) || force {
426-
err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
428+
err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(nil, &partInfo.EndTime)
427429
if err != nil {
428-
logger.Error("Failed to create partition on 'metrics':", err)
430+
logger.Error("Failed to create partition on 'metrics': ", err)
429431
return err
430432
}
431433
partitionMapMetric[metric] = partInfo

internal/sinks/postgres_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,27 @@ func TestWrite(t *testing.T) {
104104
err = pgw.Write(messages)
105105
assert.Error(t, err, "context canceled")
106106
}
107+
108+
func TestPostgresWriter_EnsureMetricTime(t *testing.T) {
109+
conn, err := pgxmock.NewPool()
110+
assert.NoError(t, err)
111+
pgw := PostgresWriter{
112+
ctx: ctx,
113+
sinkDb: conn,
114+
}
115+
116+
TestPartBounds := map[string]ExistingPartitionInfo{"test_metric_realtime": {time.Now(), time.Now()}}
117+
conn.ExpectQuery(`select part_available_from, part_available_to`).
118+
WithArgs("test_metric_realtime", TestPartBounds["test_metric_realtime"].StartTime).
119+
WillReturnRows(pgxmock.NewRows([]string{"part_available_from", "part_available_to"}).
120+
AddRow(time.Now(), time.Now()))
121+
122+
conn.ExpectQuery(`select part_available_from, part_available_to`).
123+
WithArgs("test_metric_realtime", TestPartBounds["test_metric_realtime"].EndTime).
124+
WillReturnRows(pgxmock.NewRows([]string{"part_available_from", "part_available_to"}).
125+
AddRow(time.Now(), time.Now()))
126+
127+
err = pgw.EnsureMetricTime(TestPartBounds, true)
128+
assert.NoError(t, err)
129+
assert.NoError(t, conn.ExpectationsWereMet())
130+
}

internal/sinks/sql/ensure_partition_timescale.sql

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,17 @@ BEGIN
137137
END IF;
138138
END IF;
139139

140-
IF NOT EXISTS (SELECT 1
141-
FROM pg_tables
142-
WHERE tablename = l_part_name
143-
AND schemaname = 'subpartitions')
144-
THEN
145-
--RAISE NOTICE 'creating sub-partition % ...', l_part_name;
146-
l_sql := format($$CREATE %s TABLE IF NOT EXISTS subpartitions.%s PARTITION OF public.%s FOR VALUES FROM ('%s') TO ('%s')$$,
147-
l_unlogged, quote_ident(l_part_name), quote_ident(metric), l_part_start, l_part_end);
148-
EXECUTE l_sql;
149-
EXECUTE format($$COMMENT ON TABLE subpartitions.%s IS 'pgwatch-generated-metric-time-lvl'$$, quote_ident(l_part_name));
150-
END IF;
140+
IF NOT EXISTS (SELECT 1
141+
FROM pg_tables
142+
WHERE tablename = l_part_name
143+
AND schemaname = 'subpartitions')
144+
THEN
145+
--RAISE NOTICE 'creating sub-partition % ...', l_part_name;
146+
l_sql := format($$CREATE %s TABLE IF NOT EXISTS subpartitions.%s PARTITION OF public.%s FOR VALUES FROM ('%s') TO ('%s')$$,
147+
l_unlogged, quote_ident(l_part_name), quote_ident(metric), l_part_start, l_part_end);
148+
EXECUTE l_sql;
149+
EXECUTE format($$COMMENT ON TABLE subpartitions.%s IS 'pgwatch-generated-metric-time-lvl'$$, quote_ident(l_part_name));
150+
END IF;
151151

152152
END LOOP;
153153

0 commit comments

Comments
 (0)