66 "encoding/json"
77 "errors"
88 "fmt"
9+ "maps"
910 "strings"
1011 "time"
1112
@@ -22,24 +23,23 @@ var (
2223 deleterDelay = time .Hour
2324)
2425
25- func NewPostgresWriter (ctx context.Context , connstr string , opts * CmdOpts , metricDefs * metrics. Metrics ) (pgw * PostgresWriter , err error ) {
26+ func NewPostgresWriter (ctx context.Context , connstr string , opts * CmdOpts ) (pgw * PostgresWriter , err error ) {
2627 var conn db.PgxPoolIface
2728 if conn , err = db .New (ctx , connstr ); err != nil {
2829 return
2930 }
30- return NewWriterFromPostgresConn (ctx , conn , opts , metricDefs )
31+ return NewWriterFromPostgresConn (ctx , conn , opts )
3132}
3233
33- func NewWriterFromPostgresConn (ctx context.Context , conn db.PgxPoolIface , opts * CmdOpts , metricDefs * metrics. Metrics ) (pgw * PostgresWriter , err error ) {
34+ func NewWriterFromPostgresConn (ctx context.Context , conn db.PgxPoolIface , opts * CmdOpts ) (pgw * PostgresWriter , err error ) {
3435 l := log .GetLogger (ctx ).WithField ("sink" , "postgres" ).WithField ("db" , conn .Config ().ConnConfig .Database )
3536 ctx = log .WithLogger (ctx , l )
3637 pgw = & PostgresWriter {
37- ctx : ctx ,
38- metricDefs : metricDefs ,
39- opts : opts ,
40- input : make (chan metrics.MeasurementEnvelope , cacheLimit ),
41- lastError : make (chan error ),
42- sinkDb : conn ,
38+ ctx : ctx ,
39+ opts : opts ,
40+ input : make (chan metrics.MeasurementEnvelope , cacheLimit ),
41+ lastError : make (chan error ),
42+ sinkDb : conn ,
4343 }
4444 if err = db .Init (ctx , pgw .sinkDb , func (ctx context.Context , conn db.PgxIface ) error {
4545 l .Info ("initialising measurements database..." )
@@ -106,7 +106,6 @@ type PostgresWriter struct {
106106 ctx context.Context
107107 sinkDb db.PgxPoolIface
108108 metricSchema DbStorageSchemaType
109- metricDefs * metrics.Metrics
110109 opts * CmdOpts
111110 input chan metrics.MeasurementEnvelope
112111 lastError chan error
@@ -122,7 +121,7 @@ type MeasurementMessagePostgres struct {
122121 DBName string
123122 Metric string
124123 Data map [string ]any
125- TagData map [string ]any
124+ TagData map [string ]string
126125}
127126
128127type DbStorageSchemaType int
@@ -239,20 +238,16 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
239238 if len (msg .Data ) == 0 {
240239 continue
241240 }
242- logger .WithField ("data" , msg .Data ).WithField ("len" , len (msg .Data )).Debug ("sending to postgres" )
243-
244241 for _ , dataRow := range msg .Data {
245242 var epochTime time.Time
246243
247- tags := make (map [string ]any )
244+ tags := make (map [string ]string )
248245 fields := make (map [string ]any )
249246
250247 totalRows ++
251248
252249 if msg .CustomTags != nil {
253- for k , v := range msg .CustomTags {
254- tags [k ] = fmt .Sprintf ("%v" , v )
255- }
250+ tags = maps .Clone (msg .CustomTags )
256251 }
257252 epochTime = time .Unix (0 , metrics .Measurement (dataRow ).GetEpoch ())
258253 for k , v := range dataRow {
@@ -330,7 +325,6 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
330325 }
331326
332327 // send data to PG, with a separate COPY for all metrics
333- logger .Debugf ("COPY-ing %d metrics to Postgres metricsDB..." , rowsBatched )
334328 t1 := time .Now ()
335329
336330 for metricName , metrics := range metricsToStorePerMetric {
@@ -507,7 +501,6 @@ func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
507501 logger .Infof ("Dropping %d old metric partitions one by one..." , len (partsToDrop ))
508502 for _ , toDrop := range partsToDrop {
509503 sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
510- logger .Debugf ("Dropping old metric data partition: %s" , toDrop )
511504
512505 if _ , err := pgw .sinkDb .Exec (pgw .ctx , sqlDropTable ); err != nil {
513506 logger .Errorf ("Failed to drop old partition %s from Postgres metrics DB: %w" , toDrop , err )
0 commit comments