//! Utility functions for metrics pub mod histogram; mod registration; pub mod status; pub(crate) mod task; /// Prefix for runtime metrics. pub(crate) const METRICS_PREFIX: &str = "runtime"; pub use commonware_runtime_macros::{EncodeLabelSet, EncodeLabelValue, EncodeStruct}; pub use prometheus_client::{ collector, encoding, encoding::{ CounterValueEncoder, DescriptorEncoder, EncodeCounterValue, EncodeExemplarTime, EncodeExemplarValue, EncodeGaugeValue, EncodeLabel, EncodeLabelKey, EncodeLabelSet as EncodeLabelSetTrait, EncodeLabelValue as EncodeLabelValueTrait, EncodeMetric, ExemplarValueEncoder, GaugeValueEncoder, LabelEncoder, LabelKeyEncoder, LabelSetEncoder, LabelValueEncoder, MetricEncoder, NoLabelSet, }, metrics::{MetricType, TypedMetric}, registry, registry::Metric, }; /// Underlying Prometheus metric types. Used when constructing a metric /// to pass to [`crate::Metrics::register`]. pub mod raw { pub use prometheus_client::metrics::{ counter::Counter, family::{self, Family}, gauge::Gauge, histogram::Histogram, }; } use commonware_utils::sync::Mutex; use prometheus_client::encoding::{ text::{encode, encode_eof}, MetricEncoder as PromMetricEncoder, }; pub use registration::Registration; use std::{ any::Any, borrow::Cow, collections::{BTreeMap, HashMap}, ops::Deref, sync::{atomic::Ordering, Arc, Weak}, }; /// Native integer width used by [`raw::Gauge`] on this target. /// /// `i64` on platforms with 64-bit atomics, `i32` otherwise. Matches /// `prometheus_client::metrics::gauge::Gauge`'s backing type. #[cfg(target_has_atomic = "64")] pub type GaugeValue = i64; #[cfg(not(target_has_atomic = "64"))] pub type GaugeValue = i32; /// A registered counter metric. pub type Counter = Registered; /// A registered gauge metric. pub type Gauge = Registered; /// A registered histogram metric. pub type Histogram = Registered; /// A registered family of counters keyed by `L`. pub type CounterFamily = Registered>; /// A registered family of gauges keyed by `L`. 
pub type GaugeFamily = Registered>;

// NOTE(review): the signatures in this span reference type parameters (`T`,
// `N`, `H`, `S`, `M`) that are never declared and contain stray `>` tokens.
// The generic parameter lists look stripped from this listing — confirm
// against the original file before building.

/// Convenience methods for Prometheus gauges.
pub trait GaugeExt {
    /// Set a gauge from a lossless integer conversion.
    ///
    /// A failed conversion is returned as the error instead of being applied.
    fn try_set>(&self, value: T) -> Result;

    /// Atomically raise a gauge to at least the provided value.
    ///
    /// A failed conversion is returned as the error instead of being applied.
    fn try_set_max + Copy>(&self, value: T) -> Result;
}

impl GaugeExt for raw::Gauge {
    fn try_set>(&self, value: T) -> Result {
        // Convert first so an out-of-range value never reaches the gauge.
        let value = value.try_into()?;
        Ok(self.set(value))
    }

    fn try_set_max + Copy>(&self, value: T) -> Result {
        let value = value.try_into()?;
        // `fetch_max` on the gauge's inner atomic keeps the larger of the
        // stored and provided values; relaxed ordering is used.
        Ok(self.inner().fetch_max(value, Ordering::Relaxed))
    }
}

pub use histogram::HistogramExt;

/// One-line constructors for the common metric types.
///
/// Every method forwards to `register` with a freshly constructed raw metric.
pub trait MetricsExt: crate::Metrics {
    /// Register a counter with the runtime.
    fn counter, H: Into>(&self, name: N, help: H) -> Counter {
        self.register(name, help, raw::Counter::default())
    }

    /// Register a gauge with the runtime.
    fn gauge, H: Into>(&self, name: N, help: H) -> Gauge {
        self.register(name, help, raw::Gauge::default())
    }

    /// Register a histogram with the runtime.
    ///
    /// `buckets` is passed to `raw::Histogram::new` unchanged.
    fn histogram, H: Into, I>(
        &self,
        name: N,
        help: H,
        buckets: I,
    ) -> Histogram
    where
        I: IntoIterator,
    {
        self.register(name, help, raw::Histogram::new(buckets))
    }

    /// Register a metric family with the runtime.
    ///
    /// The family starts out as `raw::Family::default()`.
    fn family(&self, name: N, help: H) -> Registered>
    where
        N: Into,
        H: Into,
        S: Clone + std::hash::Hash + Eq,
        M: Default,
        raw::Family: Metric,
    {
        self.register(name, help, raw::Family::::default())
    }
}

// Blanket implementation: the trait has only provided methods, so the body is empty.
impl MetricsExt for T {}

/// Validates that a label matches Prometheus metric name format: `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics if the label is empty, starts with a non-alphabetic character,
/// or contains characters other than `[a-zA-Z0-9_]`.
pub fn validate_label(label: &str) {
    let mut chars = label.chars();
    // The first character must exist and be an ASCII letter; this also rejects
    // the empty label.
    assert!(
        chars.next().is_some_and(|c| c.is_ascii_alphabetic()),
        "label must start with [a-zA-Z]: {label}"
    );
    // Every remaining character must be an ASCII letter, digit, or underscore.
    assert!(
        chars.all(|c| c.is_ascii_alphanumeric() || c == '_'),
        "label must only contain [a-zA-Z0-9_]: {label}"
    );
}

