//! Max-min fair bandwidth planner. //! //! The planner progressively fills a set of active flows to compute per-flow //! transmission rates that respect both sender egress limits and receiver ingress limits //! (to provide max-min fairness). The caller is responsible for advancing flow progress //! according to the returned rates and invoking the planner whenever the active set //! changes (for example when a message finishes or a new message arrives). use commonware_utils::{time::NANOS_PER_SEC, BigRationalExt, DurationExt}; use num_rational::BigRational; use num_traits::Zero; use std::{cmp::Ordering, collections::BTreeMap, time::Duration}; /// Minimal description of a simulated transmission path. /// /// `delivered == false` means the flow only exercises the sender egress path (for example, /// packets that will be dropped before they reach the recipient) so we avoid charging ingress /// capacity for it. #[derive(Clone, Debug)] pub struct Flow

{ pub id: u64, pub origin: P, pub recipient: P, pub delivered: bool, } /// Resulting per-flow throughput expressed as bytes/second. #[derive(Clone, Debug, Eq, PartialEq)] pub enum Rate { Unlimited, Finite(BigRational), } /// Shared capacity constraint (either egress or ingress) tracked by the planner. /// /// `remaining` tracks the unassigned bytes/second, `members` contains the flows that consume the /// resource, and `active` counts how many of those flows are still eligible for additional /// bandwidth in the current filling round. #[derive(Debug)] struct Resource { remaining: BigRational, members: Vec, active: usize, } impl Resource { fn new(limit: u128) -> Self { Self { remaining: BigRational::from_u128(limit), members: Vec::new(), active: 0, } } } /// Identifier used to deduplicate resource entries across flows. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] enum Constraint

{ Egress(P), Ingress(P), } /// Tracks the constraints participating in a flow and whether it is still constrained. struct State { resources: Vec, limited: bool, active: bool, } impl State { const fn new() -> Self { Self { resources: Vec::new(), limited: false, active: false, } } } /// Allocate bandwidth for a set of flows given some set of capacity constraints. struct Planner<'a, P> { /// Caller-supplied flow metadata (immutable throughout the run). flows: &'a [Flow

], /// All constrained resources participating in this planning step. resources: Vec, /// Reverse index from `(peer, direction)` to the corresponding resource slot. indices: BTreeMap, usize>, /// Per-flow membership and bookkeeping flags used during progressive filling. states: Vec, /// Final per-flow throughput; `None` indicates an unlimited flow. rates: Vec>, /// Number of flows still taking part in the current filling round. active: usize, /// Shared fill level applied to every active flow. fill: BigRational, } impl<'a, P: Clone + Ord> Planner<'a, P> { /// Build and register resources for all flows up front. fn new(flows: &'a [Flow

], egress_cap: &mut E, ingress_cap: &mut I) -> Self where E: FnMut(&P) -> Option, I: FnMut(&P) -> Option, { let mut planner = Self { flows, resources: Vec::new(), indices: BTreeMap::new(), states: Vec::with_capacity(flows.len()), rates: vec![None; flows.len()], active: 0, fill: BigRational::zero(), }; planner.register(egress_cap, ingress_cap); planner } /// Ensure a resource entry exists, returning its index if the resource is rate-limited. /// /// Unbounded resources return `None`, allowing callers to skip any additional bookkeeping for /// flows that touch them. fn constrain(&mut self, constraint: Constraint

