Skip to content

Commit 6d7b398

Browse files
authored
feat(postgresql): support PostgreSQL 17+ metrics compatibility (#1404)
* feat(postgresql): support PostgreSQL 17+ metrics compatibility * feat(postgresql): optimize
1 parent 7369825 commit 6d7b398

File tree

2 files changed

+236
-29
lines changed

2 files changed

+236
-29
lines changed

inputs/postgresql/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,41 @@ address = ""
3636
## 显式指定采集哪些db
3737
# databases = ["app_production", "testing"]
3838

39+
## Whether to collect statement-level metrics.
40+
## Requires extension pg_stat_statements enabled, see https://www.postgresql.org/docs/current/pgstatstatements.html
41+
# enable_statement_metrics = false
42+
43+
## Max number of statements to collect
44+
## applies only when enable_statement_metrics=true
45+
## 0 means no limit
46+
# statement_metrics_limit = 100
47+
3948
## Whether to use prepared statements when connecting to the database.
4049
## This should be set to false when connecting through a PgBouncer instance
4150
## with pool_mode set to transaction.
4251
## 是否使用prepared statements 连接数据库
4352
# prepared_statements = true
4453
```
4554
![dashboard](./postgresql.png)
55+
56+
## metrics
57+
58+
The following metrics are emitted when `enable_statement_metrics = true` (requires `pg_stat_statements`). All are cumulative counters (may reset on PostgreSQL restart or when stats are reset).
59+
60+
| Metric | Labels | Type |
61+
| --- | --- | --- |
62+
| `postgresql_statements_calls_total` | `server`, `db`, `user`, `datname`, `query` | counter |
63+
| `postgresql_statements_exec_milliseconds_total` | `server`, `db`, `user`, `datname`, `query` | counter (milliseconds) |
64+
| `postgresql_statements_rows_total` | `server`, `db`, `user`, `datname`, `query` | counter (rows) |
65+
| `postgresql_statements_block_read_milliseconds_total` | `server`, `db`, `user`, `datname`, `query` | counter (milliseconds) |
66+
| `postgresql_statements_block_write_milliseconds_total` | `server`, `db`, `user`, `datname`, `query` | counter (milliseconds) |
67+
68+
Notes:
69+
- `query` is normalized by replacing newlines/tabs with spaces.
70+
- For PostgreSQL ≥ 13, the plugin adapts to renamed columns; metric names above remain unchanged.
71+
- For PostgreSQL ≥ 17, `pg_stat_bgwriter` was split into `pg_stat_bgwriter` and `pg_stat_checkpointer`. The plugin automatically queries the new view and maps the columns to the old metric names to preserve backward compatibility.
72+
- `pg_stat_checkpointer.num_timed` -> `checkpoints_timed`
73+
- `pg_stat_checkpointer.num_requested` -> `checkpoints_req`
74+
- `pg_stat_checkpointer.write_time` -> `checkpoint_write_time`
75+
- `pg_stat_checkpointer.sync_time` -> `checkpoint_sync_time`
76+
- `pg_stat_checkpointer.buffers_written` -> `buffers_checkpoint`

inputs/postgresql/postgresql.go

Lines changed: 205 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,24 @@ type MetricConfig struct {
7272
type Instance struct {
7373
config.InstanceConfig
7474

75-
Address string `toml:"address"`
76-
MaxLifetime config.Duration `toml:"max_lifetime"`
77-
IsPgBouncer bool `toml:"-"`
78-
OutputAddress string `toml:"outputaddress"`
79-
Databases []string `toml:"databases"`
80-
IgnoredDatabases []string `toml:"ignored_databases"`
81-
PreparedStatements bool `toml:"prepared_statements"`
82-
Metrics []MetricConfig `toml:"metrics"`
75+
Address string `toml:"address"`
76+
MaxLifetime config.Duration `toml:"max_lifetime"`
77+
IsPgBouncer bool `toml:"-"`
78+
OutputAddress string `toml:"outputaddress"`
79+
Databases []string `toml:"databases"`
80+
IgnoredDatabases []string `toml:"ignored_databases"`
81+
PreparedStatements bool `toml:"prepared_statements"`
82+
EnableStatementMetrics bool `toml:"enable_statement_metrics"`
83+
StatementMetricsLimit int `toml:"statement_metrics_limit"`
84+
Metrics []MetricConfig `toml:"metrics"`
85+
TrimServerTagSpace bool `toml:"trim_server_tag_space"`
8386

8487
MaxIdle int
8588
MaxOpen int
8689
db *sql.DB
8790

8891
connConfig string
92+
Version int
8993
}
9094

9195
var ignoredColumns = map[string]bool{"stats_reset": true}
@@ -196,35 +200,112 @@ func (ins *Instance) Gather(slist *types.SampleList) {
196200
}
197201

198202
for rows.Next() {
199-
err = ins.accRow(rows, slist, columns)
203+
err = ins.accRow(rows, slist, "", columns, columns, nil)
200204
if err != nil {
201205
log.Println("E! failed to get row data:", err)
202206
return
203207
}
204208
}
205209

206-
query = `SELECT * FROM pg_stat_bgwriter`
207-
208-
bgWriterRow, err := ins.db.Query(query)
209-
if err != nil {
210-
log.Println("E! failed to execute Query:", err)
211-
return
210+
// Check Postgres Version
211+
if ins.Version == 0 {
212+
var version int
213+
err = ins.db.QueryRow("SELECT current_setting('server_version_num')::int").Scan(&version)
214+
if err != nil {
215+
log.Println("E! failed to query current version:", err)
216+
return
217+
}
218+
ins.Version = version
212219
}
213220

214-
defer bgWriterRow.Close()
221+
if ins.Version < 170000 {
222+
query = `SELECT * FROM pg_stat_bgwriter`
223+
bgWriterRow, err := ins.db.Query(query)
224+
if err != nil {
225+
log.Println("E! failed to execute Query:", err)
226+
return
227+
}
228+
229+
defer bgWriterRow.Close()
215230

216-
// grab the column information from the result
217-
if columns, err = bgWriterRow.Columns(); err != nil {
218-
log.Println("E! failed to grab column info:", err)
219-
return
220-
}
231+
// grab the column information from the result
232+
if columns, err = bgWriterRow.Columns(); err != nil {
233+
log.Println("E! failed to grab column info:", err)
234+
return
235+
}
221236

222-
for bgWriterRow.Next() {
223-
err = ins.accRow(bgWriterRow, slist, columns)
237+
for bgWriterRow.Next() {
238+
err = ins.accRow(bgWriterRow, slist, "", columns, columns, nil)
239+
if err != nil {
240+
log.Println("E! failed to get row data:", err)
241+
return
242+
}
243+
}
244+
} else {
245+
// PG 17+ split pg_stat_bgwriter into pg_stat_bgwriter and pg_stat_checkpointer
246+
247+
// 1. Query pg_stat_bgwriter (remaining columns)
248+
query = `SELECT * FROM pg_stat_bgwriter`
249+
bgWriterRow, err := ins.db.Query(query)
250+
if err != nil {
251+
log.Println("E! failed to execute Query pg_stat_bgwriter:", err)
252+
return
253+
}
254+
defer bgWriterRow.Close()
255+
256+
if columns, err = bgWriterRow.Columns(); err != nil {
257+
log.Println("E! failed to grab column info for pg_stat_bgwriter:", err)
258+
return
259+
}
260+
261+
for bgWriterRow.Next() {
262+
err = ins.accRow(bgWriterRow, slist, "", columns, columns, nil)
263+
if err != nil {
264+
log.Println("E! failed to get row data from pg_stat_bgwriter:", err)
265+
return
266+
}
267+
}
268+
269+
// 2. Query pg_stat_checkpointer (moved columns, aliased to old names for compatibility)
270+
// num_timed -> checkpoints_timed
271+
// num_requested -> checkpoints_req
272+
// write_time -> checkpoint_write_time
273+
// sync_time -> checkpoint_sync_time
274+
// buffers_written -> buffers_checkpoint
275+
query = `SELECT
276+
num_timed AS checkpoints_timed,
277+
num_requested AS checkpoints_req,
278+
write_time AS checkpoint_write_time,
279+
sync_time AS checkpoint_sync_time,
280+
buffers_written AS buffers_checkpoint,
281+
restartpoints_timed,
282+
restartpoints_req,
283+
restartpoints_done
284+
FROM pg_stat_checkpointer`
285+
286+
checkpointerRow, err := ins.db.Query(query)
224287
if err != nil {
225288
log.Println("E! failed to get row data:", err)
226289
return
227290
}
291+
defer checkpointerRow.Close()
292+
293+
if columns, err = checkpointerRow.Columns(); err != nil {
294+
log.Println("E! failed to grab column info for pg_stat_checkpointer:", err)
295+
return
296+
}
297+
298+
for checkpointerRow.Next() {
299+
err = ins.accRow(checkpointerRow, slist, "", columns, columns, nil)
300+
if err != nil {
301+
log.Println("E! failed to get row data from pg_stat_checkpointer:", err)
302+
return
303+
}
304+
}
305+
}
306+
307+
if ins.EnableStatementMetrics {
308+
ins.getStatementMetrics(slist, ins.Version)
228309
}
229310

230311
waitMetrics := new(sync.WaitGroup)
@@ -239,6 +320,84 @@ func (ins *Instance) Gather(slist *types.SampleList) {
239320
waitMetrics.Wait()
240321
}
241322

323+
func (ins *Instance) getStatementMetrics(slist *types.SampleList, version int) {
324+
var query string
325+
326+
limit := func() string {
327+
if ins.StatementMetricsLimit > 0 {
328+
return fmt.Sprintf(" LIMIT %d", ins.StatementMetricsLimit)
329+
}
330+
return ""
331+
}()
332+
333+
query = `SELECT
334+
pg_get_userbyid(userid) as user,
335+
pg_database.datname,
336+
regexp_replace(pg_stat_statements.query, E'[\\r\\n\\t]+', ' ', 'g') AS query,
337+
pg_stat_statements.calls as calls_total,
338+
pg_stat_statements.total_time as exec_milliseconds_total,
339+
pg_stat_statements.rows as rows_total,
340+
pg_stat_statements.blk_read_time as block_read_milliseconds_total,
341+
pg_stat_statements.blk_write_time as block_write_milliseconds_total
342+
FROM pg_stat_statements
343+
JOIN pg_database
344+
ON pg_database.oid = pg_stat_statements.dbid
345+
ORDER BY exec_milliseconds_total DESC`
346+
347+
if version >= 170000 {
348+
query = `SELECT
349+
pg_get_userbyid(userid) as user,
350+
pg_database.datname,
351+
regexp_replace(pg_stat_statements.query, E'[\\r\\n\\t]+', ' ', 'g') AS query,
352+
pg_stat_statements.calls as calls_total,
353+
pg_stat_statements.total_exec_time as exec_milliseconds_total,
354+
pg_stat_statements.rows as rows_total,
355+
pg_stat_statements.shared_blk_read_time as block_read_milliseconds_total,
356+
pg_stat_statements.shared_blk_write_time as block_write_milliseconds_total
357+
FROM pg_stat_statements
358+
JOIN pg_database
359+
ON pg_database.oid = pg_stat_statements.dbid
360+
ORDER BY exec_milliseconds_total DESC`
361+
} else if version >= 130000 {
362+
query = `SELECT
363+
pg_get_userbyid(userid) as user,
364+
pg_database.datname,
365+
regexp_replace(pg_stat_statements.query, E'[\\r\\n\\t]+', ' ', 'g') AS query,
366+
pg_stat_statements.calls as calls_total,
367+
pg_stat_statements.total_exec_time as exec_milliseconds_total,
368+
pg_stat_statements.rows as rows_total,
369+
pg_stat_statements.blk_read_time as block_read_milliseconds_total,
370+
pg_stat_statements.blk_write_time as block_write_milliseconds_total
371+
FROM pg_stat_statements
372+
JOIN pg_database
373+
ON pg_database.oid = pg_stat_statements.dbid
374+
ORDER BY exec_milliseconds_total DESC`
375+
}
376+
377+
statements, err := ins.db.Query(query + limit)
378+
if err != nil {
379+
log.Println("E! failed to query stat statements:", err.Error())
380+
return
381+
}
382+
defer statements.Close()
383+
384+
columns, err := statements.Columns()
385+
if err != nil {
386+
log.Println("E! failed to grab column info:", err.Error())
387+
return
388+
}
389+
390+
labelColumns := []string{"user", "query"}
391+
valueColumns := []string{"calls_total", "exec_milliseconds_total", "rows_total", "block_read_milliseconds_total", "block_write_milliseconds_total"}
392+
for statements.Next() {
393+
err := ins.accRow(statements, slist, "statements", columns, valueColumns, labelColumns)
394+
if err != nil {
395+
log.Println("E! failed to get row data:", err.Error())
396+
return
397+
}
398+
}
399+
}
400+
242401
func (ins *Instance) scrapeMetric(waitMetrics *sync.WaitGroup, slist *types.SampleList, metricConf MetricConfig, tags map[string]string) {
243402
defer waitMetrics.Done()
244403

@@ -346,7 +505,7 @@ type scanner interface {
346505
Scan(dest ...interface{}) error
347506
}
348507

349-
func (ins *Instance) accRow(row scanner, slist *types.SampleList, columns []string) error {
508+
func (ins *Instance) accRow(row scanner, slist *types.SampleList, prefix string, columns, valueColumns, labelColumns []string) error {
350509
var columnVars []interface{}
351510
var dbname bytes.Buffer
352511

@@ -398,11 +557,25 @@ func (ins *Instance) accRow(row scanner, slist *types.SampleList, columns []stri
398557

399558
tags := map[string]string{"server": tagAddress, "db": dbname.String()}
400559

560+
for _, labelKey := range labelColumns {
561+
if columnMap[labelKey] != nil && !ignoredColumns[labelKey] {
562+
tags[labelKey] = fmt.Sprint(*columnMap[labelKey])
563+
}
564+
}
565+
401566
fields := make(map[string]interface{})
402-
for col, val := range columnMap {
403-
_, ignore := ignoredColumns[col]
404-
if !ignore {
405-
fields[col] = *val
567+
568+
for _, valueKey := range valueColumns {
569+
if columnMap[valueKey] != nil && !ignoredColumns[valueKey] {
570+
metricsName := valueKey
571+
if prefix != "" {
572+
if strings.HasPrefix(valueKey, "_") {
573+
metricsName = prefix + valueKey
574+
} else {
575+
metricsName = prefix + "_" + valueKey
576+
}
577+
}
578+
fields[metricsName] = *columnMap[valueKey]
406579
}
407580
}
408581
// acc.AddFields("postgresql", fields, tags)
@@ -480,6 +653,9 @@ func (ins *Instance) SanitizedAddress() (sanitizedAddress string, err error) {
480653
}
481654

482655
sanitizedAddress = kvMatcher.ReplaceAllString(canonicalizedAddress, "")
656+
if ins.TrimServerTagSpace {
657+
sanitizedAddress = strings.TrimSpace(sanitizedAddress)
658+
}
483659

484660
return sanitizedAddress, err
485-
}
661+
}

0 commit comments

Comments
 (0)