/// Add an attribute to a sorted attribute list, maintaining sorted order via binary search.
///
/// `attributes` must already be sorted by key (which holds when it is only
/// ever modified through this function). `value` is rendered with its
/// `Display` implementation.
///
/// Returns `true` if the key was new, `false` if it was a duplicate (value overwritten).
pub fn add_attribute(
    attributes: &mut Vec<(String, String)>,
    key: &str,
    value: impl std::fmt::Display,
) -> bool {
    let value = value.to_string();
    // Compare against the borrowed key so an owned copy is only allocated when
    // the key actually has to be inserted.
    match attributes.binary_search_by(|(existing, _)| existing.as_str().cmp(key)) {
        Ok(pos) => {
            attributes[pos].1 = value;
            false
        }
        Err(pos) => {
            attributes.insert(pos, (key.to_string(), value));
            true
        }
    }
}

/// Return `true` if `full` is exactly `name`, or ends with `name` preceded by
/// an `_` separator (i.e. `name` is the unprefixed suffix of `full`).
#[cfg(any(test, feature = "test-utils"))]
fn matches_metric_name(full: &str, name: &str) -> bool {
    full == name
        || full
            .strip_suffix(name)
            .is_some_and(|prefix| prefix.ends_with('_'))
}

/// Return `true` if encoded Prometheus metrics contain a sample with `name` and `value`.
///
/// `name` may be either the full encoded metric name or its unprefixed suffix.
/// Labels attached to the sample are ignored.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn has_metric_value(metrics: &str, name: &str, value: impl std::fmt::Display) -> bool {
    // Byte offset of the `}` that closes a label set, given the text that
    // follows the opening `{`. A `}` inside a quoted label value (and any
    // backslash-escaped character inside quotes) is skipped so values such as
    // `path="/a/{id}"` do not end the label set early.
    fn label_set_end(labels: &str) -> Option<usize> {
        let mut in_quotes = false;
        let mut escaped = false;
        for (idx, c) in labels.char_indices() {
            if escaped {
                escaped = false;
                continue;
            }
            match c {
                '\\' if in_quotes => escaped = true,
                '"' => in_quotes = !in_quotes,
                '}' if !in_quotes => return Some(idx),
                _ => {}
            }
        }
        None
    }

    let expected = value.to_string();
    metrics.lines().map(str::trim).any(|line| {
        // Descriptor and comment lines never carry a sample.
        if line.starts_with('#') {
            return false;
        }

        // The sample name runs up to the label set or the first whitespace.
        let Some(sample_end) = line.find(|c: char| c == '{' || c.is_whitespace()) else {
            return false;
        };
        let (sample_name, mut rest) = line.split_at(sample_end);
        if !matches_metric_name(sample_name, name) {
            return false;
        }

        // Skip over the label set, if any; an unterminated one is not a match.
        if let Some(labeled) = rest.strip_prefix('{') {
            let Some(labels_end) = label_set_end(labeled) else {
                return false;
            };
            rest = &labeled[labels_end + 1..];
        }

        // The value must be separated from the name/labels by whitespace.
        if !rest.starts_with(char::is_whitespace) {
            return false;
        }

        // Only the first token is the value; anything after it is ignored.
        rest.split_whitespace().next() == Some(expected.as_str())
    })
}

/// Count the number of running tasks whose name starts with the given prefix.
///
/// This function encodes metrics and counts tasks that are currently running
/// (have a value of 1) and whose name starts with the specified prefix.
///
/// This is useful for verifying that all child tasks under a given label hierarchy
/// have been properly shut down.
///
/// # Example
///
/// ```rust
/// use commonware_runtime::{
///     deterministic, telemetry::metrics::count_running_tasks, Clock, Metrics, Runner, Spawner,
///     Supervisor,
/// };
/// use std::time::Duration;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Spawn a task under a labeled context
///     let handle = context.child("worker").spawn(|ctx| async move {
///         ctx.sleep(Duration::from_secs(100)).await;
///     });
///
///     // Allow the task to start
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Count running tasks with "worker" prefix
///     let count = count_running_tasks(&context, "worker");
///     assert!(count > 0, "worker task should be running");
///
///     // Abort the task
///     handle.abort();
///     let _ = handle.await;
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Verify task is stopped
///     let count = count_running_tasks(&context, "worker");
///     assert_eq!(count, 0, "worker task should be stopped");
/// });
/// ```
#[cfg(any(test, feature = "test-utils"))]
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    let encoded = metrics.encode();
    encoded
        .lines()
        .filter_map(|line| {
            // Only `runtime_tasks_running` samples of kind `Task` are considered.
            if !line.starts_with("runtime_tasks_running{") || !line.contains("kind=\"Task\"") {
                return None;
            }

            // Extract the text between `name="` and the next `"`.
            let name = line.split("name=\"").nth(1)?.split('"').next()?;
            if !name.starts_with(prefix) {
                return None;
            }

            // The sample value is the last space-separated token; lines whose
            // value does not parse as `usize` are skipped.
            line.trim_end().rsplit(' ').next()?.parse::<usize>().ok()
        })
        .sum()
}