, limit: Option) -> Option { if let Some(index) = self.indices.get(&constraint) { return Some(*index); } let limit = limit?; let idx = self.resources.len(); self.resources.push(Resource::new(limit)); self.indices.insert(constraint, idx); Some(idx) } /// Link a flow to a resource, marking it as constrained. fn attach(&mut self, resource_idx: usize, flow_idx: usize, state: &mut State) { let resource = &mut self.resources[resource_idx]; resource.members.push(flow_idx); resource.active += 1; // We have to update both the resource and flow views so freezing can walk in either // direction without extra lookups. state.resources.push(resource_idx); state.limited = true; } /// Register each flow with the constrained resources it depends on. Resources without limits are ignored. fn register(&mut self, egress_cap: &mut E, ingress_cap: &mut I) where E: FnMut(&P) -> Option, I: FnMut(&P) -> Option, { for (flow_idx, flow) in self.flows.iter().enumerate() { let mut state = State::new(); // Register the flow with its egress resource if the sender is bandwidth-limited. if let Some(resource_idx) = self.constrain( Constraint::Egress(flow.origin.clone()), egress_cap(&flow.origin), ) { self.attach(resource_idx, flow_idx, &mut state); } // Only track ingress when the recipient actually needs to receive the bytes. if flow.delivered { // Register the flow with its ingress resource if the recipient is bandwidth-limited. if let Some(resource_idx) = self.constrain( Constraint::Ingress(flow.recipient.clone()), ingress_cap(&flow.recipient), ) { self.attach(resource_idx, flow_idx, &mut state); } } // The flow participates in progressive filling until one of its constraints saturates. if state.limited { state.active = true; self.active += 1; } self.states.push(state); } } /// Freeze all flows that rely on a saturated resource. fn freeze(&mut self, res_idx: usize) { let members = self.resources[res_idx].members.clone(); for flow_idx in members { // Finalize the rate for `flow_idx` and update every referenced resource. // // The flow's share at the moment of freezing becomes its permanent rate; afterwards we // subtract it from every referenced resource so the next progressive-filling iteration only // considers the remaining active flows. let state = &mut self.states[flow_idx]; if !state.active { continue; } // The flow's max-min allocation equals the current fill level. self.rates[flow_idx] = Some(self.fill.clone()); state.active = false; self.active -= 1; // Subtract the flow's share from other referenced resources for &other_res_idx in &state.resources { let resource = &mut self.resources[other_res_idx]; if resource.active > 0 { // Stop counting the flow toward future shares once it is frozen. resource.active -= 1; } } } } /// Run the progressive filling algorithm until every constrained flow is frozen. fn fill(&mut self) { while self.active > 0 { let mut limiting = Vec::new(); let mut min_delta: Option = None; // Step 1: among all resources still serving active flows, locate the smallest per-flow // headroom (i.e. the next constraint that will be hit if we increase every active flow // uniformly). for (res_idx, resource) in self.resources.iter().enumerate() { if resource.active == 0 { continue; } // This resource is already saturated; any flow touching it should freeze immediately. if resource.remaining.is_zero() { limiting.clear(); limiting.push(res_idx); min_delta = Some(BigRational::zero()); break; } let share = &resource.remaining / BigRational::from_usize(resource.active); match &min_delta { None => { // First candidate: provisionally treat it as the tightest constraint. min_delta = Some(share); limiting.clear(); limiting.push(res_idx); } Some(current) => match share.cmp(current) { Ordering::Less => { // Found a tougher constraint (lower headroom), so reset the limiting set. min_delta = Some(share); limiting.clear(); limiting.push(res_idx); } Ordering::Equal => limiting.push(res_idx), Ordering::Greater => { // This resource still has extra headroom relative to the current // bottleneck, so we leave it out of the limiting set and let it // keep contributing capacity in this round. } }, } } // Step 2: if the limiting resources still have headroom, advance every active flow by // `delta`. If `delta` is zero we already exhausted a resource, so we skip the advance // and immediately freeze the affected flows instead. let delta = match min_delta { Some(delta) => delta, None => { // Every active flow should have been tied to at least one limited resource. assert_eq!(self.active, 0, "active flows without constraints"); break; } }; if delta.is_zero() { for &res_idx in &limiting { self.freeze(res_idx); } continue; } // Raise the shared fill level; individual rates are materialized on freeze. self.fill += δ let mut saturated = Vec::new(); for (res_idx, resource) in self.resources.iter_mut().enumerate() { // Skip resources that are not active. if resource.active == 0 { continue; } // Charge each resource for the uniform allocation it just handed out. let usage = &delta * BigRational::from_usize(resource.active); if usage.is_zero() { continue; } // Track newly saturated resources so their flows freeze this iteration. resource.remaining -= usage; if resource.remaining.is_zero() { saturated.push(res_idx); } } saturated.extend(limiting); if saturated.is_empty() { continue; } // Step 3: freeze every flow touching the resources that just saturated so they are not // considered in the next iteration. saturated.sort(); saturated.dedup(); for res_idx in saturated { self.freeze(res_idx); } } } /// Consume the planner, finalizing the rate for every flow and returning the result map. fn solve(mut self) -> BTreeMap { // Run the progressive filling algorithm until every constrained flow is frozen. self.fill(); // Finalize the rates for every flow. let mut result = BTreeMap::new(); for (idx, flow) in self.flows.iter().enumerate() { let rate = self.rates[idx] .as_ref() .map_or(Rate::Unlimited, |ratio| Rate::Finite(ratio.clone())); result.insert(flow.id, rate); } result } #[cfg(test)] fn resources(&self) -> &[Resource] { &self.resources } #[cfg(test)] fn states(&self) -> &[State] { &self.states } #[cfg(test)] const fn active(&self) -> usize { self.active } #[cfg(test)] fn rates(&self) -> &[Option] { &self.rates } } /// Computes a fair allocation for the provided `flows`, returning per-flow rates. /// /// Each sender/receiver cap is modeled as a shared resource. Every flow registers /// with the resources it touches, after which we perform progressive filling: raise /// the rate of all unfrozen flows uniformly until a resource is depleted, freeze flows /// using that resource, and repeat. This yields a deterministic, starvation-free assignment /// that honors both ingress and egress limits. pub fn allocate( flows: &[Flow

], mut egress_cap: E, mut ingress_cap: I, ) -> BTreeMap where P: Clone + Ord, E: FnMut(&P) -> Option, I: FnMut(&P) -> Option, { // If there are no flows, there is nothing to allocate. if flows.is_empty() { return BTreeMap::new(); } // Register the flows and solve. Construction hydrates the planner with all resource // membership data, and `solve` consumes it to run progressive filling and return the final map. let planner = Planner::new(flows, &mut egress_cap, &mut ingress_cap); planner.solve() } /// Calculate the time it will take to deplete a given amount of capacity at some [Rate]. /// /// The computation rounds up so callers receive the minimum duration that guarantees at least the /// requested amount of work was transmitted. pub fn duration(rate: &Rate, remaining: &BigRational) -> Option { match rate { Rate::Unlimited => Some(Duration::ZERO), Rate::Finite(rate) => { // If the rate is zero, the transfer will never complete. if rate.is_zero() { return None; } // Find the minimum number of nanoseconds that will complete the transfer (rounding up to cover // fractional progress). let seconds = remaining / rate; let nanos = seconds * BigRational::from_u128(NANOS_PER_SEC); let ns = nanos.ceil_to_u128()?; Some(Duration::from_nanos_saturating(ns)) } } } /// Calculate the remaining work after transferring data for `elapsed` at the provided [Rate]. /// /// Feed the returned ratio back into subsequent calls to preserve fractional progress across /// discrete scheduling ticks. pub fn transfer(rate: &Rate, elapsed: Duration, mut remaining: BigRational) -> BigRational { if remaining.is_zero() { return remaining; } match rate { Rate::Unlimited => BigRational::zero(), Rate::Finite(ratio) => { if ratio.is_zero() || elapsed.is_zero() { return remaining; } let delta_ns = elapsed.as_nanos(); if delta_ns == 0 { return remaining; } let elapsed = BigRational::from_frac_u128(delta_ns, NANOS_PER_SEC); let usage = ratio * &elapsed; if usage >= remaining { return BigRational::zero(); } remaining -= usage; remaining } } } #[cfg(test)] mod tests { use super::*; use num_rational::BigRational; use std::collections::BTreeMap; fn constant(limit: u128) -> impl FnMut(&u8) -> Option { move |_| Some(limit) } fn unlimited() -> impl FnMut(&u8) -> Option { move |_| None } fn assert_rational_eq(r: &BigRational, num: u64, den: u64) { assert_eq!(r, &BigRational::from_frac_u64(num, den)); } #[test] fn equal_share_on_single_egress() { let flows = vec![ Flow { id: 1, origin: 1, recipient: 10, delivered: true, }, Flow { id: 2, origin: 1, recipient: 11, delivered: true, }, ]; let allocations = allocate(&flows, constant(1_000), unlimited()); assert_eq!(allocations.len(), 2); for rate in allocations.values() { let Rate::Finite(ratio) = rate else { panic!("expected finite rate"); }; assert_rational_eq(ratio, 500, 1); } } #[test] fn ingress_cap_enforced() { let flows = vec![Flow { id: 1, origin: 1, recipient: 5, delivered: true, }]; let allocations = allocate(&flows, unlimited(), constant(2_000)); let rate = allocations.get(&1).expect("missing flow"); let Rate::Finite(ratio) = rate else { panic!("expected finite rate"); }; assert_rational_eq(ratio, 2_000, 1); } #[test] fn unlimited_flow_finishes_immediately() { let flows = vec![Flow { id: 7, origin: 1, recipient: 2, delivered: false, }]; let allocations = allocate(&flows, unlimited(), unlimited()); assert_eq!(allocations[&7], Rate::Unlimited); } #[test] fn transfer_accumulates_carry() { let ratio = BigRational::from_frac_u64(1, 2); // 0.5 bytes per second let rate = Rate::Finite(ratio); let initial = BigRational::from_u128(10); let after_short = transfer(&rate, Duration::from_millis(500), initial); assert_eq!(after_short, BigRational::from_frac_u64(39, 4)); let after_long = transfer(&rate, Duration::from_millis(1500), after_short); assert_eq!(after_long, BigRational::from_u128(9)); } #[test] fn finish_duration_accounts_for_fractional_progress() { let rate = Rate::Finite(BigRational::from_frac_u64(1, 2)); let initial = BigRational::from_u128(1); let partial = transfer(&rate, Duration::from_millis(500), initial.clone()); assert_eq!(partial, BigRational::from_frac_u64(3, 4)); let duration_full = duration(&rate, &initial).expect("finite duration"); assert_eq!(duration_full, Duration::from_secs(2)); let finish = duration(&rate, &partial).expect("finish duration"); assert_eq!(finish, Duration::from_millis(1500)); assert!(finish < duration_full); } #[test] fn bandwidth_duration() { let ratio = BigRational::from_u128(500); let rate = Rate::Finite(ratio); let time = duration(&rate, &BigRational::from_u128(1_000)).expect("finite time"); assert_eq!(time.as_secs(), 2); } fn rate_of(map: &BTreeMap, id: u64) -> BigRational { match map.get(&id).expect("missing flow") { Rate::Finite(ratio) => ratio.clone(), Rate::Unlimited => panic!("unexpected unlimited rate"), } } #[test] fn three_peer_fair_sharing() { let flows = vec![ Flow { id: 1, origin: 'A', recipient: 'B', delivered: true, }, Flow { id: 2, origin: 'A', recipient: 'B', delivered: true, }, Flow { id: 3, origin: 'B', recipient: 'C', delivered: true, }, Flow { id: 4, origin: 'A', recipient: 'C', delivered: true, }, Flow { id: 5, origin: 'C', recipient: 'B', delivered: true, }, ]; let allocations = allocate( &flows, |origin: &char| match origin { 'A' => Some(1_000_000), // 1_000 KB/s 'B' => Some(750_000), 'C' => Some(100_000), _ => None, }, |recipient: &char| match recipient { 'A' => Some(500_000), 'B' => Some(250_000), 'C' => Some(1_000_000), _ => None, }, ); let rate_ab1 = rate_of(&allocations, 1); assert_rational_eq(&rate_ab1, 250_000, 3); let rate_ab2 = rate_of(&allocations, 2); assert_rational_eq(&rate_ab2, 250_000, 3); let rate_ac = rate_of(&allocations, 4); assert_rational_eq(&rate_ac, 500_000, 1); let rate_bc = rate_of(&allocations, 3); assert_rational_eq(&rate_bc, 500_000, 1); let rate_cb = rate_of(&allocations, 5); assert_rational_eq(&rate_cb, 250_000, 3); } #[test] fn upstream_bottleneck_propagates() { let flows = vec![ Flow { id: 1, origin: 'A', recipient: 'B', delivered: true, }, Flow { id: 2, origin: 'A', recipient: 'C', delivered: true, }, ]; let allocations = allocate( &flows, |origin: &char| match origin { 'A' => Some(1_000_000), 'B' => Some(1_000_000), 'C' => Some(1_000_000), _ => None, }, |recipient: &char| match recipient { 'A' => Some(500_000), 'B' => Some(1_000), 'C' => Some(500_000), _ => None, }, ); let rate_ab = rate_of(&allocations, 1); assert_rational_eq(&rate_ab, 1_000, 1); let rate_bc = rate_of(&allocations, 2); assert_rational_eq(&rate_bc, 500_000, 1); } #[test] fn limited_capacity_still_guarantees_fair_share() { let flows = vec![ Flow { id: 1, origin: 'A', recipient: 'B', delivered: true, }, Flow { id: 2, origin: 'A', recipient: 'C', delivered: true, }, ]; let allocations = allocate( &flows, |origin: &char| match origin { 'A' => Some(50_000), 'B' => Some(1_000_000), 'C' => Some(1_000_000), _ => None, }, |recipient: &char| match recipient { 'A' => Some(500_000), 'B' => Some(1_000), 'C' => Some(500_000), _ => None, }, ); let rate_ab = rate_of(&allocations, 1); assert_rational_eq(&rate_ab, 1_000, 1); let rate_bc = rate_of(&allocations, 2); assert_rational_eq(&rate_bc, 49_000, 1); } #[test] fn planner_skips_unlimited_resources() { let flows = vec![Flow { id: 99, origin: 1u8, recipient: 2u8, delivered: true, }]; let mut egress = unlimited(); let mut ingress = unlimited(); let planner = Planner::new(&flows, &mut egress, &mut ingress); assert_eq!(planner.resources().len(), 0); assert!(planner.states().iter().all(|state| !state.limited)); assert_eq!(planner.active(), 0); } #[test] fn planner_tracks_shared_resource_membership() { let flows = vec![ Flow { id: 1, origin: 1u8, recipient: 10u8, delivered: true, }, Flow { id: 2, origin: 1u8, recipient: 11u8, delivered: true, }, ]; let mut egress = constant(1_000); let mut ingress = unlimited(); let planner = Planner::new(&flows, &mut egress, &mut ingress); let resources = planner.resources(); assert_eq!(resources.len(), 1); let resource = &resources[0]; assert_eq!(resource.members, vec![0, 1]); assert_eq!(resource.active, 2); assert!(planner.states().iter().all(|state| state.active)); } #[test] fn planner_freeze_clears_active_counts() { let flows = vec![ Flow { id: 1, origin: 1u8, recipient: 2u8, delivered: true, }, Flow { id: 2, origin: 1u8, recipient: 3u8, delivered: true, }, ]; let mut egress = constant(1_000); let mut ingress = unlimited(); let mut planner = Planner::new(&flows, &mut egress, &mut ingress); assert_eq!(planner.active(), 2); // Freezing the shared egress resource should freeze both flows and zero the counters. planner.freeze(0); let resources = planner.resources(); assert_eq!(resources[0].active, 0); assert_eq!(planner.active(), 0); assert!(planner .rates() .iter() .filter_map(|opt| opt.as_ref()) .all(|ratio| ratio.is_zero())); } }