Skip to content

Commit 3458673

Browse files
committed
Return number of inserted/deleted rows for logging.
1 parent 40c1052 commit 3458673

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

internal/sinks/postgres.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -529,22 +529,33 @@ func (pgw *PostgresWriter) MaintainUniqueSources() {
529529
}
530530

531531
// updates listing table entries for a single metric.
532-
sqlUpdateListingTable := "SELECT admin.update_listing_table(metric_table_name => $1);"
532+
sqlUpdateListingTable := "SELECT * FROM admin.update_listing_table(metric_table_name => $1);"
533533
for i, tableName := range allDistinctMetricTables {
534534
metricName := strings.Replace(tableName, "public.", "", 1)
535535
allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix.
536536
logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName)
537-
if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlUpdateListingTable, tableName); err != nil {
537+
var deletedRowsCnt, insertedRowsCnt int;
538+
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt);
539+
if err != nil {
538540
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err)
539541
}
542+
if deletedRowsCnt > 0 {
543+
logger.Infof("Removed %d stale entries from admin.all_distinct_dbname_metrics listing table for metric: %s", deletedRowsCnt, metricName)
544+
}
545+
if insertedRowsCnt > 0 {
546+
logger.Infof("Added %d entries to admin.all_distinct_dbname_metrics listing table for metric: %s", insertedRowsCnt, metricName)
547+
}
540548
time.Sleep(time.Minute)
541549
}
542550

543551
// removes all entries for any non-existing table.
544552
sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)"
545-
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables)
553+
var deletedRowsCnt int;
554+
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables).Scan(&deletedRowsCnt)
546555
if err != nil {
547556
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
557+
} else if deletedRowsCnt > 0 {
558+
logger.Infof("Removed %d stale entries for dropped metric tables from admin.all_distinct_dbname_metrics listing table", deletedRowsCnt)
548559
}
549560
}
550561

internal/sinks/sql/admin_functions.sql

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,11 @@ BEGIN
151151
END;
152152
$SQL$ LANGUAGE plpgsql;
153153

154-
CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_table_name text)
155-
RETURNS VOID
154+
CREATE OR REPLACE FUNCTION admin.update_listing_table(
155+
metric_table_name text,
156+
OUT deleted_rows_cnt int,
157+
OUT inserted_rows_cnt int
158+
)
156159
AS
157160
$SQL$
158161
DECLARE
@@ -183,23 +186,30 @@ BEGIN
183186
AND metric = '%s';
184187
$$, metric_name);
185188

189+
GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT;
190+
186191
EXECUTE FORMAT(
187192
$$
188193
INSERT INTO admin.all_distinct_dbname_metrics
189194
SELECT d.dbname, '%s' FROM distinct_dbnames AS d
190195
WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s');
191196
$$, metric_name, metric_name);
192197

198+
GET DIAGNOSTICS inserted_rows_cnt = ROW_COUNT;
199+
193200
DROP TABLE distinct_dbnames;
194201

195202
END;
196203
$SQL$ LANGUAGE plpgsql;
197204

198-
CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(existing_metrics text[])
199-
RETURNS VOID
205+
CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(
206+
existing_metrics text[],
207+
OUT deleted_rows_cnt int
208+
)
200209
AS
201210
$SQL$
202211
BEGIN
203212
DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics);
213+
GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT;
204214
END;
205215
$SQL$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)