// Adaptation of client_rust's internal descriptor encoder.
//
// Source:
// https://github.com/prometheus/client_rust/blob/4a6d40a55443d5b18f5be311d246c03e56f417d6/src/encoding/text.rs#L218-L275
//
// NOTE(review): throughout this span, type parameters such as `W`, `M`, `S`,
// `C` and `Q` are used without being declared, and types like `Weak>`, `Arc>`
// and `Box` have no arguments. The generic parameter lists look stripped from
// this listing, and `®istry` looks like a mis-encoded `&registry` — confirm
// against the original file before building.
//
// Writes the `# HELP <name> <help>` and `# TYPE <name> <type>` lines for one
// metric family into `writer`.
fn encode_descriptor(
    writer: &mut W,
    name: &str,
    help: &str,
    metric_type: MetricType,
) -> Result<(), std::fmt::Error>
where
    W: std::fmt::Write,
{
    writer.write_str("# HELP ")?;
    writer.write_str(name)?;
    writer.write_str(" ")?;
    writer.write_str(help)?;
    writer.write_str("\n# TYPE ")?;
    writer.write_str(name)?;
    writer.write_str(" ")?;
    writer.write_str(metric_type.as_str())?;
    writer.write_str("\n")?;
    Ok(())
}

/// Join a metric or label prefix with a child name using Prometheus' `_` separator.
///
/// An empty `prefix` yields `name` unchanged.
pub(crate) fn prefixed_name(prefix: &str, name: &str) -> String {
    if prefix.is_empty() {
        name.to_string()
    } else {
        format!("{prefix}_{name}")
    }
}

/// Build a child context label by appending `label` to `prefix`, asserting that
/// `label` is valid and does not shadow the reserved runtime metric prefix.
///
/// # Panics
///
/// Panics if `label` fails [`validate_label`] or if the joined name starts
/// with [`METRICS_PREFIX`].
pub(crate) fn child_label(prefix: &str, label: &str) -> String {
    validate_label(label);
    let name = prefixed_name(prefix, label);
    assert!(
        !name.starts_with(METRICS_PREFIX),
        "using runtime label is not allowed"
    );
    name
}

/// Releases one registration claim on metric `id` when dropped.
struct RegistryGuard {
    id: usize,
    registry: Weak>,
}

impl Drop for RegistryGuard {
    fn drop(&mut self) {
        // Nothing to release if the registry itself is already gone.
        let Some(registry) = self.registry.upgrade() else {
            return;
        };
        registry.lock().release_registration(self.id);
    }
}

/// A metric handle whose lifetime controls registry exposure and attached cleanup.
#[must_use = "registered metrics are removed when the returned handle is dropped"]
pub struct Registered {
    metric: Arc,
    registration: Registration,
}

impl Clone for Registered {
    /// Clone the handle; both the metric pointer and the registration are cloned.
    fn clone(&self) -> Self {
        Self {
            metric: self.metric.clone(),
            registration: self.registration.clone(),
        }
    }
}

impl Registered {
    /// Create a metric handle with an explicit lifecycle registration.
    ///
    /// The provided [`Registration`] controls what happens when the last clone
    /// of this handle is dropped. Use [`Registration::from`] with `()` for a
    /// raw handle that is not exposed by a runtime registry.
    pub fn with_registration(metric: M, registration: Registration) -> Self {
        Self {
            metric: Arc::new(metric),
            registration,
        }
    }

    /// Borrow the underlying metric.
    pub fn metric(&self) -> &M {
        self.metric.as_ref()
    }
}

impl Registered>
where
    S: Clone + std::hash::Hash + Eq,
    C: raw::family::MetricConstructor,
{
    /// Build the label set `S` from `label_set` and forward to `get`.
    pub fn get_by(&self, label_set: &Q) -> Option + '_>
    where
        for<'a> S: From<&'a Q>,
    {
        let label_set = S::from(label_set);
        self.get(&label_set)
    }

    /// Build the label set `S` from `label_set` and forward to `get_or_create`.
    pub fn get_or_create_by(&self, label_set: &Q) -> impl Deref + '_
    where
        for<'a> S: From<&'a Q>,
    {
        let label_set = S::from(label_set);
        self.get_or_create(&label_set)
    }

    /// Build the label set `S` from `label_set` and forward to `remove`.
    pub fn remove_by(&self, label_set: &Q) -> bool
    where
        for<'a> S: From<&'a Q>,
    {
        let label_set = S::from(label_set);
        self.remove(&label_set)
    }
}

impl Deref for Registered {
    type Target = M;

    /// Dereference to the underlying metric.
    fn deref(&self) -> &Self::Target {
        self.metric()
    }
}

impl std::fmt::Debug for Registered {
    /// Formats only the `metric` field; the registration is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Registered")
            .field("metric", self.metric())
            .finish_non_exhaustive()
    }
}

/// Attribute (label) pairs attached to one registered metric.
type MetricAttributes = Vec<(Cow<'static, str>, Cow<'static, str>)>;
/// Family name plus attributes; used to detect duplicate registrations.
type MetricKey = (String, MetricAttributes);
/// Closure that appends a metric's encoded sample lines to a string.
type SampleEncoder = dyn Fn(&mut String) -> Result<(), std::fmt::Error> + Send + Sync;

/// Fields of a metric entry that are known before it is inserted.
struct PendingMetricEntry {
    family_name: String,
    attributes: MetricAttributes,
    encode_samples: Box,
    metric_any: Arc,
}

/// Wrapper around a shared metric that forwards `Debug` and `EncodeMetric`
/// to the inner value.
pub(crate) struct SharedMetric(pub(crate) Arc);

impl std::fmt::Debug for SharedMetric {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

impl EncodeMetric for SharedMetric {
    fn encode(&self, encoder: PromMetricEncoder<'_>) -> Result<(), std::fmt::Error> {
        self.0.encode(encoder)
    }

