//! Utility functions for interacting with any runtime.

use commonware_utils::sync::{Condvar, Mutex};
use futures::task::ArcWake;
use prometheus_client::{encoding::text::encode, registry::Registry as PrometheusRegistry};
use std::{
    any::Any,
    collections::BTreeMap,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

commonware_macros::stability_mod!(BETA, pub mod buffer);
pub mod signal;

mod handle;
pub use handle::Handle;
#[commonware_macros::stability(ALPHA)]
pub(crate) use handle::Panicked;
pub(crate) use handle::{Aborter, MetricHandle, Panicker};

mod cell;
pub use cell::Cell as ContextCell;

pub(crate) mod supervision;

/// The execution mode of a task.
#[derive(Copy, Clone, Debug)]
pub enum Execution {
    /// Task runs on a dedicated thread.
    Dedicated,
    /// Task runs on the shared executor. `true` marks short blocking work that should
    /// use the runtime's blocking-friendly pool.
    Shared(bool),
}

impl Default for Execution {
    /// Defaults to the shared executor without the blocking marker.
    fn default() -> Self {
        Self::Shared(false)
    }
}

/// Yield control back to the runtime.
///
/// The returned future is `Pending` exactly once: on the first poll it wakes
/// its own waker and returns `Pending`, and on the next poll it completes.
pub async fn reschedule() {
    struct Reschedule {
        yielded: bool,
    }

    impl Future for Reschedule {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.yielded {
                Poll::Ready(())
            } else {
                self.yielded = true;
                // Wake immediately so the task is polled again after yielding.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    Reschedule { yielded: false }.await
}

/// Render a panic payload as a string.
///
/// Payloads that are a `&str` or a `String` are returned as-is; any other
/// payload falls back to its `Debug` representation.
fn extract_panic_message(err: &(dyn Any + Send)) -> String {
    err.downcast_ref::<&str>().map_or_else(
        || {
            err.downcast_ref::<String>()
                .map_or_else(|| format!("{err:?}"), |s| s.clone())
        },
        |s| s.to_string(),
    )
}

/// Synchronization primitive that enables a thread to block until a waker delivers a signal.
pub struct Blocker {
    /// Tracks whether a wake-up signal has been delivered (even if wait has not started yet).
    state: Mutex<bool>,
    /// Condvar used to park and resume the thread when the signal flips to true.
    cv: Condvar,
}

impl Blocker {
    /// Create a new [Blocker].
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(false),
            cv: Condvar::new(),
        })
    }

    /// Block the current thread until a waker delivers a signal.
    ///
    /// A signal delivered before `wait` is called is not lost: the flag is
    /// already set, so the call returns without parking.
    pub fn wait(&self) {
        // Use a loop to tolerate spurious wake-ups and only proceed once a real signal arrives.
        let mut signaled = self.state.lock();
        while !*signaled {
            self.cv.wait(&mut signaled);
        }

        // Reset the flag so subsequent waits park again until the next wake signal.
        *signaled = false;
    }
}

impl ArcWake for Blocker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Mark as signaled (and release lock before notifying).
        {
            let mut signaled = arc_self.state.lock();
            *signaled = true;
        }

        // Notify a single waiter so the blocked thread re-checks the flag.
        arc_self.cv.notify_one();
    }
}

#[cfg(any(test, feature = "test-utils"))]
/// Count the number of running tasks whose name starts with the given prefix.
///
/// This function encodes metrics and counts tasks that are currently running
/// (have a value of 1) and whose name starts with the specified prefix.
///
/// This is useful for verifying that all child tasks under a given label hierarchy
/// have been properly shut down.
///
/// # Example
///
/// ```rust
/// use commonware_runtime::{Clock, Metrics, Runner, Spawner, deterministic};
/// use commonware_runtime::utils::count_running_tasks;
/// use std::time::Duration;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Spawn a task under a labeled context
///     let handle = context.with_label("worker").spawn(|ctx| async move {
///         ctx.sleep(Duration::from_secs(100)).await;
///     });
///
///     // Allow the task to start
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Count running tasks with "worker" prefix
///     let count = count_running_tasks(&context, "worker");
///     assert!(count > 0, "worker task should be running");
///
///     // Abort the task
///     handle.abort();
///     let _ = handle.await;
///     context.sleep(Duration::from_millis(10)).await;
///
///     // Verify task is stopped
///     let count = count_running_tasks(&context, "worker");
///     assert_eq!(count, 0, "worker task should be stopped");
/// });
/// ```
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    let encoded = metrics.encode();
    encoded
        .lines()
        .filter(|line| {
            line.starts_with("runtime_tasks_running{")
                && line.contains("kind=\"Task\"")
                && line.trim_end().ends_with(" 1")
                && line
                    .split("name=\"")
                    .nth(1)
                    .is_some_and(|s| s.split('"').next().unwrap_or("").starts_with(prefix))
        })
        .count()
}

