Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
- name: Test
run: |
go generate ./api/pb/
go test -failfast -v -timeout=300s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/...
go test -failfast -v -timeout=600s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/...

- name: Coveralls
uses: coverallsapp/github-action@v2
Expand Down
90 changes: 24 additions & 66 deletions internal/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,95 +509,53 @@ func (pgw *PostgresWriter) DeleteOldPartitions() {
func (pgw *PostgresWriter) MaintainUniqueSources() {
logger := log.GetLogger(pgw.ctx)

sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
sqlDistinct := `
WITH RECURSIVE t(dbname) AS (
SELECT MIN(dbname) AS dbname FROM %s
UNION
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
)
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
sqlAdd := `
INSERT INTO admin.all_distinct_dbname_metrics
SELECT u, $2 FROM (select unnest($1::text[]) as u) x
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
RETURNING *`

var lock bool
logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
logger.Infof("Trying to get maintenance advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
if err := pgw.sinkDb.QueryRow(pgw.ctx, "SELECT admin.try_get_maintenance_lock();").Scan(&lock); err != nil {
logger.Error("Getting maintenance advisory lock failed:", err)
return
}
if !lock {
logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
logger.Info("Skipping maintenance as another instance has the advisory lock...")
return
}

logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
logger.Info("Updating admin.all_distinct_dbname_metrics listing table...")
rows, _ := pgw.sinkDb.Query(pgw.ctx, "SELECT table_name FROM admin.get_top_level_metric_tables()")
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
if err != nil {
logger.Error(err)
return
}

// updates listing table entries for a single metric.
sqlUpdateListingTable := "SELECT * FROM admin.update_listing_table(metric_table_name => $1);"
for i, tableName := range allDistinctMetricTables {
foundDbnamesMap := make(map[string]bool)
foundDbnamesArr := make([]string, 0)

metricName := strings.Replace(tableName, "public.", "", 1)
// later usage in sqlDroppedTables requires no "public." prefix
allDistinctMetricTables[i] = metricName

logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix.
logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName)
var deletedRowsCnt, insertedRowsCnt int
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
continue
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err)
}
for _, drDbname := range ret {
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
if deletedRowsCnt > 0 {
logger.Infof("Removed %d stale entries from admin.all_distinct_dbname_metrics listing table for metric: %s", deletedRowsCnt, metricName)
}

// delete all that are not known and add all that are not there
for k := range foundDbnamesMap {
foundDbnamesArr = append(foundDbnamesArr, k)
}
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
if err != nil {
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
}
continue
}
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
}
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
if insertedRowsCnt > 0 {
logger.Infof("Added %d entries to admin.all_distinct_dbname_metrics listing table for metric: %s", insertedRowsCnt, metricName)
}
time.Sleep(time.Minute)
}

cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
// removes all entries for any non-existing table.
sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)"
var deletedRowsCnt int
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables).Scan(&deletedRowsCnt)
if err != nil {
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
} else if cmdTag.RowsAffected() > 0 {
logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
} else if deletedRowsCnt > 0 {
logger.Infof("Removed %d stale entries for dropped metric tables from admin.all_distinct_dbname_metrics listing table", deletedRowsCnt)
}
}

Expand Down
35 changes: 24 additions & 11 deletions internal/sinks/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,20 +668,26 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
pgw, err := NewPostgresWriter(ctx, connStr, opts)
r.NoError(err)

// Drop tables for all builtin metrics to avoid a timeout
// in the `MaintainUniqueSources` test.
for _, metric := range metrics.GetDefaultBuiltInMetrics() {
_, err = pgw.sinkDb.Exec(pgw.ctx, fmt.Sprintf("DROP TABLE %s;", metric))
a.NoError(err)
}

t.Run("MaintainUniqueSources", func(_ *testing.T) {
// adds an entry to `admin.all_distinct_dbname_metrics`
// creates an empty metric table and adds
// an entry to `admin.all_distinct_dbname_metrics`
err = pgw.SyncMetric("test", "test_metric_1", AddOp)
r.NoError(err)

var numOfEntries int
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(1, numOfEntries)

// manually call the maintenance routine
pgw.MaintainUniqueSources()

// entry should have been deleted, because it has no corresponding entries in `test_metric_1` table.
// entry should have been deleted, because there are no rows in `test_metric_1` table.
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(0, numOfEntries)
Expand All @@ -696,19 +702,26 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
},
}
pgw.flush(message)

// manually call the maintenance routine
pgw.MaintainUniqueSources()

// entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
// an entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(1, numOfEntries)

message[0].DBName = "test_db_2"
pgw.flush(message)
// explicitly use `public.*` prefix.
_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.update_listing_table(metric_table_name => 'public.test_metric_1');")
a.NoError(err)
// another entry should have been added.
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
a.Equal(2, numOfEntries)

_, err = conn.Exec(ctx, "DROP TABLE test_metric_1;")
r.NoError(err)

// the corresponding entry should be deleted
// all entries should be deleted
pgw.MaintainUniqueSources()
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
a.NoError(err)
Expand All @@ -731,8 +744,8 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
_, err = conn.Exec(ctx,
fmt.Sprintf(
`CREATE TABLE subpartitions.test_metric_2_dbname_time
PARTITION OF subpartitions.test_metric_2_dbname
FOR VALUES FROM ('%s') TO ('%s')`,
PARTITION OF subpartitions.test_metric_2_dbname
FOR VALUES FROM ('%s') TO ('%s')`,
boundStart, boundEnd),
)
a.NoError(err)
Expand Down
74 changes: 74 additions & 0 deletions internal/sinks/sql/admin_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,77 @@ BEGIN
RETURN i;
END;
$SQL$ LANGUAGE plpgsql;

