diff --git a/Cargo.lock b/Cargo.lock index 073c14f257..4e74161001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,6 +1815,7 @@ dependencies = [ "libc", "paste", "rmp-serde", + "serde_json", "tempfile", "tinybytes", "tracing", @@ -1979,6 +1980,7 @@ dependencies = [ "base64 0.22.1", "datadog-ddsketch", "ddcommon", + "ddcommon-ffi", "futures", "hashbrown 0.15.1", "http 1.1.0", diff --git a/datadog-sidecar-ffi/Cargo.toml b/datadog-sidecar-ffi/Cargo.toml index 871fc8a7ea..14cd1ed0a0 100644 --- a/datadog-sidecar-ffi/Cargo.toml +++ b/datadog-sidecar-ffi/Cargo.toml @@ -26,6 +26,7 @@ paste = "1" libc = "0.2" tracing = { version = "0.1", default-features = false } rmp-serde = "1.1.1" +serde_json = "1.0" [target.'cfg(windows)'.dependencies] diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 9452a89484..8830d71bd9 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -56,6 +56,8 @@ use std::ptr::NonNull; use std::slice; use std::sync::Arc; use std::time::Duration; +use serde_json::Value; + #[no_mangle] #[cfg(target_os = "windows")] @@ -410,6 +412,87 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig( MaybeError::None } +/// Reports an endpoint to the telemetry. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + r#type: CharSlice, + method: ddtelemetry::data::Method, + path: CharSlice, + operation_name: CharSlice, + resource_name: CharSlice, + request_body_type:*mut ffi::Vec, + response_body_type:*mut ffi::Vec, + response_code:i32, + authentication:*mut ffi::Vec, + metadata: CharSlice +) -> MaybeError { + + let response_code_vec = vec![response_code]; + let request_body_type_local = Box::into_raw(Box::new(ffi::Vec::::new())); + let response_body_type_local = Box::into_raw(Box::new(ffi::Vec::::new())); + let authentication_local = Box::into_raw(Box::new(ffi::Vec::::new())); + if let Some(req_vec) = request_body_type.as_ref() { + if let Some(local_vec) = request_body_type_local.as_mut() { + for item in req_vec.to_vec() { + local_vec.push(item); + } + } + } + if let Some(resp_vec) = response_body_type.as_ref() { + if let Some(local_vec) = response_body_type_local.as_mut() { + for item in resp_vec.to_vec() { + local_vec.push(item); + } + } + } + if let Some(auth_vec) = authentication.as_ref() { + if let Some(local_vec) = authentication_local.as_mut() { + for item in auth_vec.to_vec() { + local_vec.push(item); + } + } + } + + let maybe_metadata: Result = serde_json::from_slice::(std::slice::from_raw_parts( + metadata.as_ptr() as *const u8, + metadata.len(), + )); + if let Err(e) = maybe_metadata { + return MaybeError::Some(e.to_string().into()); + } + #[allow(clippy::unwrap_used)] + let metadata_json = maybe_metadata.unwrap(); + let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { + r#type: Some(r#type.to_utf8_lossy().into_owned()), + method: Some(method), + path: Some(path.to_utf8_lossy().into_owned()), + operation_name: operation_name.to_utf8_lossy().into_owned(), + resource_name: resource_name.to_utf8_lossy().into_owned(), + request_body_type: Some(request_body_type_local.as_ref().unwrap().to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + response_body_type: Some(response_body_type_local.as_ref().unwrap().to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + response_code: Some(response_code_vec), + authentication: Some(authentication_local.as_ref().unwrap().to_vec()), + metadata: Some(metadata_json), + }); + + try_c!(blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::Telemetry(endpoint)], + )); + + std::ptr::drop_in_place(request_body_type); + std::ptr::drop_in_place(response_body_type); + std::ptr::drop_in_place(authentication); + + MaybeError::None +} + /// Reports a dependency to the telemetry. #[no_mangle] #[allow(clippy::missing_safety_doc)] diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 2ceaa9b7aa..cf863bb035 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -28,7 +28,7 @@ use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tracing::{debug, error, info, trace, warn}; use futures::FutureExt; @@ -297,12 +297,12 @@ impl SidecarServer { let futures = clients .values() - .filter_map(|client| client.client.stats().ok()) + .filter_map(|client| client.client.lock_or_panic().worker.stats().ok()) .collect::>(); let metric_counts = clients .values() - .map(|client| client.telemetry_metrics.lock_or_panic().len() as u32) + .map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32) .collect::>(); (futures, metric_counts) @@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer { let env = entry.get().env.as_deref().unwrap_or("none"); // Lock telemetry client - let mut telemetry = self.telemetry_clients.get_or_create( + let telemetry_mutex = self.telemetry_clients.get_or_create( service, env, &instance_id, @@ -420,8 +420,9 @@ impl SidecarInterface for SidecarServer { }) }, ); + let mut telemetry = telemetry_mutex.lock_or_panic(); - let mut actions_to_process = vec![]; + let mut actions_to_process: Vec = vec![]; let mut composer_paths_to_process = vec![]; let mut buffered_info_changed = false; let mut remove_entry = false; @@ -448,6 +449,11 @@ impl SidecarInterface for SidecarServer { } SidecarAction::ClearQueueId => { remove_entry = true; + }, + SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => { + telemetry.last_endpoints_push = SystemTime::now(); + buffered_info_changed = true; + actions_to_process.push(action); } SidecarAction::Telemetry(TelemetryActions::Lifecycle( LifecycleAction::Stop, @@ -462,24 +468,25 @@ impl SidecarInterface for SidecarServer { } if !actions_to_process.is_empty() { - let client_clone = telemetry.clone(); - let mut handle = telemetry.handle.lock_or_panic(); - let last_handle = handle.take(); - *handle = Some(tokio::spawn(async move { + let telemetry_mutex_clone = telemetry_mutex.clone(); + let worker = telemetry.worker.clone(); + let last_handle = telemetry.handle.take(); + telemetry.handle = Some(tokio::spawn(async move { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; - let processed = client_clone.process_actions(actions_to_process); + let processed = telemetry_mutex_clone + .lock_or_panic() + .process_actions(actions_to_process); debug!("Sending Processed Actions :{processed:?}"); - client_clone.client.send_msgs(processed).await.ok(); + worker.send_msgs(processed).await.ok(); })); } if !composer_paths_to_process.is_empty() { - let client_clone = telemetry.clone(); - let mut handle = telemetry.handle.lock_or_panic(); - let last_handle = handle.take(); - *handle = Some(tokio::spawn(async move { + let worker = telemetry.worker.clone(); + let last_handle = telemetry.handle.take(); + telemetry.handle = Some(tokio::spawn(async move { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; @@ -487,7 +494,7 @@ impl SidecarInterface for SidecarServer { TelemetryCachedClient::process_composer_paths(composer_paths_to_process) .await; debug!("Sending Composer Paths :{composer_actions:?}"); - client_clone.client.send_msgs(composer_actions).await.ok(); + worker.send_msgs(composer_actions).await.ok(); })); } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index a556f82866..c7fabc8bdd 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -54,20 +54,24 @@ struct ComposerPackages { packages: Vec, } -#[derive(Clone)] +pub struct TelemetryCachedEntry { + last_used: Instant, + pub client: Arc>, +} + pub struct TelemetryCachedClient { - pub client: TelemetryWorkerHandle, - pub shm_writer: Arc>, - pub last_used: Instant, + pub worker: TelemetryWorkerHandle, + pub shm_writer: OneWayShmWriter, pub config_sent: bool, pub buffered_integrations: HashSet, pub buffered_composer_paths: HashSet, - pub telemetry_metrics: Arc>>, - pub handle: Arc>>>, + pub last_endpoints_push: SystemTime, + pub telemetry_metrics: HashMap, + pub handle: Option>, } impl TelemetryCachedClient { - pub fn new( + fn new( service: &str, env: &str, instance_id: &InstanceId, @@ -90,17 +94,17 @@ impl TelemetryCachedClient { info!("spawning telemetry worker {config:?}"); Self { - client: handle.clone(), - shm_writer: Arc::new( + worker: handle.clone(), + shm_writer: { #[allow(clippy::unwrap_used)] - OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap(), - ), - last_used: Instant::now(), + OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap() + }, config_sent: false, buffered_integrations: HashSet::new(), buffered_composer_paths: HashSet::new(), + last_endpoints_push: SystemTime::UNIX_EPOCH, telemetry_metrics: Default::default(), - handle: Arc::new(Mutex::new(None)), + handle: None, } } @@ -109,6 +113,7 @@ impl TelemetryCachedClient { &self.config_sent, &self.buffered_integrations, &self.buffered_composer_paths, + &self.last_endpoints_push, )) { self.shm_writer.write(&buf); } else { @@ -116,12 +121,11 @@ impl TelemetryCachedClient { } } - pub fn register_metric(&self, metric: MetricContext) { - let mut metrics = self.telemetry_metrics.lock_or_panic(); - if !metrics.contains_key(&metric.name) { - metrics.insert( + pub fn register_metric(&mut self, metric: MetricContext) { + if !self.telemetry_metrics.contains_key(&metric.name) { + self.telemetry_metrics.insert( metric.name.clone(), - self.client.register_metric_context( + self.worker.register_metric_context( metric.name, metric.tags, metric.metric_type, @@ -137,14 +141,13 @@ impl TelemetryCachedClient { (name, val, tags): (String, f64, Vec), ) -> TelemetryActions { #[allow(clippy::unwrap_used)] - TelemetryActions::AddPoint(( - val, - *self.telemetry_metrics.lock_or_panic().get(&name).unwrap(), - tags, - )) + TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags)) } - pub fn process_actions(&self, sidecar_actions: Vec) -> Vec { + pub fn process_actions( + &mut self, + sidecar_actions: Vec, + ) -> Vec { let mut actions = vec![]; for action in sidecar_actions { match action { @@ -233,13 +236,13 @@ type EnvString = String; type TelemetryCachedClientKey = (ServiceString, EnvString); pub struct TelemetryCachedClientSet { - pub inner: Arc>>, + pub inner: Arc>>, cleanup_handle: Option>, } impl Default for TelemetryCachedClientSet { fn default() -> Self { - let inner: Arc>> = + let inner: Arc>> = Arc::new(Default::default()); let clients = inner.clone(); @@ -283,7 +286,7 @@ impl TelemetryCachedClientSet { instance_id: &InstanceId, runtime_meta: &RuntimeMetadata, get_config: F, - ) -> TelemetryCachedClient + ) -> Arc> where F: FnOnce() -> ddtelemetry::config::Config, { @@ -293,12 +296,10 @@ impl TelemetryCachedClientSet { if let Some(existing) = map.get_mut(&key) { existing.last_used = Instant::now(); - let client = existing.clone(); tokio::spawn({ - let telemetry = client.clone(); + let worker = existing.client.lock_or_panic().worker.clone(); async move { - telemetry - .client + worker .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await .ok(); @@ -306,19 +307,26 @@ impl TelemetryCachedClientSet { }); info!("Reusing existing telemetry client for {key:?}"); - return client; + return existing.client.clone(); } - let client = - TelemetryCachedClient::new(service, env, instance_id, runtime_meta, get_config); + let entry = TelemetryCachedEntry { + last_used: Instant::now(), + client: Arc::new(Mutex::new(TelemetryCachedClient::new( + service, + env, + instance_id, + runtime_meta, + get_config, + ))), + }; - map.insert(key.clone(), client.clone()); + let entry = map.entry(key.clone()).or_insert(entry); tokio::spawn({ - let telemetry = client.clone(); + let worker = entry.client.lock_or_panic().worker.clone(); async move { - telemetry - .client + worker .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await .ok(); @@ -327,7 +335,7 @@ impl TelemetryCachedClientSet { info!("Created new telemetry client for {key:?}"); - client + entry.client.clone() } pub fn remove_telemetry_client(&self, service: &str, env: &str) { @@ -385,10 +393,11 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) { &actions.service_name, &actions.env_name, ); + let client = telemetry_client.lock_or_panic().worker.clone(); for action in actions.actions { let action_str = format!("{action:?}"); - match telemetry_client.client.send_msg(action).await { + match client.send_msg(action).await { Ok(_) => { debug!("Sent telemetry action to TelemetryWorker: {action_str}"); } @@ -406,7 +415,7 @@ fn get_telemetry_client( instance_id: &InstanceId, service_name: &str, env_name: &str, -) -> TelemetryCachedClient { +) -> Arc> { let session = sidecar.get_session(&instance_id.session_id); let trace_config = session.get_trace_config(); let runtime_meta = RuntimeMetadata::new( diff --git a/ddtelemetry/Cargo.toml b/ddtelemetry/Cargo.toml index 824a0e6490..5dacec6bed 100644 --- a/ddtelemetry/Cargo.toml +++ b/ddtelemetry/Cargo.toml @@ -32,6 +32,7 @@ uuid = { version = "1.3", features = ["v4"] } hashbrown = "0.15" ddcommon = { path = "../ddcommon", default-features = false} +ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false } datadog-ddsketch = { path = "../ddsketch" } [dev-dependencies] diff --git a/ddtelemetry/src/data/payload.rs b/ddtelemetry/src/data/payload.rs index d966008534..db49fa2774 100644 --- a/ddtelemetry/src/data/payload.rs +++ b/ddtelemetry/src/data/payload.rs @@ -12,6 +12,7 @@ pub enum Payload { AppDependenciesLoaded(AppDependenciesLoaded), AppIntegrationsChange(AppIntegrationsChange), AppClientConfigurationChange(AppClientConfigurationChange), + AppEndpointsChange(AppEndpointsChange), AppHeartbeat(#[serde(skip_serializing)] ()), AppClosing(#[serde(skip_serializing)] ()), GenerateMetrics(GenerateMetrics), @@ -29,6 +30,7 @@ impl Payload { AppDependenciesLoaded(_) => "app-dependencies-loaded", AppIntegrationsChange(_) => "app-integrations-change", AppClientConfigurationChange(_) => "app-client-configuration-change", + AppEndpointsChange(_) => "app-endpoints-change", AppHeartbeat(_) => "app-heartbeat", AppClosing(_) => "app-closing", GenerateMetrics(_) => "generate-metrics", diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 3b7d9a9c28..74edf55e59 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -61,6 +61,12 @@ pub struct AppClientConfigurationChange { pub configuration: Vec, } +#[derive(Debug, Serialize)] +pub struct AppEndpointsChange { + pub is_first: bool, + pub endpoints: Vec, +} + #[derive(Serialize, Debug)] pub struct GenerateMetrics { pub series: Vec, @@ -95,3 +101,58 @@ pub enum LogLevel { Warn, Debug, } + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +#[serde(rename_all = "UPPERCASE")] +#[repr(C)] +pub enum Method { + Get = 0, + Post = 1, + Put = 2, + Delete = 3, + Patch = 4, + Head = 5, + Options = 6, + Trace = 7, + Connect = 8, + Other = 9, //This is specified as "*" in the OpenAPI spec +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +#[serde(rename_all = "UPPERCASE")] +#[repr(C)] +pub enum Authentication { + Jwt = 0, + Basic = 1, + Oauth = 2, + Oidc = 3, + ApiKey = 4, + Session = 5, + Mtls = 6, + Saml = 7, + Ldap = 8, + Form = 9, + Other = 10, +} + +#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] +pub struct Endpoint { + #[serde(default)] + pub r#type: Option, + #[serde(default)] + pub method: Option, + #[serde(default)] + pub path: Option, + pub operation_name: String, + pub resource_name: String, + #[serde(default)] + pub request_body_type: Option>, + #[serde(default)] + pub response_body_type: Option>, + #[serde(default)] + pub response_code: Option>, + #[serde(default)] + pub authentication: Option>, + #[serde(default)] + pub metadata: Option, +} diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs index 1d33d88356..fea2d4df4d 100644 --- a/ddtelemetry/src/worker/mod.rs +++ b/ddtelemetry/src/worker/mod.rs @@ -7,10 +7,9 @@ pub mod store; use crate::{ config::Config, - data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry}, + data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry}, metrics::{ContextKey, MetricBuckets, MetricContexts}, }; -use ddcommon::Endpoint; use ddcommon::{hyper_migration, tag::Tag, worker::Worker}; use std::fmt::Debug; @@ -79,6 +78,7 @@ pub enum TelemetryActions { AddDependency(Dependency), AddIntegration(Integration), AddLog((LogIdentifier, Log)), + AddEndpoint(Endpoint), Lifecycle(LifecycleAction), #[serde(skip)] CollectStats(oneshot::Sender), @@ -110,6 +110,7 @@ struct TelemetryWorkerData { dependencies: store::Store, configurations: store::Store, integrations: store::Store, + endpoints: store::Store, logs: store::QueueHashMap, metric_contexts: MetricContexts, metric_buckets: MetricBuckets, @@ -181,6 +182,8 @@ pub struct TelemetryWorkerStats { pub configurations_unflushed: u32, pub integrations_stored: u32, pub integrations_unflushed: u32, + pub endpoints_stored: u32, + pub endpoints_unflushed: u32, pub logs: u32, pub metric_contexts: u32, pub metric_buckets: MetricBucketStats, @@ -197,6 +200,8 @@ impl Add for TelemetryWorkerStats { configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed, integrations_stored: self.integrations_stored + rhs.integrations_stored, integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed, + endpoints_stored: self.endpoints_stored + rhs.endpoints_stored, + endpoints_unflushed: self.endpoints_unflushed + rhs.endpoints_unflushed, logs: self.logs + rhs.logs, metric_contexts: self.metric_contexts + rhs.metric_contexts, metric_buckets: MetricBucketStats { @@ -315,7 +320,7 @@ impl TelemetryWorker { } } } - AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {} + AddConfig(_) | AddDependency(_) | AddIntegration(_) | AddEndpoint(_) | Lifecycle(ExtendedHeartbeat) => {} Lifecycle(Stop) => { if !self.data.started { return BREAK; @@ -372,6 +377,7 @@ impl TelemetryWorker { AddDependency(dep) => self.data.dependencies.insert(dep), AddIntegration(integration) => self.data.integrations.insert(integration), AddConfig(cfg) => self.data.configurations.insert(cfg), + AddEndpoint(endpoint) => self.data.endpoints.insert(endpoint), AddLog((identifier, log)) => { let (l, new) = self.data.logs.get_mut_or_insert(identifier, log); if !new { @@ -424,6 +430,7 @@ impl TelemetryWorker { self.data.dependencies.unflush_stored(); self.data.integrations.unflush_stored(); self.data.configurations.unflush_stored(); + self.data.endpoints.unflush_stored(); let app_started = data::Payload::AppStarted(self.build_app_started()); match self.send_payload(&app_started).await { @@ -516,6 +523,14 @@ impl TelemetryWorker { }, )) } + if self.data.endpoints.flush_not_empty() { + payloads.push(data::Payload::AppEndpointsChange( + data::AppEndpointsChange { + is_first: true, + endpoints: self.data.endpoints.unflushed().cloned().collect(), + }, + )) + } payloads } @@ -618,6 +633,9 @@ impl TelemetryWorker { .data .configurations .removed_flushed(p.configuration.len()), + AppEndpointsChange(p) => { + self.data.endpoints.removed_flushed(p.endpoints.len()) + } MessageBatch(batch) => { for p in batch { self.payload_sent_success(p); @@ -699,7 +717,7 @@ impl TelemetryWorker { if let Some(endpoint) = self.config.endpoint.as_ref() { endpoint.timeout_ms } else { - Endpoint::DEFAULT_TIMEOUT + ddcommon::Endpoint::DEFAULT_TIMEOUT })) => { Err(anyhow::anyhow!("Request timed out")) }, @@ -722,6 +740,8 @@ impl TelemetryWorker { configurations_unflushed: self.data.configurations.len_unflushed() as u32, integrations_stored: self.data.integrations.len_stored() as u32, integrations_unflushed: self.data.integrations.len_unflushed() as u32, + endpoints_stored: self.data.endpoints.len_stored() as u32, + endpoints_unflushed: self.data.endpoints.len_unflushed() as u32, logs: self.data.logs.len() as u32, metric_contexts: self.data.metric_contexts.lock().len() as u32, metric_buckets: self.data.metric_buckets.stats(), @@ -910,7 +930,7 @@ impl TelemetryWorkerHandle { } } -/// How many dependencies/integrations/configs we keep in memory at most +/// How many dependencies/integrations/configs/endpoints we keep in memory at most pub const MAX_ITEMS: usize = 5000; #[derive(Debug, Default, Clone, Copy)] @@ -930,6 +950,7 @@ pub struct TelemetryWorkerBuilder { pub dependencies: store::Store, pub integrations: store::Store, pub configurations: store::Store, + pub endpoints: store::Store, pub native_deps: bool, pub rust_shared_lib_deps: bool, pub config: Config, @@ -980,6 +1001,7 @@ impl TelemetryWorkerBuilder { dependencies: store::Store::new(MAX_ITEMS), integrations: store::Store::new(MAX_ITEMS), configurations: store::Store::new(MAX_ITEMS), + endpoints: store::Store::new(MAX_ITEMS), native_deps: true, rust_shared_lib_deps: false, config: Config::default(), @@ -1010,6 +1032,7 @@ impl TelemetryWorkerBuilder { dependencies: self.dependencies, integrations: self.integrations, configurations: self.configurations, + endpoints: self.endpoints, logs: store::QueueHashMap::default(), metric_contexts: contexts.clone(), metric_buckets: MetricBuckets::default(),