    fn metric_type(&self) -> MetricType {
        self.0.metric_type()
    }
}

/// Build the closure that encodes `metric`'s samples.
///
/// The metric is registered in a temporary upstream registry carrying
/// `labels`; on each call that registry is encoded and every line that does
/// not start with `#` is appended to the output.
fn create_sample_encoder(
    name: String,
    labels: MetricAttributes,
    metric: Arc,
) -> Box
where
    M: Metric,
{
    // TODO (#3659): Avoid allocating an upstream registry per metric once
    // `prometheus-client` exposes a public sample-only `MetricEncoder` path
    // for encoding one metric with const labels.
    let mut registry = registry::Registry::with_labels(labels.into_iter());
    registry.register(name, "", SharedMetric(metric));
    Box::new(move |samples| {
        let mut encoded = String::new();
        encode(&mut encoded, ®istry).expect("encoding temporary metric registry failed");
        for line in encoded.lines() {
            // Drop `#` lines (descriptors, EOF marker); keep only samples.
            if line.starts_with('#') {
                continue;
            }
            samples.push_str(line);
            samples.push('\n');
        }
        Ok(())
    })
}

/// Convert owned `(String, String)` pairs into [`MetricAttributes`].
fn owned_attributes(attributes: Vec<(String, String)>) -> MetricAttributes {
    attributes
        .into_iter()
        .map(|(k, v)| (Cow::Owned(k), Cow::Owned(v)))
        .collect()
}

// Match upstream prometheus-client's `Descriptor::new` normalization.
//
// Source:
// https://github.com/prometheus/client_rust/blob/4a6d40a55443d5b18f5be311d246c03e56f417d6/src/registry.rs#L340-L348
fn normalize_help(help: String) -> String {
    help + "."
}

/// One live metric stored in the registry.
struct MetricEntry {
    family_name: String,
    attributes: MetricAttributes,
    encode_samples: Box,
    metric_any: Arc,
    /// Number of outstanding registrations for this metric; the entry is
    /// dropped when this reaches zero.
    claims: usize,
    /// Position of this metric's id inside its family's `metric_ids`.
    family_index: usize,
}

/// Metadata shared by every metric registered under one family name.
#[derive(Debug)]
struct MetricFamily {
    help: String,
    metric_type: MetricType,
    /// Cached `# HELP` / `# TYPE` lines, built when the family is created.
    descriptor: String,
    metric_ids: Vec,
}

/// Manages metrics with explicit lifetimes.
#[derive(Clone)]
pub struct Registry {
    inner: Arc>,
}

struct RegistryInner {
    /// Dense metric storage indexed by stable metric id.
    metrics: Vec>,
    /// Metric ids that can be reused after a metric is fully unregistered.
    free_metric_ids: Vec,
    /// Metric families keyed by family name, kept sorted for deterministic encoding.
    families: BTreeMap,
    /// Exact metric keys for duplicate registration detection.
    keys: HashMap,
    /// Monotonic id source used when there is no reusable metric slot.
    next_metric_id: usize,
}

impl Default for Registry {
    fn default() -> Self {
        Self::new()
    }
}

impl Registry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(RegistryInner::new())),
        }
    }

    /// Register `metric` under `name` with the given attributes.
    ///
    /// Takes the registry lock and delegates to the inner registry, handing
    /// it a weak reference to itself for the returned handle's guard.
    pub(crate) fn register(
        &self,
        name: String,
        help: String,
        attributes: Vec<(String, String)>,
        metric: Arc,
    ) -> Registered
    where
        M: Metric,
    {
        let mut inner = self.inner.lock();
        inner.register(Arc::downgrade(&self.inner), name, help, attributes, metric)
    }

    /// Encode every registered metric while holding the registry lock.
    pub fn encode(&self) -> String {
        self.inner.lock().encode()
    }
}

impl RegistryInner {
    fn new() -> Self {
        Self {
            metrics: Vec::new(),
            free_metric_ids: Vec::new(),
            families: BTreeMap::new(),
            keys: HashMap::new(),
            next_metric_id: 0,
        }
    }

    /// Register a metric, reusing the existing one when the same name and
    /// attributes are already registered.
    ///
    /// # Panics
    ///
    /// Panics if the help text or metric type disagrees with the existing
    /// family, or if a duplicate key was registered with a different type.
    fn register(
        &mut self,
        registry: Weak>,
        name: String,
        help: String,
        attributes: Vec<(String, String)>,
        metric: Arc,
    ) -> Registered
    where
        M: Metric,
    {
        let attributes = owned_attributes(attributes);
        let help = normalize_help(help);
        let metric_type = metric.metric_type();
        let encode_samples =
            create_sample_encoder(name.clone(), attributes.clone(), metric.clone());
        let key = (name.clone(), attributes.clone());

        // Duplicate registration: hand back the already-registered metric and
        // record one more claim on it.
        if let Some(existing_id) = self.keys.get(&key).copied() {
            let entry = self.metric_ref(existing_id);
            if let Some(family) = self.families.get(&name) {
                assert_eq!(
                    family.help, help,
                    "metric family `{}` registered with inconsistent help text",
                    name
                );
            }
            let existing_metric = Arc::clone(&entry.metric_any)
                .downcast::()
                .unwrap_or_else(|_| {
                    panic!(
                        "duplicate metric `{}` with attributes {:?} registered with different type",
                        key.0, key.1
                    )
                });
            self.claim_registration(existing_id);
            return Registered {
                metric: existing_metric,
                registration: Registration::from(RegistryGuard {
                    id: existing_id,
                    registry,
                }),
            };
        }

        // New key: validate against the family (if any) and insert.
        self.assert_family_matches(&name, &help, metric_type);
        let id = self.allocate_metric_id();
        let registration = Registration::from(RegistryGuard { id, registry });
        let metric_any: Arc = metric.clone();
        self.insert_metric_entry(
            id,
            help,
            metric_type,
            PendingMetricEntry {
                family_name: name,
                attributes,
                encode_samples,
                metric_any,
            },
        );
        Registered {
            metric,
            registration,
        }
    }

