Skip to content

Commit bec1737

Browse files
authored
feat: collect metrics + impact metrics flatbuffers + ygg ffi (#81)
1 parent a80b33c commit bec1737

File tree

5 files changed

+268
-9
lines changed

5 files changed

+268
-9
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// <auto-generated>
2+
// automatically generated by the FlatBuffers compiler, do not modify
3+
// </auto-generated>
4+
5+
namespace yggdrasil.messaging
6+
{
7+
8+
using global::System;
9+
using global::System.Collections.Generic;
10+
using global::Google.FlatBuffers;
11+
12+
public struct CollectMetricsResponse : IFlatbufferObject
13+
{
14+
private Table __p;
15+
public ByteBuffer ByteBuffer { get { return __p.bb; } }
16+
public static void ValidateVersion() { FlatBufferConstants.FLATBUFFERS_25_2_10(); }
17+
public static CollectMetricsResponse GetRootAsCollectMetricsResponse(ByteBuffer _bb) { return GetRootAsCollectMetricsResponse(_bb, new CollectMetricsResponse()); }
18+
public static CollectMetricsResponse GetRootAsCollectMetricsResponse(ByteBuffer _bb, CollectMetricsResponse obj) { return (obj.__assign(_bb.GetInt(_bb.Position) + _bb.Position, _bb)); }
19+
public void __init(int _i, ByteBuffer _bb) { __p = new Table(_i, _bb); }
20+
public CollectMetricsResponse __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
21+
22+
public string Response { get { int o = __p.__offset(4); return o != 0 ? __p.__string(o + __p.bb_pos) : null; } }
23+
#if ENABLE_SPAN_T
24+
public Span<byte> GetResponseBytes() { return __p.__vector_as_span<byte>(4, 1); }
25+
#else
26+
public ArraySegment<byte>? GetResponseBytes() { return __p.__vector_as_arraysegment(4); }
27+
#endif
28+
public byte[] GetResponseArray() { return __p.__vector_as_array<byte>(4); }
29+
public string Error { get { int o = __p.__offset(6); return o != 0 ? __p.__string(o + __p.bb_pos) : null; } }
30+
#if ENABLE_SPAN_T
31+
public Span<byte> GetErrorBytes() { return __p.__vector_as_span<byte>(6, 1); }
32+
#else
33+
public ArraySegment<byte>? GetErrorBytes() { return __p.__vector_as_arraysegment(6); }
34+
#endif
35+
public byte[] GetErrorArray() { return __p.__vector_as_array<byte>(6); }
36+
37+
public static Offset<yggdrasil.messaging.CollectMetricsResponse> CreateCollectMetricsResponse(FlatBufferBuilder builder,
38+
StringOffset responseOffset = default(StringOffset),
39+
StringOffset errorOffset = default(StringOffset)) {
40+
builder.StartTable(2);
41+
CollectMetricsResponse.AddError(builder, errorOffset);
42+
CollectMetricsResponse.AddResponse(builder, responseOffset);
43+
return CollectMetricsResponse.EndCollectMetricsResponse(builder);
44+
}
45+
46+
public static void StartCollectMetricsResponse(FlatBufferBuilder builder) { builder.StartTable(2); }
47+
public static void AddResponse(FlatBufferBuilder builder, StringOffset responseOffset) { builder.AddOffset(0, responseOffset.Value, 0); }
48+
public static void AddError(FlatBufferBuilder builder, StringOffset errorOffset) { builder.AddOffset(1, errorOffset.Value, 0); }
49+
public static Offset<yggdrasil.messaging.CollectMetricsResponse> EndCollectMetricsResponse(FlatBufferBuilder builder) {
50+
int o = builder.EndTable();
51+
return new Offset<yggdrasil.messaging.CollectMetricsResponse>(o);
52+
}
53+
}
54+
55+
56+
static public class CollectMetricsResponseVerify
57+
{
58+
static public bool Verify(Google.FlatBuffers.Verifier verifier, uint tablePos)
59+
{
60+
return verifier.VerifyTableStart(tablePos)
61+
&& verifier.VerifyString(tablePos, 4 /*Response*/, false)
62+
&& verifier.VerifyString(tablePos, 6 /*Error*/, false)
63+
&& verifier.VerifyTableEnd(tablePos);
64+
}
65+
}
66+
67+
}

flat-buffer-defs/enabled-message.fbs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ table ObserveHistogram {
198198
labels: [SampleLabelEntry];
199199
}
200200

201+
table CollectMetricsResponse {
202+
response: string;
203+
error: string;
204+
}
205+
201206
root_type Response;
202207
root_type ContextMessage;
203208
root_type MetricsResponse;

yggdrasilffi/src/flat/enabled-message_generated.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4532,6 +4532,120 @@ impl core::fmt::Debug for ObserveHistogram<'_> {
45324532
ds.finish()
45334533
}
45344534
}
4535+
pub enum CollectMetricsResponseOffset {}
4536+
#[derive(Copy, Clone, PartialEq)]
4537+
4538+
pub struct CollectMetricsResponse<'a> {
4539+
pub _tab: flatbuffers::Table<'a>,
4540+
}
4541+
4542+
impl<'a> flatbuffers::Follow<'a> for CollectMetricsResponse<'a> {
4543+
type Inner = CollectMetricsResponse<'a>;
4544+
#[inline]
4545+
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
4546+
Self { _tab: flatbuffers::Table::new(buf, loc) }
4547+
}
4548+
}
4549+
4550+
impl<'a> CollectMetricsResponse<'a> {
4551+
pub const VT_RESPONSE: flatbuffers::VOffsetT = 4;
4552+
pub const VT_ERROR: flatbuffers::VOffsetT = 6;
4553+
4554+
#[inline]
4555+
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
4556+
CollectMetricsResponse { _tab: table }
4557+
}
4558+
#[allow(unused_mut)]
4559+
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
4560+
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
4561+
args: &'args CollectMetricsResponseArgs<'args>
4562+
) -> flatbuffers::WIPOffset<CollectMetricsResponse<'bldr>> {
4563+
let mut builder = CollectMetricsResponseBuilder::new(_fbb);
4564+
if let Some(x) = args.error { builder.add_error(x); }
4565+
if let Some(x) = args.response { builder.add_response(x); }
4566+
builder.finish()
4567+
}
4568+
4569+
4570+
#[inline]
4571+
pub fn response(&self) -> Option<&'a str> {
4572+
// Safety:
4573+
// Created from valid Table for this object
4574+
// which contains a valid value in this slot
4575+
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<&str>>(CollectMetricsResponse::VT_RESPONSE, None)}
4576+
}
4577+
#[inline]
4578+
pub fn error(&self) -> Option<&'a str> {
4579+
// Safety:
4580+
// Created from valid Table for this object
4581+
// which contains a valid value in this slot
4582+
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<&str>>(CollectMetricsResponse::VT_ERROR, None)}
4583+
}
4584+
}
4585+
4586+
impl flatbuffers::Verifiable for CollectMetricsResponse<'_> {
4587+
#[inline]
4588+
fn run_verifier(
4589+
v: &mut flatbuffers::Verifier, pos: usize
4590+
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
4591+
use self::flatbuffers::Verifiable;
4592+
v.visit_table(pos)?
4593+
.visit_field::<flatbuffers::ForwardsUOffset<&str>>("response", Self::VT_RESPONSE, false)?
4594+
.visit_field::<flatbuffers::ForwardsUOffset<&str>>("error", Self::VT_ERROR, false)?
4595+
.finish();
4596+
Ok(())
4597+
}
4598+
}
4599+
pub struct CollectMetricsResponseArgs<'a> {
4600+
pub response: Option<flatbuffers::WIPOffset<&'a str>>,
4601+
pub error: Option<flatbuffers::WIPOffset<&'a str>>,
4602+
}
4603+
impl<'a> Default for CollectMetricsResponseArgs<'a> {
4604+
#[inline]
4605+
fn default() -> Self {
4606+
CollectMetricsResponseArgs {
4607+
response: None,
4608+
error: None,
4609+
}
4610+
}
4611+
}
4612+
4613+
pub struct CollectMetricsResponseBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
4614+
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
4615+
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
4616+
}
4617+
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> CollectMetricsResponseBuilder<'a, 'b, A> {
4618+
#[inline]
4619+
pub fn add_response(&mut self, response: flatbuffers::WIPOffset<&'b str>) {
4620+
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(CollectMetricsResponse::VT_RESPONSE, response);
4621+
}
4622+
#[inline]
4623+
pub fn add_error(&mut self, error: flatbuffers::WIPOffset<&'b str>) {
4624+
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(CollectMetricsResponse::VT_ERROR, error);
4625+
}
4626+
#[inline]
4627+
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> CollectMetricsResponseBuilder<'a, 'b, A> {
4628+
let start = _fbb.start_table();
4629+
CollectMetricsResponseBuilder {
4630+
fbb_: _fbb,
4631+
start_: start,
4632+
}
4633+
}
4634+
#[inline]
4635+
pub fn finish(self) -> flatbuffers::WIPOffset<CollectMetricsResponse<'a>> {
4636+
let o = self.fbb_.end_table(self.start_);
4637+
flatbuffers::WIPOffset::new(o.value())
4638+
}
4639+
}
4640+
4641+
impl core::fmt::Debug for CollectMetricsResponse<'_> {
4642+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
4643+
let mut ds = f.debug_struct("CollectMetricsResponse");
4644+
ds.field("response", &self.response());
4645+
ds.field("error", &self.error());
4646+
ds.finish()
4647+
}
4648+
}
45354649
#[inline]
45364650
/// Verifies that a buffer of bytes contains a `VoidResponse`
45374651
/// and returns it.

