Skip to content

Commit 080d585

Browse files
Add installed_extensions prometheus metric (#9608)
and add a /metrics endpoint to compute_ctl to expose such metrics. Metric format example for extension pg_rag with versions 1.2.3 and 1.4.2 installed in 3 and 1 databases respectively: neon_extensions_installed{extension="pg_rag", version="1.2.3"} = 3; neon_extensions_installed{extension="pg_rag", version="1.4.2"} = 1. ------ Infra part: https://github.com/neondatabase/flux-fleet/pull/251 --------- Co-authored-by: Tristan Partin <[email protected]>
1 parent 7595d3a commit 080d585

File tree

7 files changed

+137
-4
lines changed

7 files changed

+137
-4
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

compute_tools/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ clap.workspace = true
1818
flate2.workspace = true
1919
futures.workspace = true
2020
hyper0 = { workspace = true, features = ["full"] }
21+
metrics.workspace = true
2122
nix.workspace = true
2223
notify.workspace = true
2324
num_cpus.workspace = true
25+
once_cell.workspace = true
2426
opentelemetry.workspace = true
2527
opentelemetry_sdk.workspace = true
2628
postgres.workspace = true
@@ -39,6 +41,7 @@ tracing-subscriber.workspace = true
3941
tracing-utils.workspace = true
4042
thiserror.workspace = true
4143
url.workspace = true
44+
prometheus.workspace = true
4245

4346
compute_api.workspace = true
4447
utils.workspace = true

compute_tools/src/http/api.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::catalog::SchemaDumpError;
99
use crate::catalog::{get_database_schema, get_dbs_and_roles};
1010
use crate::compute::forward_termination_signal;
1111
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
12+
use crate::installed_extensions;
1213
use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest};
1314
use compute_api::responses::{
1415
ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
@@ -19,6 +20,8 @@ use anyhow::Result;
1920
use hyper::header::CONTENT_TYPE;
2021
use hyper::service::{make_service_fn, service_fn};
2122
use hyper::{Body, Method, Request, Response, Server, StatusCode};
23+
use metrics::Encoder;
24+
use metrics::TextEncoder;
2225
use tokio::task;
2326
use tracing::{debug, error, info, warn};
2427
use tracing_utils::http::OtelName;
@@ -65,6 +68,28 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
6568
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
6669
}
6770

71+
// Prometheus metrics
72+
(&Method::GET, "/metrics") => {
73+
debug!("serving /metrics GET request");
74+
75+
let mut buffer = vec![];
76+
let metrics = installed_extensions::collect();
77+
let encoder = TextEncoder::new();
78+
encoder.encode(&metrics, &mut buffer).unwrap();
79+
80+
match Response::builder()
81+
.status(StatusCode::OK)
82+
.header(CONTENT_TYPE, encoder.format_type())
83+
.body(Body::from(buffer))
84+
{
85+
Ok(response) => response,
86+
Err(err) => {
87+
let msg = format!("error handling /metrics request: {err}");
88+
error!(msg);
89+
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
90+
}
91+
}
92+
}
6893
// Collect Postgres current usage insights
6994
(&Method::GET, "/insights") => {
7095
info!("serving /insights GET request");

compute_tools/src/http/openapi_spec.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,21 @@ paths:
3737
schema:
3838
$ref: "#/components/schemas/ComputeMetrics"
3939

40+
/metrics:
41+
get:
42+
tags:
43+
- Info
44+
summary: Get compute node metrics in text format.
45+
description: ""
46+
operationId: getComputeMetrics
47+
responses:
48+
200:
49+
description: ComputeMetrics
50+
content:
51+
text/plain:
52+
schema:
53+
type: string
54+
description: Metrics in text format.
4055
/insights:
4156
get:
4257
tags:

compute_tools/src/installed_extensions.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use compute_api::responses::{InstalledExtension, InstalledExtensions};
2+
use metrics::proto::MetricFamily;
23
use std::collections::HashMap;
34
use std::collections::HashSet;
45
use tracing::info;
@@ -8,6 +9,10 @@ use anyhow::Result;
89
use postgres::{Client, NoTls};
910
use tokio::task;
1011

12+
use metrics::core::Collector;
13+
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
14+
use once_cell::sync::Lazy;
15+
1116
/// We don't reuse get_existing_dbs() just for code clarity
1217
/// and to make database listing query here more explicit.
1318
///
@@ -59,6 +64,12 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
5964

6065
for (extname, v) in extensions.iter() {
6166
let version = v.to_string();
67+
68+
// increment the number of databases where the version of extension is installed
69+
INSTALLED_EXTENSIONS
70+
.with_label_values(&[extname, &version])
71+
.inc();
72+
6273
extensions_map
6374
.entry(extname.to_string())
6475
.and_modify(|e| {
@@ -74,9 +85,11 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
7485
}
7586
}
7687

77-
Ok(InstalledExtensions {
88+
let res = InstalledExtensions {
7889
extensions: extensions_map.values().cloned().collect(),
79-
})
90+
};
91+
92+
Ok(res)
8093
})
8194
.await?
8295
}
@@ -97,6 +110,18 @@ pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
97110
"[NEON_EXT_STAT] {}",
98111
serde_json::to_string(&result).expect("failed to serialize extensions list")
99112
);
100-
101113
Ok(())
102114
}
115+
116+
/// Gauge vector named `installed_extensions`, labelled by `extension_name`
/// and `version`; per its help text each series counts the databases in
/// which that version of the extension is installed.
///
/// Built on first access through `Lazy`. Panics with
/// "failed to define a metric" if `register_uint_gauge_vec!` returns an error.
///
/// NOTE(review): the only visible writer is the `.inc()` call in
/// `get_installed_extensions`; nothing shown here resets the gauge, so
/// repeated calls would presumably accumulate counts — confirm against callers.
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
117+
register_uint_gauge_vec!(
118+
"installed_extensions",
119+
"Number of databases where the version of extension is installed",
120+
&["extension_name", "version"]
121+
)
122+
.expect("failed to define a metric")
123+
});
124+
125+
/// Returns the metric families currently held by `INSTALLED_EXTENSIONS`.
///
/// Delegates to `Collector::collect` on the gauge vector; the `/metrics`
/// HTTP handler passes the result to a `TextEncoder`.
pub fn collect() -> Vec<MetricFamily> {
126+
INSTALLED_EXTENSIONS.collect()
127+
}