    /// Return the storage slot for `id`, appending an empty slot when `id` is
    /// exactly one past the end.
    fn metric_slot_mut(&mut self, id: usize) -> &mut Option {
        if id == self.metrics.len() {
            self.metrics.push(None);
        }
        &mut self.metrics[id]
    }

    /// Borrow the entry for `id`; panics if the slot is missing or empty.
    fn metric_ref(&self, id: usize) -> &MetricEntry {
        self.metrics
            .get(id)
            .and_then(Option::as_ref)
            .expect("metric id missing from registry")
    }

    /// Mutably borrow the entry for `id`; panics if the slot is missing or empty.
    fn metric_mut(&mut self, id: usize) -> &mut MetricEntry {
        self.metrics
            .get_mut(id)
            .and_then(Option::as_mut)
            .expect("metric id missing from registry")
    }

    /// Pick an id for a new metric, preferring a previously freed one.
    fn allocate_metric_id(&mut self) -> usize {
        if let Some(id) = self.free_metric_ids.pop() {
            return id;
        }
        let id = self.next_metric_id;
        self.next_metric_id = self
            .next_metric_id
            .checked_add(1)
            .expect("metric id overflow");
        id
    }

    /// Panic if a family named `name` exists with different help text or a
    /// different metric type; a missing family always passes.
    fn assert_family_matches(&self, name: &str, help: &str, metric_type: MetricType) {
        if let Some(family) = self.families.get(name) {
            assert_eq!(
                family.help, help,
                "metric family `{}` registered with inconsistent help text",
                name
            );
            assert_eq!(
                family.metric_type.as_str(),
                metric_type.as_str(),
                "metric family `{}` registered with inconsistent metric type",
                name
            );
        }
    }

    /// Store a new entry under `id` with one claim, record its key, and add
    /// it to its family (creating the family and its cached descriptor if needed).
    fn insert_metric_entry(
        &mut self,
        id: usize,
        help: String,
        metric_type: MetricType,
        entry: PendingMetricEntry,
    ) {
        let PendingMetricEntry {
            family_name,
            attributes,
            encode_samples,
            metric_any,
        } = entry;
        self.keys
            .insert((family_name.clone(), attributes.clone()), id);
        let family = match self.families.entry(family_name.clone()) {
            std::collections::btree_map::Entry::Occupied(entry) => entry.into_mut(),
            std::collections::btree_map::Entry::Vacant(entry) => {
                let mut descriptor = String::new();
                encode_descriptor(&mut descriptor, &family_name, &help, metric_type)
                    .expect("encoding cached descriptor failed");
                entry.insert(MetricFamily {
                    help,
                    metric_type,
                    descriptor,
                    metric_ids: Vec::new(),
                })
            }
        };
        // Remember where this id sits in the family so it can be swap-removed later.
        let family_index = family.metric_ids.len();
        family.metric_ids.push(id);
        self.metric_slot_mut(id).replace(MetricEntry {
            family_name,
            attributes,
            encode_samples,
            metric_any,
            claims: 1,
            family_index,
        });
    }

    /// Add one claim to metric `id`.
    fn claim_registration(&mut self, id: usize) {
        let entry = self.metric_mut(id);
        entry.claims = entry
            .claims
            .checked_add(1)
            .expect("registration claims overflow");
    }

    /// Remove one claim from metric `id`, dropping the entry when none remain.
    fn release_registration(&mut self, id: usize) {
        let entry = self.metric_mut(id);
        entry.claims = entry
            .claims
            .checked_sub(1)
            .expect("registration claim count underflow");
        if entry.claims > 0 {
            return;
        }
        self.drop_metric_entry(id);
    }

    /// Remove the entry for `id` from the slot table, the key map, and its
    /// family, then make `id` available for reuse.
    fn drop_metric_entry(&mut self, id: usize) {
        let metric = self
            .metrics
            .get_mut(id)
            .and_then(Option::take)
            .expect("metric id missing from registry");
        let MetricEntry {
            family_name,
            attributes,
            family_index,
            ..
        } = metric;
        let key = (family_name, attributes);
        // Only remove the key if it still maps to this id.
        if self.keys.get(&key).copied() == Some(id) {
            self.keys.remove(&key);
        }
        let (family_name, _) = key;
        let (swapped_metric_id, remove_family) = {
            let family = self
                .families
                .get_mut(&family_name)
                .expect("family missing during unregister");
            let removed = family.metric_ids.swap_remove(family_index);
            assert_eq!(removed, id, "family index mismatch during unregister");
            // `swap_remove` moved the last id into `family_index` (if any remain there).
            let swapped = family.metric_ids.get(family_index).copied();
            (swapped, family.metric_ids.is_empty())
        };
        if let Some(swapped_metric_id) = swapped_metric_id {
            self.metric_mut(swapped_metric_id).family_index = family_index;
        }
        if remove_family {
            self.families.remove(&family_name);
        }
        self.free_metric_ids.push(id);
    }