yggdrasilffi/src/flat/mod.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ use std::borrow::Cow;
1111
use std::ffi::{c_char, c_void};
1212
use unleash_yggdrasil::impact_metrics::MetricOptions;
1313

14-
use crate::flat::messaging::yggdrasil::messaging::{DefineCounter, VoidResponse};
15-
use crate::flat::serialisation::{Buf, TakeStateResult};
14+
use crate::flat::messaging::yggdrasil::messaging::{
15+
CollectMetricsResponse, DefineCounter, VoidResponse,
16+
};
17+
use crate::flat::serialisation::{Buf, MetricMeasurement, TakeStateResult};
1618
use crate::{get_json, ManagedEngine, RawPointerDataType};
1719
use chrono::Utc;
1820
use messaging::yggdrasil::messaging::{
@@ -343,6 +345,32 @@ pub unsafe extern "C" fn flat_define_counter(
343345
VoidResponse::build_response(result)
344346
}
345347

348+
/// Collects and returns metrics and impact metrics
349+
///
350+
/// # Safety
351+
///
352+
/// passing an invalid engine_ptr will cause UB
353+
/// the returned Buf should be freed by calling flat_buf_free, otherwise you're leaking memory
354+
///
355+
#[no_mangle]
356+
pub unsafe extern "C" fn flat_collect_metrics(engine_ptr: *mut c_void) -> Buf {
357+
let result = guard_result::<MetricMeasurement, _>(|| {
358+
let guard = get_engine(engine_ptr)?;
359+
let mut engine = recover_lock(&guard);
360+
let impact_metrics = engine.collect_impact_metrics();
361+
let bucket = engine.get_metrics(Utc::now());
362+
if bucket.is_none() && impact_metrics.is_empty() {
363+
return Ok(None);
364+
}
365+
Ok(Some(MetricMeasurement {
366+
metrics: bucket,
367+
impact_metrics,
368+
}))
369+
});
370+
371+
CollectMetricsResponse::build_response(result)
372+
}
373+
346374
#[cfg(test)]
347375
mod tests {
348376
use super::*;

yggdrasilffi/src/flat/serialisation.rs

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
2+
use serde::{Deserialize, Serialize};
23
use std::collections::BTreeMap;
34
use std::{
45
cell::RefCell,
56
fmt::{Display, Formatter},
67
};
78
use unleash_types::client_metrics::MetricBucket;
9+
use unleash_yggdrasil::impact_metrics::CollectedMetric;
810
use unleash_yggdrasil::{EvalWarning, ExtendedVariantDef, ToggleDefinition};
911

1012
use crate::flat::messaging::yggdrasil::messaging::{
11-
BuiltInStrategies, BuiltInStrategiesBuilder, CoreVersion, CoreVersionBuilder,
12-
FeatureDefBuilder, FeatureDefs, FeatureDefsBuilder, MetricsResponse, MetricsResponseBuilder,
13-
Response, ResponseBuilder, StrategyDefinition, StrategyDefinitionArgs, StrategyFeature,
14-
StrategyFeatureArgs, StrategyParameter, StrategyParameterArgs, TakeStateResponse,
15-
TakeStateResponseArgs, TakeStateResponseBuilder, ToggleEntryBuilder, ToggleStatsBuilder,
16-
Variant, VariantBuilder, VariantEntryBuilder, VariantPayloadBuilder, VoidResponse,
17-
VoidResponseBuilder,
13+
BuiltInStrategies, BuiltInStrategiesBuilder, CollectMetricsResponse,
14+
CollectMetricsResponseBuilder, CoreVersion, CoreVersionBuilder, FeatureDefBuilder, FeatureDefs,
15+
FeatureDefsBuilder, MetricsResponse, MetricsResponseBuilder, Response, ResponseBuilder,
16+
StrategyDefinition, StrategyDefinitionArgs, StrategyFeature, StrategyFeatureArgs,
17+
StrategyParameter, StrategyParameterArgs, TakeStateResponse, TakeStateResponseArgs,
18+
TakeStateResponseBuilder, ToggleEntryBuilder, ToggleStatsBuilder, Variant, VariantBuilder,
19+
VariantEntryBuilder, VariantPayloadBuilder, VoidResponse, VoidResponseBuilder,
1820
};
1921

2022
thread_local! {
@@ -44,6 +46,12 @@ pub struct TakeStateResult {
4446
pub feature_strategies_map: BTreeMap<String, BTreeMap<String, BTreeMap<String, String>>>,
4547
}
4648

49+
#[derive(Debug, Clone, Deserialize, Serialize)]
50+
pub struct MetricMeasurement {
51+
pub metrics: Option<MetricBucket>,
52+
pub impact_metrics: Vec<CollectedMetric>,
53+
}
54+
4755
#[repr(C)]
4856
pub struct Buf {
4957
pub ptr: *mut u8, // points to heap memory owned by Rust
@@ -259,6 +267,43 @@ impl FlatMessage<Result<Option<()>, FlatError>> for VoidResponse<'static> {
259267
}
260268
}
261269

270+
impl FlatMessage<Result<Option<MetricMeasurement>, FlatError>> for CollectMetricsResponse<'static> {
271+
fn as_flat_buffer(
272+
builder: &mut FlatBufferBuilder<'static>,
273+
from: Result<Option<MetricMeasurement>, FlatError>,
274+
) -> WIPOffset<Self> {
275+
match from {
276+
Err(error) => {
277+
let error_offset = builder.create_string(&error.to_string());
278+
let mut response_builder = CollectMetricsResponseBuilder::new(builder);
279+
response_builder.add_error(error_offset);
280+
response_builder.finish()
281+
}
282+
Ok(Some(measurement)) => {
283+
let metrics_str = serde_json::to_string(&measurement);
284+
match metrics_str {
285+
Err(error) => {
286+
let error_offset = builder.create_string(&error.to_string());
287+
let mut response_builder = CollectMetricsResponseBuilder::new(builder);
288+
response_builder.add_error(error_offset);
289+
response_builder.finish()
290+
}
291+
Ok(m_str) => {
292+
let collect_response = builder.create_string(&m_str);
293+
let mut response_builder = CollectMetricsResponseBuilder::new(builder);
294+
response_builder.add_response(collect_response);
295+
response_builder.finish()
296+
}
297+
}
298+
}
299+
Ok(None) => {
300+
let resp_builder = CollectMetricsResponseBuilder::new(builder);
301+
resp_builder.finish()
302+
}
303+
}
304+
}
305+
}
306+
262307
impl FlatMessage<Result<Option<ResponseMessage<ExtendedVariantDef>>, FlatError>>
263308
for Variant<'static>
264309
{

0 commit comments

Comments
 (0)