Skip to content

Commit 543980f

Browse files
feat: local prometheus sdk metrics
1 parent 278cffe commit 543980f

File tree

34 files changed

+519
-33
lines changed

34 files changed

+519
-33
lines changed

confidence-resolver/src/telemetry.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use core::fmt;
12
use std::sync::atomic::{AtomicU64, Ordering};
23
use std::sync::Arc;
34

@@ -117,6 +118,93 @@ pub struct HistogramSnapshot {
117118
pub buckets: Vec<u64>,
118119
}
119120

121+
impl TelemetrySnapshot {
122+
/// Format the snapshot as Prometheus exposition text.
123+
///
124+
/// All values are cumulative counters, matching what Prometheus expects.
125+
/// The histogram uses cumulative `le` buckets derived from the exponential
126+
/// bucket boundaries.
127+
///
128+
/// `instance` is included as an `instance="..."` label on every metric,
129+
/// allowing outputs from multiple WASM instances to be concatenated into
130+
/// a single scrape endpoint.
131+
pub fn to_prometheus(&self, instance: &str) -> String {
132+
let mut out = String::new();
133+
// fmt::Write for String is infallible; ignore the Ok.
134+
let _ = self.write_prometheus(&mut out, instance);
135+
out
136+
}
137+
138+
fn write_prometheus(&self, w: &mut dyn fmt::Write, instance: &str) -> fmt::Result {
139+
self.write_histogram(w, instance)?;
140+
self.write_resolve_rates(w, instance)
141+
}
142+
143+
fn write_histogram(&self, w: &mut dyn fmt::Write, instance: &str) -> fmt::Result {
144+
if self.latency.count == 0 {
145+
return Ok(());
146+
}
147+
148+
writeln!(
149+
w,
150+
"# TYPE confidence_resolve_latency_microseconds histogram"
151+
)?;
152+
153+
let mut cumulative: u64 = 0;
154+
for (i, &count) in self.latency.buckets.iter().enumerate() {
155+
cumulative = cumulative.wrapping_add(count);
156+
if cumulative == 0 {
157+
continue;
158+
}
159+
let upper = ((i as f64 + 1.0) * LN_RATIO).exp();
160+
writeln!(
161+
w,
162+
"confidence_resolve_latency_microseconds_bucket{{instance=\"{instance}\",le=\"{upper:.6e}\"}} {cumulative}"
163+
)?;
164+
}
165+
writeln!(
166+
w,
167+
"confidence_resolve_latency_microseconds_bucket{{instance=\"{instance}\",le=\"+Inf\"}} {}",
168+
self.latency.count
169+
)?;
170+
writeln!(
171+
w,
172+
"confidence_resolve_latency_microseconds_sum{{instance=\"{instance}\"}} {}",
173+
self.latency.sum
174+
)?;
175+
writeln!(
176+
w,
177+
"confidence_resolve_latency_microseconds_count{{instance=\"{instance}\"}} {}",
178+
self.latency.count
179+
)?;
180+
181+
Ok(())
182+
}
183+
184+
fn write_resolve_rates(&self, w: &mut dyn fmt::Write, instance: &str) -> fmt::Result {
185+
let has_any = self.resolve_rates.iter().any(|&c| c > 0);
186+
if !has_any {
187+
return Ok(());
188+
}
189+
190+
writeln!(w, "# TYPE confidence_resolves_total counter")?;
191+
for (i, &count) in self.resolve_rates.iter().enumerate() {
192+
if count == 0 {
193+
continue;
194+
}
195+
let label = ResolveReason::try_from(i as i32)
196+
.map(|r| r.as_str_name())
197+
.unwrap_or("UNKNOWN");
198+
writeln!(
199+
w,
200+
"confidence_resolves_total{{instance=\"{instance}\",reason=\"{label}\"}} {count}"
201+
)?;
202+
}
203+
204+
Ok(())
205+
}
206+
}
207+
120208
/// Concurrent telemetry collector.
121209
///
122210
/// All methods are safe to call from multiple threads without locking.
@@ -525,4 +613,84 @@ mod tests {
525613
assert!(delta.resolve_latency.is_none()); // no counter activity
526614
assert_eq!(delta.state_age.unwrap().last_state_update, 1000);
527615
}
616+
617+
#[test]
fn prometheus_empty_snapshot() {
    // A default snapshot must render as an empty string.
    let rendered = TelemetrySnapshot::default().to_prometheus("w0");
    assert_eq!(rendered, "");
}
622+
623+
#[test]
fn prometheus_histogram_and_counters() {
    let telemetry = Telemetry::new();
    for latency_us in [100, 500] {
        telemetry.record_latency_us(latency_us);
    }
    for reason in [
        ResolveReason::Match,
        ResolveReason::Match,
        ResolveReason::NoSegmentMatch,
    ] {
        telemetry.mark_resolve(reason);
    }

    let rendered = telemetry.snapshot().to_prometheus("w0");

    // Every fragment below must appear verbatim in the rendered text.
    let expected_fragments = [
        // Histogram header
        "# TYPE confidence_resolve_latency_microseconds histogram",
        // +Inf bucket carries the total observation count and the instance label
        r#"confidence_resolve_latency_microseconds_bucket{instance="w0",le="+Inf"} 2"#,
        // Sum and count with instance label
        r#"confidence_resolve_latency_microseconds_sum{instance="w0"} 600"#,
        r#"confidence_resolve_latency_microseconds_count{instance="w0"} 2"#,
        // Counter header and per-reason samples
        "# TYPE confidence_resolves_total counter",
        r#"confidence_resolves_total{instance="w0",reason="RESOLVE_REASON_MATCH"} 2"#,
        r#"confidence_resolves_total{instance="w0",reason="RESOLVE_REASON_NO_SEGMENT_MATCH"} 1"#,
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }

    // Reasons that were never marked must not be emitted.
    assert!(!rendered.contains("FLAG_ARCHIVED"));
}
655+
656+
#[test]
fn prometheus_cumulative_buckets() {
    let telemetry = Telemetry::new();
    // Two observations that land in different buckets.
    telemetry.record_latency_us(10);
    telemetry.record_latency_us(10_000);

    let rendered = telemetry.snapshot().to_prometheus("w0");

    // Walk the finite `le` buckets in output order and check that each count
    // is at least as large as the one before it.
    let mut previous: Option<u64> = None;
    for line in rendered.lines() {
        if !line.contains("le=\"") || line.contains("+Inf") {
            continue;
        }
        let (_, value) = line.rsplit_once(' ').unwrap();
        let current = value.parse::<u64>().unwrap();
        if let Some(earlier) = previous {
            assert!(
                current >= earlier,
                "Prometheus buckets must be cumulative, got {} then {}",
                earlier,
                current
            );
        }
        previous = Some(current);
    }
}
681+
682+
#[test]
fn prometheus_distinct_instances_concatenate() {
    let telemetry = Telemetry::new();
    telemetry.record_latency_us(100);
    telemetry.mark_resolve(ResolveReason::Match);
    let snapshot = telemetry.snapshot();

    // Render the same snapshot under two instance names and join the results.
    let instance_ids = ["w0", "w1"];
    let combined: String = instance_ids
        .iter()
        .map(|id| snapshot.to_prometheus(id))
        .collect();

    // Each instance label must survive in the combined text.
    for id in instance_ids {
        assert!(combined.contains(&format!("instance=\"{id}\"")));
    }
}
528696
}
Binary file not shown.