    /// Encode all families in key order, followed by the EOF marker.
    pub fn encode(&self) -> String {
        let mut output = String::new();
        let mut samples = String::new();
        for family in self.families.values() {
            samples.clear();
            for metric_id in &family.metric_ids {
                let metric = self.metric_ref(*metric_id);
                (metric.encode_samples)(&mut samples).expect("encoding live metric samples failed");
            }
            // Suppress the HELP/TYPE descriptor when the family produced no
            // samples (e.g. a `Family` with no child entries). Matches
            // upstream prometheus-client's empty-metric filtering.
            if samples.is_empty() {
                continue;
            }
            output.push_str(&family.descriptor);
            output.push_str(&samples);
        }
        encode_eof(&mut output).expect("encoding EOF failed");
        output
    }
}

/// A view of a [`Registry`] that prepends `prefix` to registered metric names.
pub(crate) struct Scope {
    registry: Registry,
    prefix: String,
}

pub(crate) trait Register {
    /// Register a metric under this scope's prefix.
    fn register(&mut self, name: &str, help: &str, metric: M) -> Registered;

    /// Create a child scope by appending `prefix` to the current prefix.
    fn sub_registry(&mut self, prefix: &str) -> Scope;
}

impl Register for Registry {
    /// Validate `name` and register the metric with no prefix and no attributes.
    fn register(&mut self, name: &str, help: &str, metric: M) -> Registered {
        validate_label(name);
        Self::register(
            self,
            name.to_string(),
            help.to_string(),
            Vec::new(),
            Arc::new(metric),
        )
    }

    /// Validate `prefix` and return a scope over a clone of this registry handle.
    fn sub_registry(&mut self, prefix: &str) -> Scope {
        validate_label(prefix);
        Scope {
            registry: self.clone(),
            prefix: prefix.to_string(),
        }
    }
}

impl Register for Scope {
    /// Validate `name`, prepend this scope's prefix, and register with no attributes.
    fn register(&mut self, name: &str, help: &str, metric: M) -> Registered {
        validate_label(name);
        let name = prefixed_name(&self.prefix, name);
        let help = help.to_string();
        let metric = Arc::new(metric);
        Registry::register(&self.registry, name, help, Vec::new(), metric)
    }