test_runner/fixtures/endpoint/http.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,8 @@ def set_role_grants(self, database: str, role: str, schema: str, privileges: lis
4646
)
4747
res.raise_for_status()
4848
return res.json()
49+
50+
# Fetch the `/metrics` page from the HTTP server on localhost at `self.port`
# and return the response body as text.
# Calls `raise_for_status()` on the response before returning, so a failed
# request presumably raises instead of returning an error page — confirm
# against the HTTP client class this method belongs to.
def metrics(self) -> str:
51+
res = self.get(f"http://localhost:{self.port}/metrics")
52+
res.raise_for_status()
53+
return res.text

test_runner/regress/test_installed_extensions.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1+
from __future__ import annotations
2+
3+
import time
14
from logging import info
5+
from typing import TYPE_CHECKING
6+
7+
from fixtures.log_helper import log
8+
from fixtures.metrics import parse_metrics
29

3-
from fixtures.neon_fixtures import NeonEnv
10+
if TYPE_CHECKING:
11+
from fixtures.neon_fixtures import NeonEnv
412

513

614
def test_installed_extensions(neon_simple_env: NeonEnv):
@@ -85,3 +93,52 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
8593
assert ext["n_databases"] == 2
8694
ext["versions"].sort()
8795
assert ext["versions"] == ["1.2", "1.3"]
96+
97+
# check that /metrics endpoint is available
98+
# ensure that we see the metric before and after restart
99+
res = client.metrics()
100+
info("Metrics: %s", res)
101+
m = parse_metrics(res)
102+
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"})
103+
assert len(neon_m) == 1
104+
for sample in neon_m:
105+
assert sample.value == 2
106+
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"})
107+
assert len(neon_m) == 1
108+
for sample in neon_m:
109+
assert sample.value == 1
110+
111+
endpoint.stop()
112+
endpoint.start()
113+
114+
timeout = 10
115+
while timeout > 0:
116+
try:
117+
res = client.metrics()
118+
timeout = -1
119+
if len(parse_metrics(res).query_all("installed_extensions")) < 4:
120+
# Assume that not all metrics are collected yet
121+
time.sleep(1)
122+
timeout -= 1
123+
continue
124+
except Exception:
125+
log.exception("failed to get metrics, assume they are not collected yet")
126+
time.sleep(1)
127+
timeout -= 1
128+
continue
129+
130+
assert (
131+
len(parse_metrics(res).query_all("installed_extensions")) >= 4
132+
), "Not all metrics are collected"
133+
134+
info("After restart metrics: %s", res)
135+
m = parse_metrics(res)
136+
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.2"})
137+
assert len(neon_m) == 1
138+
for sample in neon_m:
139+
assert sample.value == 1
140+
141+
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "version": "1.3"})
142+
assert len(neon_m) == 1
143+
for sample in neon_m:
144+
assert sample.value == 1

0 commit comments

Comments
 (0)