use crate::p2p::wire; use commonware_cryptography::PublicKey; use commonware_p2p::{utils::codec::WrappedSender, Recipients, Sender}; use commonware_runtime::{ telemetry::metrics::{ histogram::Buckets, status::{self, CounterExt, GaugeExt, Status}, }, Clock, Metrics, }; use commonware_utils::{PrioritySet, Span, SystemTimeExt}; use prometheus_client::{ encoding::EncodeLabelSet, metrics::{family::Family, gauge::Gauge, histogram::Histogram}, }; use rand::{seq::SliceRandom, Rng}; use std::{ collections::{HashMap, HashSet}, marker::PhantomData, time::{Duration, SystemTime}, }; use tracing::debug; #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] struct Peer { peer: String, } /// Unique identifier for a request. /// /// Once u64 requests have been made, the ID wraps around (resetting to zero). /// As long as there are less than u64 requests outstanding, this should not be /// an issue. pub type ID = u64; /// Tracks an active request that has been sent to a peer. struct ActiveRequest { key: Key, peer: P, start: SystemTime, } /// Configuration for the fetcher. pub struct Config { /// Local identity of the participant (if any). pub me: Option

, /// Initial expected performance for new participants. pub initial: Duration, /// Timeout for requests. pub timeout: Duration, /// How long fetches remain in the pending queue before being retried. pub retry_timeout: Duration, /// Whether requests are sent with priority over other network messages. pub priority_requests: bool, } /// Maintains requests for data from other peers, called fetches. /// /// Requests are called fetches. Fetches may be in one of two states: /// - Active: Sent to a peer and is waiting for a response. /// - Pending: Not successfully sent to a peer. Waiting to be retried by timeout. /// /// Both types of requests will be retried after a timeout if not resolved (i.e. a response or a /// cancellation). Upon retry, requests may either be placed in active or pending state again. /// /// # Targets /// /// Peers can be registered as "targets" for specific keys, restricting fetches to only those /// peers. Targets represent "the only peers who might eventually have the data". When fetching, /// only target peers are tried. There is no fallback to other peers, if all targets are /// unavailable, the fetch waits for them. /// /// Targets persist through transient failures (timeout, "no data" response, send failure) since /// the peer might be slow or might receive the data later. Targets are only removed when: /// - A peer is blocked (sent invalid data) /// - The fetch succeeds (all targets for that key are cleared) pub struct Fetcher where E: Clock + Rng + Metrics, P: PublicKey, Key: Span, NetS: Sender, { context: E, // Peer management /// Local identity (to exclude from requests) me: Option

, /// Participants to exclude from requests (blocked peers) excluded: HashSet

, /// Participants and their performance (lower is better, in milliseconds) participants: PrioritySet, // Request tracking /// Next ID to use for a request request_id: ID, /// Active requests ordered by deadline (ID -> deadline) active: PrioritySet, /// Request data for active requests (ID -> request details) requests: HashMap>, /// Reverse lookup from key to request ID key_to_id: HashMap, // Config /// Initial expected performance for new participants initial: Duration, /// Timeout for requests timeout: Duration, /// Manages pending requests. When a request is registered (for both the first time and after /// a retry), it is added to this set. /// /// The value is a tuple of the next time to try the request and a boolean indicating if the request /// is a retry (in which case the request should be made to a random peer). pending: PrioritySet, /// If no peers are ready to handle a request (all filtered out or send failed), the waiter is set /// to the next time to try the request. waiter: Option, /// How long fetches remain in the pending queue before being retried retry_timeout: Duration, /// Whether requests are sent with priority over other network messages priority_requests: bool, /// Per-key target peers restricting which peers are used to fetch each key. /// Only target peers are tried, waiting for them if unavailable. There is no /// fallback to other peers. Targets persist through transient failures, they are /// only removed when blocked (invalid data) or cleared on successful fetch. targets: HashMap>, /// Per-peer performance metric (exponential moving average of response time in ms) performance: Family, /// Status of request creation attempts (Success when eligible peers exist, Dropped otherwise) requests_created: status::Counter, /// Status of individual network requests sent to peers requests_sent: status::Counter, /// Histogram of successful response durations resolves: Histogram, /// Phantom data for networking types _s: PhantomData, } impl Fetcher where E: Clock + Rng + Metrics, P: PublicKey, Key: Span, NetS: Sender, { /// Creates a new fetcher. pub fn new(context: E, config: Config

) -> Self { let performance = Family::::default(); context.register( "peer_performance", "Per-peer performance (exponential moving average of response time in ms)", performance.clone(), ); let requests_created = status::Counter::default(); context.register( "requests_created", "Status of request creation attempts", requests_created.clone(), ); let requests_sent = status::Counter::default(); context.register( "requests_sent", "Status of individual network requests sent to peers", requests_sent.clone(), ); let resolves = Histogram::new(Buckets::NETWORK); context.register( "resolves", "Number and duration of requests that were resolved", resolves.clone(), ); Self { context, me: config.me, excluded: HashSet::new(), participants: PrioritySet::new(), request_id: 0, active: PrioritySet::new(), requests: HashMap::new(), key_to_id: HashMap::new(), initial: config.initial, timeout: config.timeout, pending: PrioritySet::new(), waiter: None, retry_timeout: config.retry_timeout, priority_requests: config.priority_requests, targets: HashMap::new(), performance, requests_created, requests_sent, resolves, _s: PhantomData, } } /// Generate the next request ID. const fn next_id(&mut self) -> ID { let id = self.request_id; self.request_id = self.request_id.wrapping_add(1); id } /// Calculate a participant's new priority using exponential moving average. fn update_performance(&mut self, participant: &P, elapsed: Duration) { let Some(past) = self.participants.get(participant) else { return; }; let next = past.saturating_add(elapsed.as_millis()) / 2; self.participants.put(participant.clone(), next); let label = Peer { peer: participant.to_string(), }; let _ = self.performance.get_or_create(&label).try_set(next); } /// Get eligible peers for a key in priority order. /// /// If `shuffle` is true, the peers are shuffled (used for retries to try different peers). fn get_eligible_peers(&mut self, key: &Key, shuffle: bool) -> Vec