-- admin.try_get_maintenance_lock makes a single, non-blocking attempt to take
-- the advisory lock that marks the caller as the maintenance worker.
--
-- Returns:
--   have_lock - true when the lock was obtained by this call, false when
--               pg_try_advisory_lock() reported that it could not be taken.
CREATE OR REPLACE FUNCTION admin.try_get_maintenance_lock(
    OUT have_lock BOOLEAN
)
AS
$SQL$
BEGIN
    -- The key carries no meaning: 1571543679778230000 is just a random bigint.
    have_lock := pg_try_advisory_lock(1571543679778230000);
END;
$SQL$ LANGUAGE plpgsql;

-- admin.update_listing_table synchronises the rows of
-- admin.all_distinct_dbname_metrics that belong to one metric with the
-- distinct dbname values currently stored in that metric's table.
--
-- Parameters:
--   metric_table_name - metric table to inspect, with or without a leading
--                       'public.' qualifier; the table is always looked up
--                       in the public schema.
-- Returns:
--   deleted_rows_cnt  - listing rows removed because their dbname no longer
--                       occurs in the metric table.
--   inserted_rows_cnt - listing rows added for dbnames that were not listed.
--
-- The function creates and drops a temporary table named distinct_dbnames,
-- so it fails if a temporary table of that name already exists in the session.
CREATE OR REPLACE FUNCTION admin.update_listing_table(
    metric_table_name text,
    OUT deleted_rows_cnt int,
    OUT inserted_rows_cnt int
)
AS
$SQL$
DECLARE
    schema_prefix CONSTANT text := 'public.';
    metric_name text;
BEGIN

    -- Strip the schema qualifier only when it really is a leading prefix.
    -- Matching 'public.' anywhere in the string would also discard whatever
    -- precedes it and silently point the function at a different table.
    IF LEFT(metric_table_name, LENGTH(schema_prefix)) = schema_prefix THEN
        metric_name := SUBSTR(metric_table_name, LENGTH(schema_prefix) + 1);
    ELSE
        metric_name := metric_table_name;
    END IF;

    -- Collect the distinct dbnames by repeatedly asking for the next larger
    -- MIN(dbname) instead of reading every row of the metric table.
    -- NOTE(review): presumably this relies on an index on dbname to be cheap - confirm.
    -- %I quotes the table name as an identifier.
    EXECUTE FORMAT(
        $$
        CREATE TEMP TABLE distinct_dbnames AS
            WITH RECURSIVE t(dbname) AS (
                SELECT MIN(dbname) AS dbname FROM public.%I
                UNION
                SELECT (SELECT MIN(dbname) FROM public.%I WHERE dbname > t.dbname) FROM t
            )
            SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1;
        $$, metric_name, metric_name);

    -- Remove listing rows whose dbname has disappeared from the metric table.
    -- %L quotes the metric name as a literal, so a name containing a single
    -- quote can neither break nor alter the statement. NOT IN is safe here
    -- because distinct_dbnames never contains NULL.
    EXECUTE FORMAT(
        $$
        DELETE FROM admin.all_distinct_dbname_metrics
        WHERE dbname NOT IN (SELECT * FROM distinct_dbnames)
        AND metric = %L;
        $$, metric_name);

    -- ROW_COUNT reflects the statement run by the preceding EXECUTE.
    GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT;

    -- Add listing rows for dbnames that are present in the metric table but
    -- not yet listed for this metric.
    EXECUTE FORMAT(
        $$
        INSERT INTO admin.all_distinct_dbname_metrics
        SELECT d.dbname, %L FROM distinct_dbnames AS d
        WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = %L);
        $$, metric_name, metric_name);

    GET DIAGNOSTICS inserted_rows_cnt = ROW_COUNT;

    DROP TABLE distinct_dbnames;

END;
$SQL$ LANGUAGE plpgsql;

-- admin.remove_dropped_tables_listing deletes every row of
-- admin.all_distinct_dbname_metrics whose metric is not contained in the
-- supplied array.
--
-- Parameters:
--   existing_metrics - metric names to keep; an empty array therefore removes
--                      every row of the listing table.
-- Returns:
--   deleted_rows_cnt - number of listing rows removed by this call.
CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(
    existing_metrics text[],
    OUT deleted_rows_cnt int
)
AS
$SQL$
BEGIN
    DELETE FROM admin.all_distinct_dbname_metrics AS listing
    WHERE NOT (listing.metric = ANY(existing_metrics));

    -- ROW_COUNT is the number of rows affected by the DELETE above.
    GET DIAGNOSTICS deleted_rows_cnt := ROW_COUNT;
END;
$SQL$ LANGUAGE plpgsql;
Loading