    /// Validate `prefix` and return a scope whose prefix is this scope's
    /// prefix joined with `prefix`.
    fn sub_registry(&mut self, prefix: &str) -> Scope {
        validate_label(prefix);
        Self {
            registry: self.registry.clone(),
            prefix: prefixed_name(&self.prefix, prefix),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Metrics as _, Runner, Spawner, Supervisor as _};
    use commonware_macros::test_traced;
    use futures::future;
    use std::sync::mpsc::{self, TryRecvError};

    #[test]
    fn test_has_metric_value_unlabeled() {
        let metrics = "# HELP storage_items_tracked items\nstorage_items_tracked 2\n";
        assert!(has_metric_value(metrics, "items_tracked", 2));
        assert!(has_metric_value(metrics, "storage_items_tracked", 2));
        assert!(!has_metric_value(metrics, "items_tracked_extra", 2));
        assert!(!has_metric_value(metrics, "items_tracked", 3));
    }

    #[test]
    fn test_has_metric_value_labeled() {
        let metrics = r#"storage_init_items_tracked{index="2"} 2"#;
        assert!(has_metric_value(metrics, "items_tracked", 2));
        assert!(has_metric_value(metrics, "storage_init_items_tracked", 2));
    }

    #[test_traced]
    fn test_count_running_tasks() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initially no tasks with "worker" prefix
            assert_eq!(
                count_running_tasks(&context, "worker"),
                0,
                "no worker tasks initially"
            );

            // Spawn a task under a labeled context that stays running
            let handle1 = context.child("worker").spawn(|_| async move {
                future::pending::<()>().await;
            });

            // Count running tasks with "worker" prefix
            let count = count_running_tasks(&context, "worker");
            assert_eq!(count, 1, "worker task should be running");

            // Non-matching prefix should return 0
            assert_eq!(
                count_running_tasks(&context, "other"),
                0,
                "no tasks with 'other' prefix"
            );

            // Spawn a nested task (worker_child)
            let handle2 = context
                .child("worker")
                .child("child")
                .spawn(|_| async move {
                    future::pending::<()>().await;
                });

            // Count should include both parent and nested tasks
            let count = count_running_tasks(&context, "worker");
            assert_eq!(count, 2, "both worker and worker_child should be counted");

            // Abort parent task
            handle1.abort();
            let _ = handle1.await;

            // Only nested task remains
            let count = count_running_tasks(&context, "worker");
            assert_eq!(count, 1, "only worker_child should remain");

            // Abort nested task
            handle2.abort();
            let _ = handle2.await;

            // All tasks stopped
            assert_eq!(
                count_running_tasks(&context, "worker"),
                0,
                "all worker tasks should be stopped"
            );
        });
    }

    #[test_traced]
    fn test_no_duplicate_metrics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Register metrics under different labels (no duplicates)
            let c1 = raw::Counter::::default();
            let _metric_a = context.child("a").register("test", "help", c1);
            let c2 = raw::Counter::::default();
            let _metric_b = context.child("b").register("test", "help", c2);
        });
        // Test passes if runtime doesn't panic on shutdown
    }

    #[test_traced]
    fn test_duplicate_metrics_reuse_existing_handle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let c1 = raw::Counter::::default();
            let metric_a = context.child("a").register("test", "help", c1);
            let c2 = raw::Counter::::default();
            let metric_b = context.child("a").register("test", "help", c2);

            assert!(std::ptr::eq(metric_a.metric(), metric_b.metric()));
            metric_a.inc();
            metric_b.inc_by(2);

            let encoded = context.encode();
            assert!(encoded.contains("a_test_total 3"));
        });
    }

    #[test]
    fn test_claims_track_register_calls_not_handle_clones() {
        let registry = Registry::new();
        let key: MetricKey = ("votes".to_string(), Vec::new());
        let first = registry.register(
            key.0.clone(),
            "vote count".to_string(),
            Vec::new(),
            Arc::new(raw::Counter::::default()),
        );
        let first_clone = first.clone();
        let id = {
            let registry = registry.inner.lock();
            let id = *registry.keys.get(&key).expect("metric key missing");
            assert_eq!(registry.metric_ref(id).claims, 1);
            id
        };

        let second = registry.register(
            key.0,
            "vote count".to_string(),
            Vec::new(),
            Arc::new(raw::Counter::::default()),
        );
        let second_clone = second.clone();
        {
            let registry = registry.inner.lock();
            assert_eq!(registry.metric_ref(id).claims, 2);
        }

        // Dropping a handle while one of its clones is alive releases nothing.
        drop(first);
        drop(second);
        {
            let registry = registry.inner.lock();
            assert_eq!(registry.metric_ref(id).claims, 2);
        }

        drop(second_clone);
        {
            let registry = registry.inner.lock();
            assert_eq!(registry.metric_ref(id).claims, 1);
        }

        drop(first_clone);
        let registry = registry.inner.lock();
        assert!(
            registry.keys.is_empty(),
            "keys left behind: {:?}",
            registry.keys
        );
        assert!(
            registry.families.is_empty(),
            "families left behind: {:?}",
            registry.families
        );
    }

    #[test]
    #[should_panic(expected = "registered with different type")]
    fn test_duplicate_metrics_different_type_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let counter = raw::Counter::::default();
            let _metric_a = context.child("a").register("test", "help", counter);
            let gauge = raw::Gauge::::default();
            let _metric_b = context.child("a").register("test", "help", gauge);
        });
    }

    #[test]
    fn test_duplicate_register_acquires_during_last_drop_window() {
        let registry = Registry::new();
        let key: MetricKey = ("votes".to_string(), Vec::new());
        let original = registry.register(
            key.0.clone(),
            "vote count".to_string(),
            Vec::new(),
            Arc::new(raw::Counter::::default()),
        );
        let original_metric = Arc::clone(&original.metric);
        let _original = std::mem::ManuallyDrop::new(original);
        let original_id = {
            let registry = registry.inner.lock();
            *registry.keys.get(&key).expect("metric key missing")
        };

        // Simulate the final drop after it has decided to clean up but before
        // it obtains the registry lock. The dropped claim is still counted in
        // this window.
        let duplicate = registry.register(
            key.0,
            "vote count".to_string(),
            Vec::new(),
            Arc::new(raw::Counter::::default()),
        );
        assert!(Arc::ptr_eq(&original_metric, &duplicate.metric));

        registry.inner.lock().release_registration(original_id);
        duplicate.inc_by(7);

        let encoded = registry.encode();
        assert!(
            encoded.contains("votes_total 7"),
            "last drop removed duplicate registration: {encoded}"
        );

        drop(duplicate);
        let registry = registry.inner.lock();
        assert!(
            registry.keys.is_empty(),
            "keys left behind: {:?}",
            registry.keys
        );
        assert!(
            registry.families.is_empty(),
            "families left behind: {:?}",
            registry.families
        );
    }

    #[test]
    fn test_registered_with_registration_notifies_on_last_drop() {
        struct NotifyOnDrop(mpsc::Sender<&'static str>);

        impl Drop for NotifyOnDrop {
            fn drop(&mut self) {
                let _ = self.0.send("dropped");
            }
        }

        let (tx, rx) = mpsc::channel();
        let registered = Registered::with_registration(
            raw::Counter::::default(),
            Registration::from(NotifyOnDrop(tx)),
        );
        let clone = registered.clone();

        drop(registered);
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));

        drop(clone);
        assert_eq!(rx.recv().unwrap(), "dropped");
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
    }

    // Test helper: register a counter pre-incremented by `value`.
    fn register_counter(registry: &Registry, name: &str, help: &str, value: u64) -> Counter {
        let counter = raw::Counter::::default();
        counter.inc_by(value);
        registry.register(
            name.to_string(),
            help.to_string(),
            Vec::new(),
            Arc::new(counter),
        )
    }

    #[test]
    fn test_encode_is_deterministic() {
        let registry = Registry::default();
        let _beta = register_counter(®istry, "beta", "beta counter", 2);
        let _alpha = register_counter(®istry, "alpha", "alpha counter", 1);

        let first = registry.encode();
        let second = registry.encode();
        assert_eq!(first, second);

        let alpha = first
            .find("# TYPE alpha")
            .expect("alpha family header present");
        let beta = first
            .find("# TYPE beta")
            .expect("beta family header present");
        assert!(alpha < beta, "families emitted in sorted order: {first}");
    }

    #[test]
    fn test_encode_emits_single_eof() {
        let registry = Registry::default();
        let _a = register_counter(®istry, "a", "help", 1);
        let _b = register_counter(®istry, "b", "help", 2);

        let encoded = registry.encode();
        assert_eq!(encoded.matches("# EOF").count(), 1);
        assert!(
            encoded.ends_with("# EOF\n"),
            "must terminate with EOF: {encoded}"
        );
    }

    #[test]
    fn test_encode_type_aware_suffixes() {
        let registry = Registry::default();
        let _requests = register_counter(®istry, "requests", "request count", 3);
        let histogram = raw::Histogram::new([0.1, 1.0, 10.0]);
        histogram.observe(0.5);
        let _histogram = registry.register(
            "latency".to_string(),
            "latency seconds".to_string(),
            Vec::new(),
            Arc::new(histogram),
        );

        let encoded = registry.encode();
        assert!(
            encoded.contains("requests_total 3"),
            "counter _total suffix: {encoded}"
        );
        assert!(
            encoded.contains("latency_bucket"),
            "histogram _bucket suffix: {encoded}"
        );
        assert!(
            encoded.contains("latency_sum"),
            "histogram _sum suffix: {encoded}"
        );
        assert!(
            encoded.contains("latency_count"),
            "histogram _count suffix: {encoded}"
        );
    }

    #[test]
    fn test_encode_shares_family_header_across_attributes() {
        let registry = Registry::default();
        let c1 = raw::Counter::::default();
        c1.inc();
        let _c1 = registry.register(
            "votes".to_string(),
            "vote count".to_string(),
            vec![("epoch".to_string(), "1".to_string())],
            Arc::new(c1),
        );
        let c2 = raw::Counter::::default();
        c2.inc_by(2);
        let _c2 = registry.register(
            "votes".to_string(),
            "vote count".to_string(),
            vec![("epoch".to_string(), "2".to_string())],
            Arc::new(c2),
        );

        let encoded = registry.encode();
        assert_eq!(
            encoded.matches("# HELP votes").count(),
            1,
            "single HELP: {encoded}"
        );
        assert_eq!(
            encoded.matches("# TYPE votes").count(),
            1,
            "single TYPE: {encoded}"
        );
        assert!(encoded.contains("votes_total{epoch=\"1\"} 1"));
        assert!(encoded.contains("votes_total{epoch=\"2\"} 2"));
    }

    #[test]
    fn test_encode_registers_without_prefix() {
        let registry = Registry::default();
        let _registered = register_counter(®istry, "votes", "vote count", 1);

        let encoded = registry.encode();
        assert!(
            encoded.contains("votes_total 1"),
            "no prefix applied: {encoded}"
        );
        assert!(
            encoded.starts_with("# HELP votes"),
            "family header at start: {encoded}"
        );
    }

    #[test]
    fn test_encode_suppresses_empty_family() {
        // A Family registered with no child entries should not emit its HELP/TYPE
        // descriptor on scrape. This matches upstream prometheus-client's
        // `encode_omit_empty` behavior.
        let registry = Registry::default();
        let empty_family = raw::Family::, raw::Counter>::default();
        let _empty_family = registry.register(
            "votes".to_string(),
            "vote count".to_string(),
            Vec::new(),
            Arc::new(empty_family),
        );
        let _ticks = register_counter(®istry, "ticks", "tick count", 1);

        let encoded = registry.encode();
        assert!(!encoded.contains("votes"), "empty family leaked: {encoded}");
        assert!(
            encoded.contains("ticks_total 1"),
            "populated metric missing: {encoded}"
        );
        assert_eq!(encoded.matches("# EOF").count(), 1);
    }

    #[test]
    fn test_encode_matches_upstream_registry() {
        // Byte-for-byte parity between our `Registry::encode` and upstream
        // prometheus-client's `registry::Registry::encode` on an equivalent
        // metric set. Covers HELP normalization (trailing `.`), TYPE lines,
        // counter `_total` suffix, histogram `_bucket`/`_sum`/`_count`, and
        // the single final `# EOF`. Our registry emits families in sorted
        // order (see `test_encode_is_deterministic`); upstream preserves
        // registration order. Register here in sorted order so the parity
        // assertion only flags real format divergences.
        let counter = raw::Counter::::default();
        counter.inc_by(7);
        let gauge = raw::Gauge::::default();
        gauge.set(-3);
        let histogram = raw::Histogram::new([0.1, 1.0]);
        histogram.observe(0.5);

        let ours = Registry::default();
        let _latency = ours.register(
            "latency".to_string(),
            "request latency seconds".to_string(),
            Vec::new(),
            Arc::new(histogram.clone()),
        );
        let _level = ours.register(
            "level".to_string(),
            "current level".to_string(),
            Vec::new(),
            Arc::new(gauge.clone()),
        );
        let _votes = ours.register(
            "votes".to_string(),
            "number of votes".to_string(),
            Vec::new(),
            Arc::new(counter.clone()),
        );
        let ours_encoded = ours.encode();

        let mut theirs = registry::Registry::default();
        theirs.register("latency", "request latency seconds", histogram);
        theirs.register("level", "current level", gauge);
        theirs.register("votes", "number of votes", counter);
        let mut theirs_encoded = String::new();
        encode(&mut theirs_encoded, &theirs).expect("upstream encode failed");

        assert_eq!(
            ours_encoded, theirs_encoded,
            "output diverged from upstream prometheus-client registry"
        );
    }

    #[test]
    fn test_shuffled_duplicate_drops_do_not_leave_registry_entries() {
        let registry = Registry::new();
        let mut handles = Vec::new();
        for _ in 0..8 {
            handles.push(registry.register(
                "votes".to_string(),
                "vote count".to_string(),
                Vec::new(),
                Arc::new(raw::Counter::::default()),
            ));
        }

        // Drop handles out of order, re-registering after each drop.
        for index in [3, 0, 6, 1] {
            let _ = handles.swap_remove(index);
            handles.push(registry.register(
                "votes".to_string(),
                "vote count".to_string(),
                Vec::new(),
                Arc::new(raw::Counter::::default()),
            ));
        }

        drop(handles);
        let registry = registry.inner.lock();
        assert!(
            registry.keys.is_empty(),
            "keys left behind: {:?}",
            registry.keys
        );
        assert!(
            registry.families.is_empty(),
            "families left behind: {:?}",
            registry.families
        );
    }
}