/// Validates that a label matches Prometheus metric name format: `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics if the label is empty, starts with a non-alphabetic character,
/// or contains characters other than `[a-zA-Z0-9_]`.
pub fn validate_label(label: &str) { let mut chars = label.chars(); assert!( chars.next().is_some_and(|c| c.is_ascii_alphabetic()), "label must start with [a-zA-Z]: {label}" ); assert!( chars.all(|c| c.is_ascii_alphanumeric() || c == '_'), "label must only contain [a-zA-Z0-9_]: {label}" ); } /// Add an attribute to a sorted attribute list, maintaining sorted order via binary search. /// /// Returns `true` if the key was new, `false` if it was a duplicate (value overwritten). pub fn add_attribute( attributes: &mut Vec<(String, String)>, key: &str, value: impl std::fmt::Display, ) -> bool { let key_string = key.to_string(); let value_string = value.to_string(); match attributes.binary_search_by(|(k, _)| k.cmp(&key_string)) { Ok(pos) => { attributes[pos].1 = value_string; false } Err(pos) => { attributes.insert(pos, (key_string, value_string)); true } } } /// A writer that groups metrics by family name and deduplicates HELP/TYPE metadata /// during Prometheus encoding. /// /// When the same metric is registered across scoped registries (via /// `Registry::encode`), prometheus_client outputs each scope's metrics /// separately, interleaving families. This writer collects all lines and /// regroups them so that every sample for a given metric name appears /// together with a single HELP/TYPE header. /// /// Also strips `# EOF` lines so that `Registry::encode` can append exactly one at /// the end of the combined output. /// /// Uses "first wins" semantics: keeps the first HELP/TYPE description encountered /// for each metric name and discards subsequent duplicates. pub struct MetricEncoder { line_buffer: String, families: BTreeMap, active_family: Option, } #[derive(Default)] struct MetricFamily { help: Option, type_line: Option, unit: Option, metric_type: Option, data: Vec, } /// OpenMetrics data lines use type-specific suffixes that differ from the /// base name in HELP/TYPE headers (e.g., a counter named `foo` emits data /// as `foo_total`). 
Each suffix is only valid for specific metric types. /// /// See: const TYPED_SUFFIXES: &[(&str, &[&str])] = &[ ("_total", &["counter"]), ("_bucket", &["histogram", "gaugehistogram"]), ("_count", &["histogram", "summary"]), ("_sum", &["histogram", "summary"]), ("_gcount", &["gaugehistogram"]), ("_gsum", &["gaugehistogram"]), ("_created", &["counter", "histogram", "summary"]), ("_info", &["info"]), ]; /// Returns true if `sample_name` can belong to `family_name` (either an /// exact match or a valid type-specific suffix per the OpenMetrics spec). /// /// See: fn family_accepts_sample( families: &BTreeMap, family_name: &str, sample_name: &str, ) -> bool { if sample_name == family_name { return true; } let Some(metric_type) = families .get(family_name) .and_then(|family| family.metric_type.as_deref()) else { return false; }; let Some(suffix) = sample_name.strip_prefix(family_name) else { return false; }; TYPED_SUFFIXES.iter().any(|(known_suffix, valid_types)| { suffix == *known_suffix && valid_types.contains(&metric_type) }) } /// Extract the metric name from a sample line: `sample = metricname [labels] SP number ...` /// /// See: fn extract_metric_name(line: &str) -> &str { let end = line.find(['{', ' ']).unwrap_or(line.len()); &line[..end] } impl MetricEncoder { pub const fn new() -> Self { Self { line_buffer: String::new(), families: BTreeMap::new(), active_family: None, } } pub fn into_string(mut self) -> String { if !self.line_buffer.is_empty() { self.flush_line(); } let total: usize = self .families .values() .map(|f| { f.help.as_ref().map_or(0, |h| h.len() + 1) + f.type_line.as_ref().map_or(0, |t| t.len() + 1) + f.unit.as_ref().map_or(0, |u| u.len() + 1) + f.data.iter().map(|d| d.len() + 1).sum::() }) .sum(); let mut output = String::with_capacity(total); for family in self.families.values() { if let Some(help) = &family.help { output.push_str(help); output.push('\n'); } if let Some(type_line) = &family.type_line { output.push_str(type_line); output.push('\n'); 
} if let Some(unit) = &family.unit { output.push_str(unit); output.push('\n'); } for data in &family.data { output.push_str(data); output.push('\n'); } } output } /// Resolve a data line's metric name to its family key, inserting a new /// family if none exists, and return a mutable reference to it. /// /// OpenMetrics appends type-specific suffixes to data lines that differ /// from the base name in HELP/TYPE headers (e.g., a counter named "votes" /// emits data as "votes_total"). This method uses the TYPE declaration to /// correctly match suffixed data lines to their family, even when another /// family with the suffixed name exists (e.g., a gauge named "votes_total"). fn resolve_data_family(&mut self, name: &str) -> &mut MetricFamily { let key = self.find_typed_family(name).unwrap_or(name); self.families.entry(key.to_string()).or_default() } /// Try to find an existing family whose TYPE declaration expects the /// suffix present in `name`. fn find_typed_family<'a>(&self, name: &'a str) -> Option<&'a str> { TYPED_SUFFIXES.iter().find_map(|(suffix, valid_types)| { let base = name.strip_suffix(suffix)?; let family = self.families.get(base)?; let t = family.metric_type.as_deref()?; valid_types.contains(&t).then_some(base) }) } fn flush_line(&mut self) { let line = std::mem::take(&mut self.line_buffer); if line == "# EOF" { self.active_family = None; return; } if let Some(rest) = line.strip_prefix("# HELP ") { let name = rest.split_whitespace().next().unwrap_or("").to_string(); let family = self.families.entry(name.clone()).or_default(); if family.help.is_none() { family.help = Some(line); } self.active_family = Some(name); } else if let Some(rest) = line.strip_prefix("# TYPE ") { let mut parts = rest.split_whitespace(); let name = parts.next().unwrap_or("").to_string(); let metric_type = parts.next().map(|s| s.to_string()); let family = self.families.entry(name.clone()).or_default(); if family.type_line.is_none() { family.type_line = Some(line); family.metric_type 
= metric_type; } self.active_family = Some(name); } else if let Some(rest) = line.strip_prefix("# UNIT ") { let name = rest.split_whitespace().next().unwrap_or("").to_string(); let family = self.families.entry(name.clone()).or_default(); if family.unit.is_none() { family.unit = Some(line); } self.active_family = Some(name); } else { let name = extract_metric_name(&line); if let Some(family_name) = &self.active_family { if family_accepts_sample(&self.families, family_name, name) { self.families .get_mut(family_name.as_str()) .unwrap() .data .push(line); return; } } let family = self.resolve_data_family(name); family.data.push(line); } } } impl Default for MetricEncoder { fn default() -> Self { Self::new() } } impl std::fmt::Write for MetricEncoder { fn write_str(&mut self, s: &str) -> std::fmt::Result { let mut remaining = s; while let Some(pos) = remaining.find('\n') { self.line_buffer.push_str(&remaining[..pos]); self.flush_line(); remaining = &remaining[pos + 1..]; } self.line_buffer.push_str(remaining); Ok(()) } } /// Internal handle that deregisters a metric scope when dropped. /// /// Stored inside contexts via `Arc`. When the last context clone /// holding this handle is dropped, the scope's metrics are automatically removed. pub(crate) struct ScopeGuard { scope_id: u64, cleanup: Option>, } impl ScopeGuard { pub(crate) fn new(scope_id: u64, cleanup: impl FnOnce(u64) + Send + Sync + 'static) -> Self { Self { scope_id, cleanup: Some(Box::new(cleanup)), } } pub(crate) const fn scope_id(&self) -> u64 { self.scope_id } } impl Drop for ScopeGuard { fn drop(&mut self) { if let Some(cleanup) = self.cleanup.take() { cleanup(self.scope_id); } } } /// Manages multiple prometheus registries with lifecycle-based scoping. /// /// Holds a permanent root registry for long-lived metrics (runtime internals) /// and a collection of scoped registries that can be removed when the associated /// work (e.g., an epoch's consensus engine) is done. 
pub(crate) struct Registry { root: PrometheusRegistry, scopes: BTreeMap, next_scope_id: u64, } impl Registry { pub fn new() -> Self { Self { root: PrometheusRegistry::default(), scopes: BTreeMap::new(), next_scope_id: 0, } } pub const fn root_mut(&mut self) -> &mut PrometheusRegistry { &mut self.root } pub fn create_scope(&mut self) -> u64 { let id = self.next_scope_id; self.next_scope_id = self.next_scope_id.checked_add(1).expect("scope overflow"); self.scopes.insert(id, PrometheusRegistry::default()); id } pub fn get_scope(&mut self, scope: Option) -> &mut PrometheusRegistry { match scope { None => &mut self.root, Some(id) => self .scopes .get_mut(&id) .unwrap_or_else(|| panic!("scope {id} not found (already deregistered?)")), } } pub fn remove_scope(&mut self, id: u64) { self.scopes.remove(&id); } pub fn encode(&self) -> String { let mut encoder = MetricEncoder::new(); encode(&mut encoder, &self.root).expect("encoding root failed"); for registry in self.scopes.values() { encode(&mut encoder, registry).expect("encoding scope failed"); } let mut output = encoder.into_string(); output.push_str("# EOF\n"); output } } #[cfg(test)] mod tests { use super::*; use crate::{deterministic, Metrics, Runner}; use commonware_macros::test_traced; use futures::task::waker; use prometheus_client::metrics::counter::Counter; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; fn encode_dedup(input: &str) -> String { use std::fmt::Write; let mut encoder = MetricEncoder::new(); encoder.write_str(input).unwrap(); encoder.into_string() } #[test] fn test_metric_encoder_empty() { assert_eq!(encode_dedup(""), ""); assert_eq!(encode_dedup("# EOF\n"), ""); } #[test] fn test_metric_encoder_no_duplicates() { let input = r#"# HELP foo_total A counter. # TYPE foo_total counter foo_total 1 # HELP bar_gauge A gauge. # TYPE bar_gauge gauge bar_gauge 42 # EOF "#; let expected = r#"# HELP bar_gauge A gauge. # TYPE bar_gauge gauge bar_gauge 42 # HELP foo_total A counter. 
# TYPE foo_total counter foo_total 1 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_with_duplicates() { let input = r#"# HELP votes_total vote count. # TYPE votes_total counter votes_total{epoch="e5"} 1 # HELP votes_total vote count. # TYPE votes_total counter votes_total{epoch="e6"} 2 # EOF "#; let expected = r#"# HELP votes_total vote count. # TYPE votes_total counter votes_total{epoch="e5"} 1 votes_total{epoch="e6"} 2 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_multiple_metrics() { let input = r#"# HELP a_total First. # TYPE a_total counter a_total{tag="x"} 1 # HELP b_total Second. # TYPE b_total counter b_total 5 # HELP a_total First. # TYPE a_total counter a_total{tag="y"} 2 # EOF "#; let expected = r#"# HELP a_total First. # TYPE a_total counter a_total{tag="x"} 1 a_total{tag="y"} 2 # HELP b_total Second. # TYPE b_total counter b_total 5 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_groups_by_name() { let input = r#"# HELP a_total First. # TYPE a_total counter a_total{tag="x"} 1 # HELP b_total Second. # TYPE b_total counter b_total 5 # HELP a_total First. # TYPE a_total counter a_total{tag="y"} 2 # EOF "#; let expected = r#"# HELP a_total First. # TYPE a_total counter a_total{tag="x"} 1 a_total{tag="y"} 2 # HELP b_total Second. # TYPE b_total counter b_total 5 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_deterministic_order() { let input = r#"# HELP z First alphabetically last. # TYPE z counter z_total 1 # HELP a Last alphabetically first. # TYPE a counter a_total 2 # EOF "#; let expected = r#"# HELP a Last alphabetically first. # TYPE a counter a_total 2 # HELP z First alphabetically last. 
# TYPE z counter z_total 1 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_counter_suffix_grouping() { // prometheus_client uses the base name for HELP/TYPE (e.g., "ab_votes") // but appends "_total" to the data line (e.g., "ab_votes_total"). // A metric whose name sorts between these (e.g., "ab_votes_size") // must not split the family. let input = r#"# HELP ab_votes vote count. # TYPE ab_votes counter ab_votes_total{epoch="1"} 1 # HELP ab_votes_size size gauge. # TYPE ab_votes_size gauge ab_votes_size 99 # HELP ab_votes vote count. # TYPE ab_votes counter ab_votes_total{epoch="2"} 2 # EOF "#; let expected = r#"# HELP ab_votes vote count. # TYPE ab_votes counter ab_votes_total{epoch="1"} 1 ab_votes_total{epoch="2"} 2 # HELP ab_votes_size size gauge. # TYPE ab_votes_size gauge ab_votes_size 99 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_type_aware_suffix() { // A gauge named "foo_total" and a counter named "foo" both produce // data lines called "foo_total". The encoder must use the TYPE info // to route each data line to the correct family. let input = r#"# HELP foo_total A gauge. # TYPE foo_total gauge foo_total 42 # HELP foo A counter. # TYPE foo counter foo_total 1 # EOF "#; let expected = r#"# HELP foo A counter. # TYPE foo counter foo_total 1 # HELP foo_total A gauge. # TYPE foo_total gauge foo_total 42 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_literal_suffix_family_not_hijacked() { // A family may legitimately end with a reserved OpenMetrics suffix. // The encoder must not always remap "foo_created" to base family "foo". let input = r#"# HELP foo A counter. # TYPE foo counter foo_total 1 # HELP foo_created A gauge. # TYPE foo_created gauge foo_created 42 # EOF "#; let expected = r#"# HELP foo A counter. # TYPE foo counter foo_total 1 # HELP foo_created A gauge. 
# TYPE foo_created gauge foo_created 42 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_type_aware_suffix_interleaved_segments() { // Two families may emit lines with the same sample name (`foo_total`): // a counter named `foo` and a gauge named `foo_total`. // // Repeated counter descriptors (as emitted by separate scoped registries) // must keep all counter samples in family `foo` and not leak them into // family `foo_total`. let input = r#"# HELP foo Counter. # TYPE foo counter foo_total{scope="a"} 1 # HELP foo_total Gauge. # TYPE foo_total gauge foo_total 42 # HELP foo Counter. # TYPE foo counter foo_total{scope="b"} 2 # EOF "#; let expected = r#"# HELP foo Counter. # TYPE foo counter foo_total{scope="a"} 1 foo_total{scope="b"} 2 # HELP foo_total Gauge. # TYPE foo_total gauge foo_total 42 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_unit_metadata_is_grouped() { let input = r#"# HELP latency Latency histogram. # TYPE latency histogram # UNIT latency seconds latency_sum 1.2 latency_count 3 # HELP requests Requests. # TYPE requests counter requests_total 9 # EOF "#; let expected = r#"# HELP latency Latency histogram. # TYPE latency histogram # UNIT latency seconds latency_sum 1.2 latency_count 3 # HELP requests Requests. # TYPE requests counter requests_total 9 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_unit_metadata_deduped_across_segments() { let input = r#"# HELP req Requests. # TYPE req counter # UNIT req requests req_total{scope="a"} 1 # HELP req Requests. # TYPE req counter # UNIT req requests req_total{scope="b"} 2 # EOF "#; let expected = r#"# HELP req Requests. 
# TYPE req counter # UNIT req requests req_total{scope="a"} 1 req_total{scope="b"} 2 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_fallback_uses_typed_suffix_even_if_literal_exists() { // Regression test for a buggy fallback that preferred `contains_key(name)` // before typed-suffix mapping. // // Here `foo_total` exists as its own counter family, but a sample named // `foo_total` can only come from counter family `foo` (because // `foo_total` counter samples are `foo_total_total`). // // We force fallback mode by ending descriptor groups with `# EOF`, which // clears `active_family`. let input = r#"# HELP foo_total Counter with literal suffix. # TYPE foo_total counter foo_total_total 9 # EOF # HELP foo Base counter. # TYPE foo counter # EOF foo_total{scope="x"} 1 # EOF "#; let expected = r#"# HELP foo Base counter. # TYPE foo counter foo_total{scope="x"} 1 # HELP foo_total Counter with literal suffix. # TYPE foo_total counter foo_total_total 9 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_metric_encoder_strips_intermediate_eof() { let input = r#"# HELP a_total Root. # TYPE a_total counter a_total 1 # EOF # HELP b_total Scoped. # TYPE b_total counter b_total 2 # EOF "#; let expected = r#"# HELP a_total Root. # TYPE a_total counter a_total 1 # HELP b_total Scoped. 
# TYPE b_total counter b_total 2 "#; assert_eq!(encode_dedup(input), expected); } #[test] fn test_blocker_waits_until_wake() { let blocker = Blocker::new(); let started = Arc::new(AtomicBool::new(false)); let completed = Arc::new(AtomicBool::new(false)); let thread_blocker = blocker.clone(); let thread_started = started.clone(); let thread_completed = completed.clone(); let handle = std::thread::spawn(move || { thread_started.store(true, Ordering::SeqCst); thread_blocker.wait(); thread_completed.store(true, Ordering::SeqCst); }); while !started.load(Ordering::SeqCst) { std::thread::yield_now(); } assert!(!completed.load(Ordering::SeqCst)); waker(blocker).wake(); handle.join().unwrap(); assert!(completed.load(Ordering::SeqCst)); } #[test] fn test_blocker_handles_pre_wake() { let blocker = Blocker::new(); waker(blocker.clone()).wake(); let completed = Arc::new(AtomicBool::new(false)); let thread_blocker = blocker; let thread_completed = completed.clone(); std::thread::spawn(move || { thread_blocker.wait(); thread_completed.store(true, Ordering::SeqCst); }) .join() .unwrap(); assert!(completed.load(Ordering::SeqCst)); } #[test] fn test_blocker_reusable_across_signals() { let blocker = Blocker::new(); let completed = Arc::new(AtomicUsize::new(0)); let thread_blocker = blocker.clone(); let thread_completed = completed.clone(); let handle = std::thread::spawn(move || { for _ in 0..2 { thread_blocker.wait(); thread_completed.fetch_add(1, Ordering::SeqCst); } }); for expected in 1..=2 { waker(blocker.clone()).wake(); while completed.load(Ordering::SeqCst) < expected { std::thread::yield_now(); } } handle.join().unwrap(); assert_eq!(completed.load(Ordering::SeqCst), 2); } #[test_traced] fn test_count_running_tasks() { use crate::{Metrics, Runner, Spawner}; use futures::future; let executor = deterministic::Runner::default(); executor.start(|context| async move { // Initially no tasks with "worker" prefix assert_eq!( count_running_tasks(&context, "worker"), 0, "no worker 
tasks initially" ); // Spawn a task under a labeled context that stays running let worker_ctx = context.with_label("worker"); let handle1 = worker_ctx.clone().spawn(|_| async move { future::pending::<()>().await; }); // Count running tasks with "worker" prefix let count = count_running_tasks(&context, "worker"); assert_eq!(count, 1, "worker task should be running"); // Non-matching prefix should return 0 assert_eq!( count_running_tasks(&context, "other"), 0, "no tasks with 'other' prefix" ); // Spawn a nested task (worker_child) let handle2 = worker_ctx.with_label("child").spawn(|_| async move { future::pending::<()>().await; }); // Count should include both parent and nested tasks let count = count_running_tasks(&context, "worker"); assert_eq!(count, 2, "both worker and worker_child should be counted"); // Abort parent task handle1.abort(); let _ = handle1.await; // Only nested task remains let count = count_running_tasks(&context, "worker"); assert_eq!(count, 1, "only worker_child should remain"); // Abort nested task handle2.abort(); let _ = handle2.await; // All tasks stopped assert_eq!( count_running_tasks(&context, "worker"), 0, "all worker tasks should be stopped" ); }); } #[test_traced] fn test_no_duplicate_metrics() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Register metrics under different labels (no duplicates) let c1 = Counter::::default(); context.with_label("a").register("test", "help", c1); let c2 = Counter::::default(); context.with_label("b").register("test", "help", c2); }); // Test passes if runtime doesn't panic on shutdown } #[test] #[should_panic(expected = "duplicate metric:")] fn test_duplicate_metrics_panics() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Register metrics with the same label, causing duplicates let c1 = Counter::::default(); context.with_label("a").register("test", "help", c1); let c2 = Counter::::default(); 
context.with_label("a").register("test", "help", c2); }); } }