openfeature-provider/go/confidence/internal/local_resolver/local_resolver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type LocalResolver interface {
2020
ResolveProcess(*wasm.ResolveProcessRequest) (*wasm.ResolveProcessResponse, error)
2121
FlushAllLogs() error
2222
FlushAssignLogs() error
23+
PrometheusSnapshot() (string, error)
2324
Close(context.Context) error
2425
}
2526

openfeature-provider/go/confidence/internal/local_resolver/pool.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strings"
78
"sync"
89
"sync/atomic"
910

@@ -91,6 +92,20 @@ func (s *PooledResolver) FlushAssignLogs() error {
9192
})
9293
}
9394

95+
// PrometheusSnapshot implements LocalResolver.
96+
func (s *PooledResolver) PrometheusSnapshot() (string, error) {
97+
var b strings.Builder
98+
err := s.maintenance(func(lr LocalResolver) error {
99+
text, err := lr.PrometheusSnapshot()
100+
if err != nil {
101+
return err
102+
}
103+
b.WriteString(text)
104+
return nil
105+
})
106+
return b.String(), err
107+
}
108+
94109
func (s *PooledResolver) Close(ctx context.Context) error {
95110
return s.maintenance(func(lr LocalResolver) error {
96111
return lr.Close(ctx)

openfeature-provider/go/confidence/internal/local_resolver/recover.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ func (r *RecoveringResolver) FlushAssignLogs() (err error) {
117117
return
118118
}
119119

120+
func (r *RecoveringResolver) PrometheusSnapshot() (text string, err error) {
121+
r.withRecover("PrometheusSnapshot", &err, func(lr LocalResolver) {
122+
text, err = lr.PrometheusSnapshot()
123+
})
124+
return
125+
}
126+
120127
func (r *RecoveringResolver) Close(ctx context.Context) error {
121128
// For Close, if we panic, don't recreate during shutdown; just surface error.
122129
defer func() {

openfeature-provider/go/confidence/internal/local_resolver/wasm.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"encoding/binary"
66
"errors"
7+
"fmt"
78
"sync"
9+
"sync/atomic"
810
"time"
911

1012
_ "embed"
@@ -18,6 +20,9 @@ import (
1820
"google.golang.org/protobuf/types/known/timestamppb"
1921
)
2022

23+
// instanceCounter is a package-level counter for auto-assigning unique instance IDs.
24+
var instanceCounter atomic.Int64
25+
2126
// defaultWasmBytes contains the embedded WASM resolver module.
2227
// This file is automatically populated during the build process from wasm/confidence_resolver.wasm.
2328
// The WASM file is built from the Rust source in wasm/rust-guest/ and must be kept in sync.
@@ -32,9 +37,10 @@ type LogSink func(logs *resolverv1.WriteFlagLogsRequest)
3237
func NoOpLogSink(logs *resolverv1.WriteFlagLogsRequest) {}
3338

3439
type WasmResolver struct {
35-
instance api.Module
36-
logSink LogSink
37-
mu *sync.Mutex
40+
instance api.Module
41+
logSink LogSink
42+
mu *sync.Mutex
43+
instanceID string
3844
}
3945

4046
var _ LocalResolver = (*WasmResolver)(nil)
@@ -67,6 +73,18 @@ func (r *WasmResolver) FlushAssignLogs() error {
6773
return err
6874
}
6975

76+
func (r *WasmResolver) PrometheusSnapshot() (string, error) {
77+
req := &wasm.PrometheusSnapshotRequest{
78+
Instance: r.instanceID,
79+
}
80+
resp := &wasm.PrometheusSnapshotResponse{}
81+
err := r.call("wasm_msg_guest_prometheus_snapshot", req, resp)
82+
if err != nil {
83+
return "", err
84+
}
85+
return resp.GetText(), nil
86+
}
87+
7088
func (r *WasmResolver) Close(ctx context.Context) error {
7189
// TODO we should call flush assigned until it doesn't flush any more
7290
r.FlushAllLogs()
@@ -158,10 +176,12 @@ func (wrf *WasmResolverFactory) New() LocalResolver {
158176
if err != nil {
159177
panic(err)
160178
}
179+
id := instanceCounter.Add(1)
161180
return &WasmResolver{
162-
instance: instance,
163-
logSink: wrf.logSink,
164-
mu: &sync.Mutex{},
181+
instance: instance,
182+
logSink: wrf.logSink,
183+
mu: &sync.Mutex{},
184+
instanceID: fmt.Sprintf("%d", id),
165185
}
166186
}
167187

openfeature-provider/go/confidence/internal/proto/resolver/types.pb.go

Lines changed: 16 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openfeature-provider/go/confidence/internal/proto/resolverinternal/internal_api.pb.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)