@@ -31,6 +31,8 @@ func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts, metri
3131}
3232
3333func NewWriterFromPostgresConn (ctx context.Context , conn db.PgxPoolIface , opts * CmdOpts , metricDefs * metrics.Metrics ) (pgw * PostgresWriter , err error ) {
34+ l := log .GetLogger (ctx ).WithField ("sink" , "postgres" ).WithField ("db" , conn .Config ().ConnConfig .Database )
35+ ctx = log .WithLogger (ctx , l )
3436 pgw = & PostgresWriter {
3537 ctx : ctx ,
3638 metricDefs : metricDefs ,
@@ -40,7 +42,7 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
4042 sinkDb : conn ,
4143 }
4244 if err = db .Init (ctx , pgw .sinkDb , func (ctx context.Context , conn db.PgxIface ) error {
43- log . GetLogger ( ctx ). Info ("Initialising measurements database..." )
45+ l . Info ("initialising measurements database..." )
4446 exists , err := db .DoesSchemaExist (ctx , conn , "admin" )
4547 if err != nil || exists {
4648 return err
@@ -63,6 +65,7 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
6365 go pgw .deleteOldPartitions (deleterDelay )
6466 go pgw .maintainUniqueSources ()
6567 go pgw .poll ()
68+ l .Info (`measurements sink is activated` )
6669 return
6770}
6871
@@ -178,7 +181,7 @@ func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
178181 return
179182}
180183
181- // Write send the measurements to the cache channel
184+ // Write sends the measurements to the cache channel
182185func (pgw * PostgresWriter ) Write (msgs []metrics.MeasurementEnvelope ) error {
183186 if pgw .ctx .Err () != nil {
184187 return pgw .ctx .Err ()
@@ -232,9 +235,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
232235 if len (msgs ) == 0 {
233236 return
234237 }
235- logger := log .GetLogger (pgw .ctx ).
236- WithField ("sink" , "postgres" ).
237- WithField ("db" , pgw .sinkDb .Config ().ConnConfig .Database )
238+ logger := log .GetLogger (pgw .ctx )
238239 tsWarningPrinted := false
239240 metricsToStorePerMetric := make (map [string ][]MeasurementMessagePostgres )
240241 rowsBatched := 0
@@ -247,7 +248,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
247248 if len (msg .Data ) == 0 {
248249 continue
249250 }
250- logger .WithField ("data" , msg .Data ).WithField ("len" , len (msg .Data )).Debug ("Sending To Postgres " )
251+ logger .WithField ("data" , msg .Data ).WithField ("len" , len (msg .Data )).Debug ("sending to postgres " )
251252
252253 for _ , dataRow := range msg .Data {
253254 var epochTime time.Time
@@ -337,7 +338,7 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
337338 } else if pgw .metricSchema == DbStorageSchemaTimescale {
338339 err = pgw .EnsureMetricTimescale (pgPartBounds , forceRecreatePGMetricPartitions )
339340 } else {
340- logger .Fatal ("should never happen ..." )
341+ logger .Fatal ("unknown storage schema ..." )
341342 }
342343 if forceRecreatePGMetricPartitions {
343344 forceRecreatePGMetricPartitions = false
0 commit comments