{ let targets = self.targets.get(key); // Prepare participant iterator let participant_iter = self.participants.iter(); // Collect eligible peers let mut eligible: Vec

= participant_iter .filter(|(p, _)| self.me.as_ref() != Some(p)) // not self .filter(|(p, _)| !self.excluded.contains(p)) // not blocked .filter(|(p, _)| targets.is_none_or(|t| t.contains(p))) // matches target if any .map(|(p, _)| p.clone()) .collect(); // Shuffle if requested if shuffle { eligible.shuffle(&mut self.context); } eligible } /// Attempts to send a fetch request for a pending key. /// /// Iterates through pending keys until a send succeeds. For each key, tries /// eligible peers in priority order. On success, the key moves from pending /// to active. On failure, the key remains pending for retry. /// /// Sets `self.waiter` to control when the next fetch attempt should occur: /// - Rate limit expiry time if any peer was rate-limited /// - `retry_timeout` if peers exist but all sends failed /// - `Duration::MAX` if no eligible peers (wait for external changes) pub async fn fetch(&mut self, sender: &mut WrappedSender>) { self.waiter = None; // Collect keys to try (need to clone since we mutate self during iteration) let pending_keys: Vec<(Key, bool)> = self .pending .iter() .map(|(k, (_, retry))| (k.clone(), *retry)) .collect(); // Try each pending key until one succeeds let mut earliest_rate_limit: Option = None; let mut found_eligible_peers = false; for (key, retry) in pending_keys { // Skip keys with no eligible peers let peers = self.get_eligible_peers(&key, retry); if peers.is_empty() { self.requests_created.inc(Status::Dropped); continue; } // Mark that an eligible peer was found self.requests_created.inc(Status::Success); found_eligible_peers = true; // Try each peer until one succeeds for peer in peers { // Check rate limit (consumes a token if not rate-limited) let checked = match sender.check(Recipients::One(peer.clone())).await { Ok(checked) => checked, Err(not_until) => { // Peer is rate-limited, track earliest retry time earliest_rate_limit = Some(earliest_rate_limit.map_or(not_until, |t| t.min(not_until))); continue; } }; // Attempt send let id = self.next_id(); let message = wire::Message { id, payload: wire::Payload::Request(key.clone()), }; match checked.send(message, self.priority_requests).await { Ok(sent) if !sent.is_empty() => { // Success - move from pending to active self.requests_sent.inc(Status::Success); self.pending.remove(&key); let now = self.context.current(); let deadline = now.checked_add(self.timeout).expect("time overflowed"); self.active.put(id, deadline); self.requests.insert( id, ActiveRequest { key: key.clone(), peer, start: now, }, ); self.key_to_id.insert(key, id); return; } Ok(_) => { // Peer dropped message, try next peer self.requests_sent.inc(Status::Dropped); debug!(?peer, "send returned empty"); self.update_performance(&peer, self.timeout); } Err(err) => { // Send failed, try next peer self.requests_sent.inc(Status::Failure); debug!(?err, ?peer, "send failed"); self.update_performance(&peer, self.timeout); } } } } // Set waiter for next fetch attempt self.waiter = Some(if let Some(rate_limit_time) = earliest_rate_limit { // Use rate limit expiry time rate_limit_time } else if found_eligible_peers { // Peers exist but all sends failed - use retry timeout self.context.current() + self.retry_timeout } else { // No eligible peers - wait for external changes self.context.current().saturating_add(Duration::MAX) }); } /// Retains only the fetches with keys greater than the given key. pub fn retain(&mut self, predicate: impl Fn(&Key) -> bool) { // Collect IDs to remove based on key predicate let ids_to_remove: Vec = self .requests .iter() .filter(|(_, req)| !predicate(&req.key)) .map(|(id, _)| *id) .collect(); for id in ids_to_remove { self.active.remove(&id); self.requests.remove(&id); } self.key_to_id.retain(|k, _| predicate(k)); self.pending.retain(&predicate); self.targets.retain(|k, _| predicate(k)); // Clear waiter since the key that caused it may have been removed self.waiter = None; } /// Cancels a fetch request. /// /// Returns `true` if the fetch was canceled. pub fn cancel(&mut self, key: &Key) -> bool { // Remove targets for this key self.clear_targets(key); // Check the pending queue first if self.pending.remove(key) { return true; } // Check the active fetches if let Some(id) = self.key_to_id.remove(key) { self.active.remove(&id); self.requests.remove(&id); return true; } false } /// Cancel all fetches. pub fn clear(&mut self) { self.pending.clear(); self.active.clear(); self.requests.clear(); self.key_to_id.clear(); self.targets.clear(); } /// Adds a key to the front of the pending queue. pub fn add_ready(&mut self, key: Key) { assert!(!self.pending.contains(&key)); self.pending.put(key, (self.context.current(), false)); } /// Adds a key to the pending queue. /// /// Panics if the key is already pending. pub fn add_retry(&mut self, key: Key) { assert!(!self.pending.contains(&key)); let deadline = self.context.current() + self.retry_timeout; self.pending.put(key, (deadline, true)); } /// Returns the deadline for the next pending retry. pub fn get_pending_deadline(&self) -> Option { // Pending may be emptied by cancel/retain if self.pending.is_empty() { return None; } // Return the greater of the waiter and the next pending deadline let pending_deadline = self.pending.peek().map(|(_, (deadline, _))| *deadline); pending_deadline.max(self.waiter) } /// Returns the deadline for the next active request timeout. pub fn get_active_deadline(&self) -> Option { self.active.peek().map(|(_, deadline)| *deadline) } /// Removes and returns the key with the next request timeout. /// /// Targets are not removed on timeout. pub fn pop_active(&mut self) -> Option { // Pop the next deadline let (id, _) = self.active.pop()?; // Remove the request and update performance with timeout penalty let req = self.requests.remove(&id)?; self.key_to_id.remove(&req.key); self.update_performance(&req.peer, self.timeout); Some(req.key) } /// Processes a response from a peer. Removes and returns the relevant key. /// /// Returns the key if the response was valid. Returns `None` if the response was /// invalid or unsolicited. /// /// Targets are not removed here, regardless of response type. Targets persist through /// "no data" responses (peer might get data later). On valid data response, caller /// should call `clear_targets()`. On invalid data, caller should block the peer which /// removes them from all target sets. pub fn pop_by_id(&mut self, id: ID, peer: &P, has_response: bool) -> Option { // Confirm ID exists and is for the peer let req = self.requests.get(&id)?; if &req.peer != peer { return None; } // Remove the request let req = self.requests.remove(&id)?; self.active.remove(&id); self.key_to_id.remove(&req.key); // Update the peer's performance if has_response { // Compute elapsed time and update performance let elapsed = self .context .current() .duration_since(req.start) .unwrap_or_default(); self.update_performance(&req.peer, elapsed); self.resolves.observe(elapsed.as_secs_f64()); } else { // Treat lack of response as a timeout self.update_performance(&req.peer, self.timeout); } Some(req.key) } /// Reconciles the list of peers that can be used to fetch data. pub fn reconcile(&mut self, keep: &[P]) { self.participants.reconcile(keep, self.initial.as_millis()); // Clear waiter (may no longer apply) self.waiter = None; } /// Blocks a peer from being used to fetch data. /// /// Also removes the peer from all target sets. pub fn block(&mut self, peer: P) { // Remove peer from all target sets (keeping empty entries) for targets in self.targets.values_mut() { targets.remove(&peer); } self.excluded.insert(peer); } /// Add target peers for fetching a key. /// /// Targets are added to any existing targets for this key. /// /// Clears the waiter to allow immediate retry if the fetch was blocked waiting for targets. pub fn add_targets(&mut self, key: Key, peers: impl IntoIterator) { self.targets.entry(key).or_default().extend(peers); // Clear waiter to allow retry with new targets self.waiter = None; } /// Clear targeting for a key. /// /// If there is an ongoing fetch for this key, it will try any available peer instead /// of being restricted to targets. Also used to clean up targets after a successful /// or cancelled fetch. /// /// Clears the waiter to allow immediate retry with any available peer. pub fn clear_targets(&mut self, key: &Key) { self.targets.remove(key); // Clear waiter to allow retry without targets self.waiter = None; } /// Returns whether a key has targets set. pub fn has_targets(&self, key: &Key) -> bool { self.targets.contains_key(key) } /// Returns the number of fetches. pub fn len(&self) -> usize { self.pending.len() + self.requests.len() } /// Returns the number of pending fetches. pub fn len_pending(&self) -> usize { self.pending.len() } /// Returns the number of active fetches. pub fn len_active(&self) -> usize { self.requests.len() } /// Returns the number of blocked peers. pub fn len_blocked(&self) -> usize { self.excluded.len() } /// Returns true if the fetch is in progress. #[cfg(test)] pub fn contains(&self, key: &Key) -> bool { self.key_to_id.contains_key(key) || self.pending.contains(key) } } #[cfg(test)] mod tests { use super::*; use crate::p2p::mocks::Key as MockKey; use bytes::Bytes; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, }; use commonware_p2p::{LimitedSender, Recipients, UnlimitedSender}; use commonware_runtime::{ deterministic::{self, Context, Runner}, KeyedRateLimiter, Quota, Runner as _, RwLock, }; use commonware_utils::NZU32; use std::{fmt, sync::Arc, time::Duration}; // Mock error type for testing #[derive(Debug)] struct MockError; impl fmt::Display for MockError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "mock error") } } impl std::error::Error for MockError {} #[derive(Debug)] struct CheckedSender<'a, S: UnlimitedSender> { sender: &'a mut S, recipients: Recipients, } impl<'a, S: UnlimitedSender> commonware_p2p::CheckedSender for CheckedSender<'a, S> { type PublicKey = S::PublicKey; type Error = S::Error; async fn send( self, message: Bytes, priority: bool, ) -> Result, Self::Error> { self.sender.send(self.recipients, message, priority).await } } #[derive(Default, Clone, Debug)] struct FailMockSenderInner; impl UnlimitedSender for FailMockSenderInner { type PublicKey = PublicKey; type Error = MockError; async fn send( &mut self, _recipients: Recipients, _message: Bytes, _priority: bool, ) -> Result, Self::Error> { Ok(vec![]) } } // Mock sender that fails #[derive(Default, Clone, Debug)] struct FailMockSender(FailMockSenderInner); impl LimitedSender for FailMockSender { type PublicKey = PublicKey; type Checked<'a> = CheckedSender<'a, FailMockSenderInner>; async fn check<'a>( &'a mut self, recipients: Recipients, ) -> Result, SystemTime> { Ok(CheckedSender { sender: &mut self.0, recipients, }) } } // Mock sender that succeeds #[derive(Default, Clone, Debug)] struct SuccessMockSenderInner; impl UnlimitedSender for SuccessMockSenderInner { type PublicKey = PublicKey; type Error = MockError; async fn send( &mut self, recipients: Recipients, _message: Bytes, _priority: bool, ) -> Result, Self::Error> { match recipients { Recipients::One(peer) => Ok(vec![peer]), _ => unimplemented!(), } } } // Mock sender that succeeds #[derive(Default, Clone, Debug)] struct SuccessMockSender(SuccessMockSenderInner); impl LimitedSender for SuccessMockSender { type PublicKey = PublicKey; type Checked<'a> = CheckedSender<'a, SuccessMockSenderInner>; async fn check<'a>( &'a mut self, recipients: Recipients, ) -> Result, SystemTime> { Ok(CheckedSender { sender: &mut self.0, recipients, }) } } // Mock sender that rate-limits per peer #[derive(Clone)] struct LimitedMockSender { inner: SuccessMockSenderInner, rate_limiter: Arc>>, } impl LimitedMockSender { fn new(quota: Quota, clock: E) -> Self { Self { inner: SuccessMockSenderInner, rate_limiter: Arc::new(RwLock::new(KeyedRateLimiter::hashmap_with_clock( quota, clock, ))), } } } impl LimitedSender for LimitedMockSender { type PublicKey = PublicKey; type Checked<'a> = CheckedSender<'a, SuccessMockSenderInner>; async fn check<'a>( &'a mut self, recipients: Recipients, ) -> Result, SystemTime> { let peer = match &recipients { Recipients::One(p) => p, _ => unimplemented!(), }; { let rate_limiter = self.rate_limiter.write().await; if let Err(not_until) = rate_limiter.check_key(peer) { return Err(not_until.earliest_possible()); } } Ok(CheckedSender { sender: &mut self.inner, recipients, }) } } fn create_test_fetcher>( context: Context, ) -> Fetcher { let public_key = PrivateKey::from_seed(0).public_key(); let config = Config { me: Some(public_key), initial: Duration::from_millis(100), timeout: Duration::from_secs(5), retry_timeout: Duration::from_millis(100), priority_requests: false, }; Fetcher::new(context, config) } /// Helper to add an active request directly for testing fn add_test_active>( fetcher: &mut Fetcher, id: ID, key: MockKey, ) { let peer = PrivateKey::from_seed(1).public_key(); let now = fetcher.context.current(); let deadline = now + Duration::from_secs(5); fetcher.active.put(id, deadline); fetcher.requests.insert( id, ActiveRequest { key: key.clone(), peer, start: now, }, ); fetcher.key_to_id.insert(key, id); } #[test] fn test_retain_function() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add some keys to pending and active states fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); fetcher.add_retry(MockKey(3)); // Add keys to active state by simulating successful fetch add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); add_test_active(&mut fetcher, 102, MockKey(30)); // Verify initial state assert_eq!(fetcher.len(), 6); assert_eq!(fetcher.len_pending(), 3); assert_eq!(fetcher.len_active(), 3); // Retain keys with value <= 10 fetcher.retain(|key| key.0 <= 10); // Check that only keys with value <= 10 remain // Pending: MockKey(1), MockKey(2), MockKey(3) all remain (1, 2, 3 <= 10) // Active: MockKey(10) remains, MockKey(20) and MockKey(30) removed (20, 30 > 10) assert_eq!(fetcher.len(), 4); // Key(1), Key(2), Key(3), Key(10) assert_eq!(fetcher.len_pending(), 3); // Key(1), Key(2), Key(3) assert_eq!(fetcher.len_active(), 1); // Key(10) // Verify specific keys assert!(fetcher.pending.contains(&MockKey(1))); assert!(fetcher.pending.contains(&MockKey(2))); assert!(fetcher.pending.contains(&MockKey(3))); assert!(fetcher.key_to_id.contains_key(&MockKey(10))); assert!(!fetcher.key_to_id.contains_key(&MockKey(20))); assert!(!fetcher.key_to_id.contains_key(&MockKey(30))); }); } #[test] fn test_clear_function() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add some keys to pending and active states fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); fetcher.add_retry(MockKey(3)); // Add keys to active state add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); add_test_active(&mut fetcher, 102, MockKey(30)); // Verify initial state assert_eq!(fetcher.len(), 6); assert_eq!(fetcher.len_pending(), 3); assert_eq!(fetcher.len_active(), 3); // Clear all fetches fetcher.clear(); // Verify everything is cleared assert_eq!(fetcher.len(), 0); assert_eq!(fetcher.len_pending(), 0); assert_eq!(fetcher.len_active(), 0); // Verify specific collections are empty assert!(fetcher.pending.is_empty()); assert!(fetcher.requests.is_empty()); }); } #[test] fn test_len_functions() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Initially empty assert_eq!(fetcher.len(), 0); assert_eq!(fetcher.len_pending(), 0); assert_eq!(fetcher.len_active(), 0); // Add pending keys fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); assert_eq!(fetcher.len(), 2); assert_eq!(fetcher.len_pending(), 2); assert_eq!(fetcher.len_active(), 0); // Add active keys add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); assert_eq!(fetcher.len(), 4); assert_eq!(fetcher.len_pending(), 2); assert_eq!(fetcher.len_active(), 2); // Remove one pending key assert!(fetcher.pending.remove(&MockKey(1))); assert_eq!(fetcher.len(), 3); assert_eq!(fetcher.len_pending(), 1); assert_eq!(fetcher.len_active(), 2); // Remove one active key via cancel assert!(fetcher.cancel(&MockKey(10))); assert_eq!(fetcher.len(), 2); assert_eq!(fetcher.len_pending(), 1); assert_eq!(fetcher.len_active(), 1); }); } #[test] fn test_retain_with_empty_collections() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Test retain on empty collections fetcher.retain(|_| true); assert_eq!(fetcher.len(), 0); fetcher.retain(|_| false); assert_eq!(fetcher.len(), 0); }); } #[test] fn test_retain_all_elements_match_predicate() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add keys fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); let initial_len = fetcher.len(); // Retain all (predicate always returns true) fetcher.retain(|_| true); // Nothing should be removed assert_eq!(fetcher.len(), initial_len); assert_eq!(fetcher.len_pending(), 2); assert_eq!(fetcher.len_active(), 2); }); } #[test] fn test_retain_no_elements_match_predicate() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add keys fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); // Retain none (predicate always returns false) fetcher.retain(|_| false); // Everything should be removed assert_eq!(fetcher.len(), 0); assert_eq!(fetcher.len_pending(), 0); assert_eq!(fetcher.len_active(), 0); }); } #[test] fn test_cancel_function() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add keys to both pending and active states fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); // Test canceling pending key assert!(fetcher.cancel(&MockKey(1))); assert_eq!(fetcher.len_pending(), 1); assert!(!fetcher.contains(&MockKey(1))); // Test canceling active key assert!(fetcher.cancel(&MockKey(10))); assert_eq!(fetcher.len_active(), 1); assert!(!fetcher.contains(&MockKey(10))); // Test canceling non-existent key assert!(!fetcher.cancel(&MockKey(99))); // Test canceling already canceled key assert!(!fetcher.cancel(&MockKey(1))); // Cancel remaining pending key assert!(fetcher.cancel(&MockKey(2))); assert_eq!(fetcher.len_pending(), 0); // Ensure pending deadline is None assert!(fetcher.get_pending_deadline().is_none()); }); } #[test] fn test_contains_function() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Initially empty assert!(!fetcher.contains(&MockKey(1))); // Add to pending fetcher.add_retry(MockKey(1)); assert!(fetcher.contains(&MockKey(1))); // Add to active add_test_active(&mut fetcher, 100, MockKey(10)); assert!(fetcher.contains(&MockKey(10))); // Test non-existent key assert!(!fetcher.contains(&MockKey(99))); // Remove from pending fetcher.pending.remove(&MockKey(1)); assert!(!fetcher.contains(&MockKey(1))); // Remove from active via cancel fetcher.cancel(&MockKey(10)); assert!(!fetcher.contains(&MockKey(10))); }); } #[test] fn test_add_retry_function() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add first key fetcher.add_retry(MockKey(1)); assert_eq!(fetcher.len_pending(), 1); assert!(fetcher.contains(&MockKey(1))); // Add second key fetcher.add_retry(MockKey(2)); assert_eq!(fetcher.len_pending(), 2); assert!(fetcher.contains(&MockKey(2))); // Verify deadline is set assert!(fetcher.get_pending_deadline().is_some()); }); } #[test] #[should_panic(expected = "assertion failed")] fn test_add_retry_duplicate_panics() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); fetcher.add_retry(MockKey(1)); // This should panic fetcher.add_retry(MockKey(1)); }); } #[test] fn test_get_pending_deadline() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // No deadline when empty assert!(fetcher.get_pending_deadline().is_none()); // Add key and check deadline exists fetcher.add_retry(MockKey(1)); assert!(fetcher.get_pending_deadline().is_some()); // Add another key - should still have a deadline fetcher.add_retry(MockKey(2)); assert!(fetcher.get_pending_deadline().is_some()); // Clear and check no deadline fetcher.pending.clear(); assert!(fetcher.get_pending_deadline().is_none()); }); } #[test] fn test_get_active_deadline() { let runner = Runner::default(); runner.start(|context| async { let fetcher = create_test_fetcher::(context); // No deadline when empty (requester has no timeouts) assert!(fetcher.get_active_deadline().is_none()); }); } #[test] fn test_pop_active() { let runner = Runner::default(); runner.start(|context| async { let fetcher = create_test_fetcher::(context); // No active requests, should return None when popping // (This tests the case where requester.next() returns None or the active map doesn't contain the key) assert!(fetcher.get_active_deadline().is_none()); }); } #[test] fn test_pop_by_id() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let dummy_peer = PrivateKey::from_seed(1).public_key(); // Add key to active state add_test_active(&mut fetcher, 100, MockKey(10)); // Test pop with non-existent ID assert!(fetcher.pop_by_id(999, &dummy_peer, true).is_none()); // The active entry should still be there since the ID wasn't found assert_eq!(fetcher.len_active(), 1); // Test pop with correct ID and peer assert_eq!(fetcher.pop_by_id(100, &dummy_peer, true), Some(MockKey(10))); assert_eq!(fetcher.len_active(), 0); }); } #[test] fn test_reconcile_and_block() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); // Test reconcile with peers fetcher.reconcile(&[peer1.clone(), peer2]); // Test block peer fetcher.block(peer1); // Initially no blocked peers (this depends on internal requester state) // The len_blocked function returns the count from the requester }); } #[test] fn test_len_blocked() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Initially no blocked peers let initial_blocked = fetcher.len_blocked(); // Block a peer let peer = PrivateKey::from_seed(1).public_key(); fetcher.block(peer); // The count should potentially increase (depends on requester implementation) let after_block = fetcher.len_blocked(); assert!(after_block >= initial_blocked); }); } #[test] fn test_edge_cases_empty_state() { let runner = Runner::default(); runner.start(|context| async { let fetcher = create_test_fetcher::(context); // Test all functions on empty fetcher assert_eq!(fetcher.len(), 0); assert_eq!(fetcher.len_pending(), 0); assert_eq!(fetcher.len_active(), 0); assert!(!fetcher.contains(&MockKey(1))); assert!(fetcher.get_pending_deadline().is_none()); assert!(fetcher.get_active_deadline().is_none()); }); } #[test] fn test_cancel_edge_cases() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Cancel from empty fetcher assert!(!fetcher.cancel(&MockKey(1))); // Add key, cancel it, then try to cancel again fetcher.add_retry(MockKey(1)); assert!(fetcher.cancel(&MockKey(1))); assert!(!fetcher.cancel(&MockKey(1))); // Should return false }); } #[test] fn test_retain_preserves_active_state() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add keys to active with specific IDs add_test_active(&mut fetcher, 100, MockKey(1)); add_test_active(&mut fetcher, 101, MockKey(2)); // Retain only MockKey(1) fetcher.retain(|key| key.0 == 1); // Verify the ID mapping is preserved correctly assert_eq!(fetcher.len_active(), 1); assert!(fetcher.key_to_id.contains_key(&MockKey(1))); assert!(!fetcher.key_to_id.contains_key(&MockKey(2))); // Verify the request data for MockKey(1) is preserved let id = fetcher.key_to_id.get(&MockKey(1)).unwrap(); assert!(fetcher.requests.contains_key(id)); }); } #[test] fn test_mixed_operations() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); // Add keys to both pending and active fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); add_test_active(&mut fetcher, 100, MockKey(10)); add_test_active(&mut fetcher, 101, MockKey(20)); assert_eq!(fetcher.len(), 4); // Cancel one from each assert!(fetcher.cancel(&MockKey(1))); // pending assert!(fetcher.cancel(&MockKey(10))); // active assert_eq!(fetcher.len(), 2); // Retain only keys <= 20 fetcher.retain(|key| key.0 <= 20); // Should still have MockKey(2) pending and MockKey(20) active assert_eq!(fetcher.len(), 2); assert!(fetcher.contains(&MockKey(2))); assert!(fetcher.contains(&MockKey(20))); // Clear all fetcher.clear(); assert_eq!(fetcher.len(), 0); }); } #[test] fn test_ready_vs_retry() { let runner = Runner::default(); runner.start(|context| async move { let mut fetcher = create_test_fetcher::(context.clone()); // Add some keys to pending and active states fetcher.add_retry(MockKey(1)); fetcher.add_ready(MockKey(2)); // Verify initial state assert_eq!(fetcher.len(), 2); assert_eq!(fetcher.len_pending(), 2); assert_eq!(fetcher.len_active(), 0); // Get next (should be the ready key with current time deadline) let deadline = fetcher.get_pending_deadline().unwrap(); assert_eq!(deadline, context.current()); // Pop key (ready key should come first) let (key, _) = fetcher.pending.pop().unwrap(); assert_eq!(key, MockKey(2)); // Get next (should be the retry key with delayed deadline) let deadline = fetcher.get_pending_deadline().unwrap(); assert_eq!(deadline, context.current() + Duration::from_millis(100)); // Pop key let (key, _) = fetcher.pending.pop().unwrap(); assert_eq!(key, MockKey(1)); }); } #[test] fn test_waiter_after_empty() { let runner = Runner::default(); runner.start(|context| async move { let public_key = PrivateKey::from_seed(0).public_key(); let other_public_key = PrivateKey::from_seed(1).public_key(); let config = Config { me: Some(public_key.clone()), initial: Duration::from_millis(100), timeout: Duration::from_secs(5), retry_timeout: Duration::from_millis(100), priority_requests: false, }; let mut fetcher: Fetcher<_, _, MockKey, FailMockSender> = Fetcher::new(context.clone(), config); fetcher.reconcile(&[public_key, other_public_key]); let mut sender = WrappedSender::new(FailMockSender::default()); // Add a key to pending fetcher.add_ready(MockKey(1)); fetcher.fetch(&mut sender).await; // won't be delivered, so immediately re-added fetcher.fetch(&mut sender).await; // waiter activated // Check pending deadline assert_eq!(fetcher.len_pending(), 1); let pending_deadline = fetcher.get_pending_deadline().unwrap(); assert!(pending_deadline > context.current()); // Cancel key assert!(fetcher.cancel(&MockKey(1))); assert!(fetcher.get_pending_deadline().is_none()); // Advance time past previous deadline context.sleep(Duration::from_secs(10)).await; // Add a new key for retry (should be larger than original waiter wait) fetcher.add_retry(MockKey(2)); let next_deadline = fetcher.get_pending_deadline().unwrap(); assert_eq!( next_deadline, context.current() + Duration::from_millis(100) ); }); } #[test] fn test_waiter_cleared_on_target_modification() { let runner = Runner::default(); runner.start(|context| async move { let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let blocked_peer = PrivateKey::from_seed(99).public_key(); let config = Config { me: Some(public_key.clone()), initial: Duration::from_millis(100), timeout: Duration::from_secs(5), retry_timeout: Duration::from_millis(100), priority_requests: false, }; let mut fetcher: Fetcher<_, _, MockKey, FailMockSender> = Fetcher::new(context.clone(), config); fetcher.reconcile(&[public_key, peer1.clone()]); let mut sender = WrappedSender::new(FailMockSender::default()); // Block the peer we'll use as target, so fetch has no eligible participants fetcher.block(blocked_peer.clone()); // Add key with targets pointing only to blocked peer fetcher.add_ready(MockKey(1)); fetcher.add_targets(MockKey(1), [blocked_peer.clone()]); fetcher.fetch(&mut sender).await; // Waiter should be set to far future (no eligible peers at all) assert!(fetcher.waiter.is_some()); let waiter_time = fetcher.waiter.unwrap(); assert!(waiter_time > context.current() + Duration::from_secs(1000)); // Add targets should clear the waiter fetcher.add_targets(MockKey(1), [peer1.clone()]); assert!(fetcher.waiter.is_none()); // Pending deadline should now be reasonable let deadline = fetcher.get_pending_deadline().unwrap(); assert!(deadline <= context.current() + Duration::from_millis(100)); // Set waiter again by targeting blocked peer fetcher.clear_targets(&MockKey(1)); fetcher.add_targets(MockKey(1), [blocked_peer.clone()]); fetcher.fetch(&mut sender).await; assert!(fetcher.waiter.is_some()); // clear_targets should clear the waiter fetcher.clear_targets(&MockKey(1)); assert!(fetcher.waiter.is_none()); }); } #[test] fn test_waiter_uses_retry_timeout_on_send_failure() { let cfg = deterministic::Config::default().with_timeout(Some(Duration::from_secs(5))); let runner = Runner::new(cfg); runner.start(|context| async move { let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let retry_timeout = Duration::from_millis(100); let config = Config { me: Some(public_key.clone()), initial: Duration::from_millis(100), timeout: Duration::from_secs(5), retry_timeout, priority_requests: false, }; let mut fetcher: Fetcher<_, _, MockKey, FailMockSender> = Fetcher::new(context.clone(), config); // Add peers (FailMockSender doesn't rate limit, just fails sends) fetcher.reconcile(&[public_key, peer1, peer2]); let mut sender = WrappedSender::new(FailMockSender::default()); // Add key and attempt fetch - all sends will fail fetcher.add_ready(MockKey(1)); fetcher.fetch(&mut sender).await; // Key should still be pending (send failed) assert_eq!(fetcher.len_pending(), 1); // Waiter should be set to retry_timeout from now, not Duration::MAX let pending_deadline = fetcher.get_pending_deadline().unwrap(); let max_expected = context.current() + retry_timeout + Duration::from_millis(10); assert!( pending_deadline <= max_expected, "pending deadline {:?} should be within retry_timeout of now, not Duration::MAX", pending_deadline.duration_since(context.current()) ); // Wait for pending deadline and retry - should succeed quickly let wait_duration = pending_deadline .duration_since(context.current()) .unwrap_or(Duration::ZERO); context.sleep(wait_duration).await; // Should be able to fetch again (this would hang if waiter was Duration::MAX) fetcher.fetch(&mut sender).await; }); } #[test] fn test_add_targets() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let peer3 = PrivateKey::from_seed(3).public_key(); // Initially no targets assert!(fetcher.targets.is_empty()); // Add targets for a key fetcher.add_targets(MockKey(1), [peer1.clone()]); assert_eq!(fetcher.targets.len(), 1); assert!(fetcher.targets.get(&MockKey(1)).unwrap().contains(&peer1)); // Add more targets for the same key (accumulates) fetcher.add_targets(MockKey(1), [peer2.clone()]); assert_eq!(fetcher.targets.len(), 1); let targets = fetcher.targets.get(&MockKey(1)).unwrap(); assert_eq!(targets.len(), 2); assert!(targets.contains(&peer1)); assert!(targets.contains(&peer2)); // Add target for a different key fetcher.add_targets(MockKey(2), [peer1.clone()]); assert_eq!(fetcher.targets.len(), 2); assert!(fetcher.targets.get(&MockKey(2)).unwrap().contains(&peer1)); // Adding duplicate target is idempotent fetcher.add_targets(MockKey(1), [peer1.clone()]); assert_eq!(fetcher.targets.get(&MockKey(1)).unwrap().len(), 2); // Add more to reach three targets fetcher.add_targets(MockKey(1), [peer3.clone()]); assert_eq!(fetcher.targets.get(&MockKey(1)).unwrap().len(), 3); assert!(fetcher.targets.get(&MockKey(1)).unwrap().contains(&peer3)); // clear_targets() removes all targets for a key fetcher.clear_targets(&MockKey(1)); assert!(!fetcher.targets.contains_key(&MockKey(1))); // Add targets on non-existent key creates new entry fetcher.add_targets(MockKey(3), [peer1.clone()]); assert!(fetcher.targets.get(&MockKey(3)).unwrap().contains(&peer1)); }); } #[test] fn test_targets_cleanup() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); // cancel() clears targets for key fetcher.add_targets(MockKey(1), [peer1.clone()]); fetcher.add_targets(MockKey(2), [peer1.clone()]); fetcher.add_retry(MockKey(1)); fetcher.add_retry(MockKey(2)); assert_eq!(fetcher.targets.len(), 2); assert!(fetcher.cancel(&MockKey(1))); assert!(!fetcher.targets.contains_key(&MockKey(1))); assert!(fetcher.targets.contains_key(&MockKey(2))); assert!(fetcher.cancel(&MockKey(2))); assert!(fetcher.targets.is_empty()); // clear() clears all targets fetcher.add_targets(MockKey(1), [peer1.clone(), peer2.clone()]); fetcher.add_targets(MockKey(2), [peer1.clone()]); fetcher.add_targets(MockKey(3), [peer2]); assert_eq!(fetcher.targets.len(), 3); fetcher.clear(); assert!(fetcher.targets.is_empty()); // retain() filters targets fetcher.add_targets(MockKey(1), [peer1.clone()]); fetcher.add_targets(MockKey(2), [peer1.clone()]); fetcher.add_targets(MockKey(10), [peer1.clone()]); fetcher.add_targets(MockKey(20), [peer1]); assert_eq!(fetcher.targets.len(), 4); fetcher.retain(|key| key.0 <= 5); assert_eq!(fetcher.targets.len(), 2); assert!(fetcher.targets.contains_key(&MockKey(1))); assert!(fetcher.targets.contains_key(&MockKey(2))); assert!(!fetcher.targets.contains_key(&MockKey(10))); assert!(!fetcher.targets.contains_key(&MockKey(20))); }); } #[test] fn test_block_removes_from_targets() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let peer3 = PrivateKey::from_seed(3).public_key(); // Add targets for multiple keys with various peers fetcher.add_targets(MockKey(1), [peer1.clone(), peer2.clone()]); fetcher.add_targets(MockKey(2), [peer1.clone(), peer3.clone()]); fetcher.add_targets(MockKey(3), [peer2.clone()]); // Verify initial state assert_eq!(fetcher.targets.get(&MockKey(1)).unwrap().len(), 2); assert_eq!(fetcher.targets.get(&MockKey(2)).unwrap().len(), 2); assert_eq!(fetcher.targets.get(&MockKey(3)).unwrap().len(), 1); // Block peer1 fetcher.block(peer1.clone()); // peer1 should be removed from all target sets let key1_targets = fetcher.targets.get(&MockKey(1)).unwrap(); assert_eq!(key1_targets.len(), 1); assert!(!key1_targets.contains(&peer1)); assert!(key1_targets.contains(&peer2)); let key2_targets = fetcher.targets.get(&MockKey(2)).unwrap(); assert_eq!(key2_targets.len(), 1); assert!(!key2_targets.contains(&peer1)); assert!(key2_targets.contains(&peer3)); // MockKey(3) shouldn't be affected (peer1 wasn't a target) let key3_targets = fetcher.targets.get(&MockKey(3)).unwrap(); assert_eq!(key3_targets.len(), 1); assert!(key3_targets.contains(&peer2)); // Block peer2 - should remove from MockKey(1) and MockKey(3) fetcher.block(peer2); // MockKey(1) now has empty targets (entry kept to prevent fallback) assert!(fetcher.targets.contains_key(&MockKey(1))); assert!(fetcher.targets.get(&MockKey(1)).unwrap().is_empty()); // MockKey(2) still has peer3 let key2_targets = fetcher.targets.get(&MockKey(2)).unwrap(); assert_eq!(key2_targets.len(), 1); assert!(key2_targets.contains(&peer3)); // MockKey(3) now has empty targets (entry kept to prevent fallback) assert!(fetcher.targets.contains_key(&MockKey(3))); assert!(fetcher.targets.get(&MockKey(3)).unwrap().is_empty()); }); } #[test] fn test_target_behavior_on_send_failure() { let runner = Runner::default(); runner.start(|context| async move { let mut fetcher = create_test_fetcher::(context.clone()); let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let peer3 = PrivateKey::from_seed(3).public_key(); fetcher.reconcile(&[public_key, peer1.clone(), peer2.clone(), peer3.clone()]); let mut sender = WrappedSender::new(FailMockSender::default()); // Add targets and attempt fetch fetcher.add_targets(MockKey(2), [peer1.clone(), peer2.clone()]); fetcher.add_ready(MockKey(2)); assert_eq!(fetcher.targets.get(&MockKey(2)).unwrap().len(), 2); fetcher.fetch(&mut sender).await; // Both targets should still be present (not removed on send failure) assert_eq!(fetcher.targets.get(&MockKey(2)).unwrap().len(), 2); assert!(fetcher.pending.contains(&MockKey(2))); }); } #[test] fn test_target_retention_on_pop() { let runner = Runner::default(); runner.start(|context| async move { let mut fetcher = create_test_fetcher::(context.clone()); let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); fetcher.reconcile(&[public_key, peer1.clone(), peer2.clone()]); let mut sender = WrappedSender::new(SuccessMockSender::default()); // Timeout does not remove target fetcher.add_targets(MockKey(1), [peer1.clone(), peer2.clone()]); fetcher.add_ready(MockKey(1)); assert_eq!(fetcher.targets.get(&MockKey(1)).unwrap().len(), 2); fetcher.fetch(&mut sender).await; context.sleep(Duration::from_millis(200)).await; assert_eq!(fetcher.pop_active(), Some(MockKey(1))); // Both targets should still be present after timeout assert_eq!(fetcher.targets.get(&MockKey(1)).unwrap().len(), 2); fetcher.targets.clear(); // Error response ("no data") does not remove target fetcher.add_targets(MockKey(2), [peer1.clone()]); fetcher.add_ready(MockKey(2)); fetcher.fetch(&mut sender).await; let id = *fetcher.active.iter().next().unwrap().0; assert_eq!(fetcher.pop_by_id(id, &peer1, false), Some(MockKey(2))); // Target should still be present after "no data" response assert!(fetcher.targets.get(&MockKey(2)).unwrap().contains(&peer1)); fetcher.targets.clear(); // Data response also preserves targets // (caller must clear targets after data validation) fetcher.add_targets(MockKey(3), [peer1.clone()]); fetcher.add_ready(MockKey(3)); fetcher.fetch(&mut sender).await; let id = *fetcher.active.iter().next().unwrap().0; assert_eq!(fetcher.pop_by_id(id, &peer1, true), Some(MockKey(3))); assert!(fetcher.targets.get(&MockKey(3)).unwrap().contains(&peer1)); }); } #[test] fn test_no_fallback_when_targets_unavailable() { let runner = Runner::default(); runner.start(|context| async move { let mut fetcher = create_test_fetcher::(context.clone()); let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let peer3 = PrivateKey::from_seed(3).public_key(); // Add only peer1 and peer2 to the peer set (peer3 is not in the peer set) fetcher.reconcile(&[public_key, peer1, peer2]); // Target peer3, which is not in the peer set (disconnected) fetcher.add_targets(MockKey(1), [peer3]); assert!(fetcher.targets.contains_key(&MockKey(1))); // Add key to pending fetcher.add_ready(MockKey(1)); // Fetch should not fallback to any peer - it should wait for targets let mut sender = WrappedSender::new(SuccessMockSender::default()); fetcher.fetch(&mut sender).await; // Targets should still exist (no fallback cleared them) assert!(fetcher.targets.contains_key(&MockKey(1))); // Key should still be in pending state (no fallback to available peers) assert_eq!(fetcher.len_pending(), 1); assert_eq!(fetcher.len_active(), 0); // Waiter should be set to far future (no eligible peers at all) assert!(fetcher.waiter.is_some()); let waiter_time = fetcher.waiter.unwrap(); assert!(waiter_time > context.current() + Duration::from_secs(1000)); }); } #[test] fn test_clear_targets() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); // Add targets fetcher.add_targets(MockKey(1), [peer1.clone(), peer2]); fetcher.add_targets(MockKey(2), [peer1]); assert_eq!(fetcher.targets.len(), 2); // clear_targets() removes the targets entry entirely fetcher.clear_targets(&MockKey(1)); assert!(!fetcher.targets.contains_key(&MockKey(1))); assert!(fetcher.targets.contains_key(&MockKey(2))); // clear_targets() on non-existent key is a no-op fetcher.clear_targets(&MockKey(99)); assert_eq!(fetcher.targets.len(), 1); // clear_targets() remaining key fetcher.clear_targets(&MockKey(2)); assert!(fetcher.targets.is_empty()); }); } #[test] fn test_skips_keys_with_rate_limited_targets() { let runner = Runner::default(); runner.start(|context| async move { let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let config = Config { me: Some(public_key.clone()), initial: Duration::from_millis(100), timeout: Duration::from_secs(5), retry_timeout: Duration::from_millis(100), priority_requests: false, }; let mut fetcher: Fetcher<_, _, MockKey, LimitedMockSender> = Fetcher::new(context.clone(), config); fetcher.reconcile(&[public_key, peer1.clone(), peer2.clone()]); let quota = Quota::per_second(NZU32!(1)); let mut sender = WrappedSender::new(LimitedMockSender::new(quota, context.clone())); // Add three keys with different targets: // - MockKey(1) targeted to peer1 // - MockKey(2) targeted to peer1 (same peer, will be rate-limited after first) // - MockKey(3) targeted to peer2 fetcher.add_targets(MockKey(1), [peer1.clone()]); fetcher.add_targets(MockKey(2), [peer1.clone()]); fetcher.add_targets(MockKey(3), [peer2.clone()]); fetcher.add_ready(MockKey(1)); context.sleep(Duration::from_millis(1)).await; fetcher.add_ready(MockKey(2)); context.sleep(Duration::from_millis(1)).await; fetcher.add_ready(MockKey(3)); // First fetch: should pick MockKey(1) targeting peer1 fetcher.fetch(&mut sender).await; assert_eq!(fetcher.len_active(), 1); assert_eq!(fetcher.len_pending(), 2); assert!(!fetcher.pending.contains(&MockKey(1))); // MockKey(1) was fetched // Second fetch: MockKey(2) is blocked (peer1 rate-limited), should skip to MockKey(3) fetcher.fetch(&mut sender).await; assert_eq!(fetcher.len_active(), 2); assert_eq!(fetcher.len_pending(), 1); assert!(fetcher.pending.contains(&MockKey(2))); // MockKey(2) is still pending assert!(!fetcher.pending.contains(&MockKey(3))); // MockKey(3) was fetched // Third fetch: only MockKey(2) remains, but peer1 is still rate-limited fetcher.fetch(&mut sender).await; assert_eq!(fetcher.len_active(), 2); // No change assert_eq!(fetcher.len_pending(), 1); // MockKey(2) still pending assert!(fetcher.waiter.is_some()); // Waiter set // Wait for rate limit to reset context.sleep(Duration::from_secs(1)).await; // Now MockKey(2) can be fetched fetcher.fetch(&mut sender).await; assert_eq!(fetcher.len_active(), 3); assert_eq!(fetcher.len_pending(), 0); }); } #[test] fn test_peer_prioritization() { let runner = Runner::default(); runner.start(|context| async { let mut fetcher = create_test_fetcher::(context); let public_key = PrivateKey::from_seed(0).public_key(); let peer1 = PrivateKey::from_seed(1).public_key(); let peer2 = PrivateKey::from_seed(2).public_key(); let peer3 = PrivateKey::from_seed(3).public_key(); // Add peers with initial performance (100ms) fetcher.reconcile(&[public_key, peer1.clone(), peer2.clone(), peer3.clone()]); // Simulate different response times by updating performance: // - peer1: very fast (10ms) // - peer2: slow (500ms) // - peer3: medium (200ms) // After update_performance with EMA: new = (past + elapsed) / 2 // peer1: simulate multiple fast responses to drive down its priority for _ in 0..5 { fetcher.update_performance(&peer1, Duration::from_millis(10)); } // peer2: simulate slow responses to increase its priority for _ in 0..5 { fetcher.update_performance(&peer2, Duration::from_millis(500)); } // peer3: simulate medium responses for _ in 0..5 { fetcher.update_performance(&peer3, Duration::from_millis(200)); } // Get eligible peers - should be ordered by priority (fastest first) let peers = fetcher.get_eligible_peers(&MockKey(1), false); // Verify we have 3 peers (excluding self) assert_eq!(peers.len(), 3); // Verify order: peer1 (fastest) should come first, peer2 (slowest) last assert_eq!( peers[0], peer1, "Fastest peer should be first, got {:?}", peers ); assert_eq!( peers[1], peer3, "Medium peer should be second, got {:?}", peers ); assert_eq!( peers[2], peer2, "Slowest peer should be last, got {:?}", peers ); // Verify that shuffling (used on retry) changes the order // Note: shuffling is random, so we check that it CAN change order // by calling multiple times and checking for any different order let mut found_different_order = false; for _ in 0..10 { let shuffled = fetcher.get_eligible_peers(&MockKey(1), true); if shuffled != peers { found_different_order = true; break; } } assert!( found_different_order, "Shuffling should produce different orders" ); }); } }