//! Resolve data identified by a fixed-length key by using the P2P network.
//!
//! # Overview
//!
//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
//! module is the `peer` actor which manages the fetch lifecycle. Its mailbox allows
//! initiation and pruning of fetches via the `Resolver` interface.
//!
//! The peer handles an arbitrarily large number of concurrent fetches by sending requests
//! to other peers and processing their responses. It selects peers based on performance, retrying
//! with another peer if one fails or provides invalid data. Fetches persist until pruned or
//! fulfilled, delivering data to the `Consumer` for verification.
//!
//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
//! and returns `true` if valid, completing the fetch, or `false` to retry. Pruning a fetch with
//! in-progress response validation aborts that validation. If the aborted validation would have
//! returned `false`, the peer is not blocked for that response.
//!
//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
//! it responds to.
//!
//! # Targeting
//!
//! Callers can restrict fetches to specific target peers using
//! [`TargetedResolver::fetch_targeted`](crate::TargetedResolver::fetch_targeted).
//! Only target peers are tried; there is no automatic fallback to other peers. Targets persist
//! through transient failures (timeout, "no data" response, send failure) since the peer might be
//! slow or receive the data later.
//!
//! While a fetch is in progress, callers can modify targeting:
//! - [`TargetedResolver::fetch_targeted`](crate::TargetedResolver::fetch_targeted) adds peers to
//!   the existing target set (only if the fetch already has targets; an "all" fetch remains
//!   unrestricted)
//! - [`Resolver::fetch`](crate::Resolver::fetch) clears all targets, allowing fallback to any peer
//!
//! These modifications only apply to in-progress fetches. Once a fetch completes (success,
//! pruning, or blocked peer), the targets for that key are cleared automatically.
//!
//! # Subscribers
//!
//! [`Resolver::fetch`](crate::Resolver::fetch) accepts a peer-visible key and a
//! subscriber. This is useful when several subscribers can share the same peer-visible
//! fetch. A fetch remains active while at least one attached subscriber satisfies the latest
//! [`Resolver::retain`](crate::Resolver::retain) predicate. When the fetch resolves, the
//! key and currently retained subscribers are supplied to
//! [`Consumer::deliver`](crate::Consumer::deliver). Subscribers added while response validation
//! is in progress are delivered the same accepted response locally.
//!
//! # Peer Selection
//!
//! Outbound fetches are only sent to peers in `latest.primary` (see [commonware_p2p::Provider]),
//! but inbound requests are handled for all connected peers. Thus, callers that still expect a
//! key to be fetchable after a peer set update must ensure the latest primary set can serve it.
//!
//! [`TargetedResolver::fetch_targeted`](crate::TargetedResolver::fetch_targeted) can narrow the
//! current primary set further, but it does not bypass that latest-primary filter. Explicit
//! targets that are no longer in the latest primary set are ignored until they become primary
//! again.
//!
//! # Performance Considerations
//!
//! The peer supports arbitrarily many concurrent fetches, but resource usage generally
//! depends on the rate-limiting configuration of the underlying P2P network.
use bytes::Bytes;
use commonware_utils::{channel::oneshot, Span};

mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod fetcher;
mod inflight;
mod ingress;
pub use ingress::Mailbox;
mod metrics;
mod wire;

#[cfg(feature = "mocks")]
pub mod mocks;

/// Serves data requested by the network.
///
/// The engine forwards each inbound network request for a key to [`Producer::produce`]. As
/// described in the module documentation, if producing fails the requester receives an empty
/// response and is expected to retry elsewhere.
pub trait Producer: Clone + Send + 'static {
    /// Type used to key data requested from peers.
    type Key: Span;

    /// Serve a request received from the network.
    ///
    /// The value is handed back asynchronously through the returned receiver.
    /// NOTE(review): the test `SequencedProducer` signals "no data" by dropping the sender
    /// without sending; presumably the engine treats that as a failed produce — confirm.
    fn produce(&mut self, key: Self::Key) -> oneshot::Receiver;
}

#[cfg(test)]
mod tests {
    use super::{
        mocks::{Consumer, Key, Producer},
        Config, Engine, Mailbox,
    };
    use crate::{Delivery, Fetch, Resolver, TargetedResolver};
    use bytes::Bytes;
    use commonware_cryptography::{
        ed25519::{PrivateKey, PublicKey},
        Signer,
    };
    use commonware_macros::{select, test_traced};
    use commonware_p2p::{
        simulated::{Link, Network, Oracle, Receiver, Sender},
        Blocker, Manager as _, Provider, TrackedPeers,
    };
    use commonware_runtime::{
        deterministic, telemetry::metrics::count_running_tasks, Clock, Metrics as _, Quota, Runner,
        Spawner as _, Supervisor as _,
    };
    use commonware_utils::{
        channel::{fallible::FallibleExt, mpsc, oneshot},
        non_empty_vec,
        ordered::Set,
        sync::Mutex,
        NZUsize, NZU32,
    };
    use std::{
        collections::{HashMap, VecDeque},
        num::{NonZeroU32, NonZeroUsize},
        sync::Arc,
        time::Duration,
    };

    /// Capacity of every engine mailbox created by these tests.
    const MAILBOX_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Per-second quota used when registering each peer with the simulated network.
    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
    /// Passed to the engine as `Config::initial`.
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    /// Passed to the engine as `Config::timeout`.
    const TIMEOUT: Duration = Duration::from_millis(400);
    /// Passed to the engine as `Config::fetch_retry_timeout`.
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    /// Simulated link with ~10ms latency and a `success_rate` of 1.0.
    const LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    /// Simulated link with ~10ms latency and a `success_rate` of 0.5 (lossy).
    const LINK_UNRELIABLE: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 0.5,
    };

    /// Sums the values of every sample line of metric `name` whose labels contain
    /// `status="<status>"`.
    ///
    /// A sample line is matched by the prefix `name{`; its value is taken as the last
    /// whitespace-separated token. Presumably `metrics` is Prometheus text-format output —
    /// confirm against callers. Panics if a matching line has no value or a non-integer value.
    fn status_metric_total(metrics: &str, name: &str, status: &str) -> u64 {
        let prefix = format!("{name}{{");
        let status_label = format!("status=\"{status}\"");
        metrics
            .lines()
            .filter(|line| line.starts_with(&prefix) && line.contains(&status_label))
            .map(|line| {
                line.split_whitespace()
                    .next_back()
                    .expect("metric line must have a value")
                    .parse::()
                    .expect("status metric value must be an integer")
            })
            .sum()
    }

    /// Like [`setup_network_and_peers_with_rate_limit`], using a quota of `RATE_LIMIT`
    /// per second.
    async fn setup_network_and_peers(
        context: &deterministic::Context,
        peer_seeds: &[u64],
    ) -> (
        Oracle,
        Vec,
        Vec,
        Vec<(
            Sender,
            Receiver,
        )>,
    ) {
        setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
            .await
    }

    /// Starts a simulated network and registers one ed25519 identity per seed.
    ///
    /// All derived public keys are tracked as peer set `0`. Each peer is registered via
    /// `register(0, rate_limit)`, and its `(sender, receiver)` pair is returned in seed order.
    /// No links are created; tests connect peers explicitly with [`add_link`].
    async fn setup_network_and_peers_with_rate_limit(
        context: &deterministic::Context,
        peer_seeds: &[u64],
        rate_limit: Quota,
    ) -> (
        Oracle,
        Vec,
        Vec,
        Vec<(
            Sender,
            Receiver,
        )>,
    ) {
        let (network, oracle) = Network::new(
            context.child("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let schemes: Vec = peer_seeds
            .iter()
            .map(|seed| PrivateKey::from_seed(*seed))
            .collect();
        let peers: Vec = schemes.iter().map(|s| s.public_key()).collect();
        let mut manager = oracle.manager();
        manager.track(0, Set::try_from(peers.clone()).unwrap());

        let mut connections = Vec::new();
        for peer in &peers {
            let (sender, receiver) = oracle
                .control(peer.clone())
                .register(0, rate_limit)
                .await
                .unwrap();
            connections.push((sender, receiver));
        }

        (oracle, schemes, peers, connections)
    }

    /// Adds `link` in both directions between `peers[from]` and `peers[to]`.
    async fn add_link(
        oracle: &mut Oracle,
        link: Link,
        peers: &[PublicKey],
        from: usize,
        to: usize,
    ) {
        oracle
            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
            .await
            .unwrap();
        oracle
            .add_link(peers[to].clone(), peers[from].clone(), link)
            .await
            .unwrap();
    }

    /// Test producer that serves a per-key queue of values, one value per request.
    ///
    /// The queues live behind a shared lock, so clones observe the same state.
    #[derive(Clone, Default)]
    struct SequencedProducer {
        data: Arc>>>,
    }

    impl SequencedProducer {
        /// Replaces the queue for `key` with `values`, served in iteration order.
        fn insert(&mut self, key: Key, values: impl IntoIterator) {
            self.data.lock().insert(key, values.into_iter().collect());
        }

        /// Returns the values for `key` that have not been served yet (empty if none).
        fn remaining(&self, key: &Key) -> Vec {
            self.data
                .lock()
                .get(key)
                .map(|values| values.iter().cloned().collect())
                .unwrap_or_default()
        }
    }

    impl crate::p2p::Producer for SequencedProducer {
        type Key = Key;

        /// Pops the next queued value for `key`. When the queue is empty or absent, the sender
        /// is dropped without sending, so the returned receiver never yields a value.
        fn produce(&mut self, key: Self::Key) -> oneshot::Receiver {
            let (sender, receiver) = oneshot::channel();
            if let Some(value) = self.data.lock().get_mut(&key).and_then(VecDeque::pop_front) {
                let _ = sender.send(value);
            }
            receiver
        }
    }

    /// Spawns an engine that serves requests with the mock `Producer`.
    ///
    /// Thin wrapper over [`setup_and_spawn_actor_with_producer`].
    fn setup_and_spawn_actor(
        context: &deterministic::Context,
        provider: impl Provider,
        blocker: impl Blocker,
        signer: impl Signer,
        connection: (
            Sender,
            Receiver,
        ),
        consumer: C,
        producer: Producer,
    ) -> Mailbox
    where
        C: crate::Consumer,
        R: Clone + Ord + Send + 'static,
    {
        setup_and_spawn_actor_with_producer(
            context,
            provider,
            blocker,
            signer,
            connection,
            consumer,
            producer,
        )
    }

    /// Builds an [`Engine`] with the test timing constants, starts it on `connection`, and
    /// returns its mailbox.
    ///
    /// The engine's `me` is the signer's public key. Its runtime context is the child `"actor"`,
    /// tagged with a `"peer"` attribute. Request and response priority are both disabled.
    fn setup_and_spawn_actor_with_producer(
        context: &deterministic::Context,
        provider: impl Provider,
        blocker: impl Blocker,
        signer: impl Signer,
        connection: (
            Sender,
            Receiver,
        ),
        consumer: C,
        producer: Pro,
    ) -> Mailbox
    where
        C: crate::Consumer,
        Pro: crate::p2p::Producer,
        R: Clone + Ord + Send + 'static,
    {
        let public_key = signer.public_key();
        let (engine, mailbox) = Engine::new(
            context.child("actor").with_attribute("peer", &public_key),
            Config {
                peer_provider: provider,
                blocker,
                consumer,
                producer,
                mailbox_size: MAILBOX_SIZE,
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        engine.start(connection);
        mailbox
    }

    /// A gate a test consumer waits on before finishing a delivery, paired with the verdict
    /// (`true` = valid) it reports once the gate is released.
    type DeliveryGate = (oneshot::Receiver<()>, bool);
    /// Shared FIFO of gates; each delivery pops the next one.
    type DeliveryGates = Arc>>;

    /// Consumer whose deliveries can be held open by test-controlled gates.
    ///
    /// `started` receives each key as soon as delivery begins. `sender` receives
    /// `(key, value)` only for deliveries that are released and judged valid.
    #[derive(Clone)]
    struct BlockingConsumer {
        context: Arc,
        sender: mpsc::UnboundedSender<(Key, Bytes)>,
        started: mpsc::UnboundedSender,
        gates: DeliveryGates,
    }

    impl BlockingConsumer {
        /// Creates the consumer with `gates` queued in order.
        ///
        /// Returns the consumer, the receiver of accepted deliveries, and the receiver of
        /// "delivery started" notifications.
        fn new(
            context: deterministic::Context,
            gates: Vec,
        ) -> (
            Self,
            mpsc::UnboundedReceiver<(Key, Bytes)>,
            mpsc::UnboundedReceiver,
        ) {
            let (sender, receiver) = mpsc::unbounded_channel();
            let (started, started_receiver) = mpsc::unbounded_channel();
            (
                Self {
                    context: Arc::new(context),
                    sender,
                    started,
                    gates: Arc::new(Mutex::new(gates.into())),
                },
                receiver,
                started_receiver,
            )
        }
    }

    impl crate::Consumer for BlockingConsumer {
        type Key = Key;
        type Value = Bytes;
        type Subscriber = ();

        /// Announces the key on `started`, then validates on a spawned task.
        ///
        /// With no gate queued, the verdict is `true` and nothing is awaited. With a gate, the
        /// task waits for whichever comes first:
        /// - the response receiver closing: the task exits without reporting, which also drops
        ///   the gate;
        /// - the gate's sender being dropped: the task reports `false`;
        /// - the gate being released: the task uses the gate's verdict.
        ///
        /// Valid values are forwarded to `sender` before the verdict is sent.
        fn deliver(
            &mut self,
            delivery: Delivery,
            value: Self::Value,
        ) -> oneshot::Receiver {
            let key = delivery.key;
            self.started.send_lossy(key.clone());
            let (gate, valid) = self
                .gates
                .lock()
                .pop_front()
                .map_or((None, true), |(gate, valid)| (Some(gate), valid));
            let (mut response, receiver) = oneshot::channel();
            let sender = self.sender.clone();
            self.context.child("delivery").spawn(move |_| async move {
                if let Some(gate) = gate {
                    select! {
                        _ = response.closed() => return,
                        result = gate => {
                            if result.is_err() {
                                let _ = response.send(false);
                                return;
                            }
                        },
                    }
                }
                if valid {
                    sender.send_lossy((key, value));
                }
                let _ = response.send(valid);
            });
            receiver
        }
    }

    /// Ordered tag used as the consumer `Subscriber` type in subscriber-aware tests.
    #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
    struct SubscriberTag(u16);

    /// A recorded delivery together with its value.
    type RecordedDelivery = (Delivery, Bytes);

    /// Gated consumer like [`BlockingConsumer`] that records the full `Delivery` (including
    /// subscribers) instead of only the key.
    #[derive(Clone)]
    struct BlockingSubscriberRecordingConsumer {
        context: Arc,
        sender: mpsc::UnboundedSender,
        started: mpsc::UnboundedSender>,
        gates: DeliveryGates,
    }

    impl BlockingSubscriberRecordingConsumer {
        /// Creates the consumer with `gates` queued in order.
        ///
        /// Returns the consumer, the receiver of accepted deliveries, and the receiver of
        /// "delivery started" notifications.
        fn new(
            context: deterministic::Context,
            gates: Vec,
        ) -> (
            Self,
            mpsc::UnboundedReceiver,
            mpsc::UnboundedReceiver>,
        ) {
            let (sender, receiver) = mpsc::unbounded_channel();
            let (started, started_receiver) = mpsc::unbounded_channel();
            (
                Self {
                    context: Arc::new(context),
                    sender,
                    started,
                    gates: Arc::new(Mutex::new(gates.into())),
                },
                receiver,
                started_receiver,
            )
        }
    }

    impl crate::Consumer for BlockingSubscriberRecordingConsumer {
        type Key = Key;
        type Value = Bytes;
        type Subscriber = SubscriberTag;

        /// Same gating protocol as `BlockingConsumer::deliver`, but the whole `Delivery` is
        /// reported on `started` and forwarded (with the value) when accepted.
        fn deliver(
            &mut self,
            delivery: Delivery,
            value: Self::Value,
        ) -> oneshot::Receiver {
            self.started.send_lossy(delivery.clone());
            let (gate, valid) = self
                .gates
                .lock()
                .pop_front()
                .map_or((None, true), |(gate, valid)| (Some(gate), valid));
            let (mut response, receiver) = oneshot::channel();
            let sender = self.sender.clone();
            self.context.child("delivery").spawn(move |_| async move {
                if let Some(gate) = gate {
                    select! {
                        _ = response.closed() => return,
                        result = gate => {
                            if result.is_err() {
                                let _ = response.send(false);
                                return;
                            }
                        },
                    }
                }
                if valid {
                    sender.send_lossy((delivery, value));
                }
                let _ = response.send(valid);
            });
            receiver
        }
    }

    /// Consumer that accepts every delivery immediately and records it with its value.
    #[derive(Clone)]
    struct SubscriberRecordingConsumer {
        sender: mpsc::UnboundedSender,
    }

    impl SubscriberRecordingConsumer {
        /// Creates the consumer and the receiver of its recorded deliveries.
        fn new() -> (Self, mpsc::UnboundedReceiver) {
            let (sender, receiver) = mpsc::unbounded_channel();
            (Self { sender }, receiver)
        }
    }

    impl crate::Consumer for SubscriberRecordingConsumer {
        type Key = Key;
        type Value = Bytes;
        type Subscriber = SubscriberTag;

        /// Records `(delivery, value)` and answers `true` synchronously.
        fn deliver(
            &mut self,
            delivery: Delivery,
            value: Self::Value,
        ) -> oneshot::Receiver {
            let (sender, receiver) = oneshot::channel();
            self.sender.send_lossy((delivery, value));
            let _ = sender.send(true);
            receiver
        }
    }

    /// Mock consumer for peers whose deliveries the test never inspects.
    fn dummy_consumer() -> Consumer {
        Consumer::dummy()
    }

    /// Mock consumer paired with the receiver on which tests read `(key, value)` deliveries.
    fn consumer() -> (Consumer, mpsc::UnboundedReceiver<(Key, Bytes)>) {
        Consumer::new()
    }

    /// Polls the oracle every 10ms until `blocker` has blocked `blocked`.
    ///
    /// Never returns otherwise, so callers rely on the runner's timeout to fail the test.
    async fn wait_for_blocked(
        context: &deterministic::Context,
        oracle: &Oracle,
        blocker: &PublicKey,
        blocked: &PublicKey,
    ) {
        loop {
            let blocked_peers = oracle.blocked().await.unwrap();
            if blocked_peers
                .iter()
                .any(|(a, b)| a == blocker && b == blocked)
            {
                return;
            }
            context.sleep(Duration::from_millis(10)).await;
        }
    }

    /// Tests that fetching a key from another peer succeeds when data is available.
    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
    /// and verifies that the data is correctly delivered to Peer 1's consumer.
#[test_traced]
fn test_fetch_success() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(2);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 2"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        mailbox1.fetch(key.clone());

        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 2"));
    });
}

/// Tests that a delivery whose validation is still pending does not stall the engine.
///
/// The first fetch's validation is held open by a gate. A second fetch must still reach
/// validation within 2s. Releasing the gates in reverse order then yields the deliveries in
/// that reverse order.
#[test_traced]
fn test_pending_delivery_does_not_block_engine() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        let key1 = Key(1);
        let key2 = Key(2);
        let data1 = Bytes::from("data for key 1");
        let data2 = Bytes::from("data for key 2");
        let mut prod2 = Producer::default();
        prod2.insert(key1.clone(), data1.clone());
        let mut prod3 = Producer::default();
        prod3.insert(key2.clone(), data2.clone());

        // Gates are consumed in delivery order: key1 takes gate 1, key2 takes gate 2.
        let (gate_sender1, gate_receiver1) = oneshot::channel();
        let (gate_sender2, gate_receiver2) = oneshot::channel();
        let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
            context.child("consumer"),
            vec![(gate_receiver1, true), (gate_receiver2, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        mailbox1.fetch(key1.clone());
        let started_key = started.recv().await.expect("delivery did not start");
        assert_eq!(started_key, key1);

        // key1's validation is still gated; key2 must nevertheless reach the consumer.
        mailbox1.fetch(key2.clone());
        select! {
            started_key = started.recv() => {
                assert_eq!(started_key.expect("delivery did not start"), key2);
            },
            _ = context.sleep(Duration::from_secs(2)) => {
                panic!("resolver engine blocked on pending delivery");
            },
        };

        gate_sender2.send(()).unwrap();
        let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
        assert_eq!(key_actual, key2);
        assert_eq!(value, data2);

        gate_sender1.send(()).unwrap();
        let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
        assert_eq!(key_actual, key1);
        assert_eq!(value, data1);
    });
}

/// Tests that pruning a key aborts its pending validation.
///
/// After the prune, a new fetch for the same key starts a fresh validation that is delivered
/// exactly once.
#[test_traced]
fn test_retain_drops_pending_delivery() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let data = Bytes::from("data for key 1");
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), data.clone());

        let (mut first_gate_sender, first_gate_receiver) = oneshot::channel();
        let (second_gate_sender, second_gate_receiver) = oneshot::channel();
        let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
            context.child("consumer"),
            vec![(first_gate_receiver, true), (second_gate_receiver, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        mailbox1.fetch(key.clone());
        let started_key = started.recv().await.expect("delivery did not start");
        assert_eq!(started_key, key);

        let canceled = key.clone();
        mailbox1.retain(move |key, _| key != &canceled);
        mailbox1.fetch(key.clone());
        // The aborted validation task exits and drops its gate, closing the first sender.
        first_gate_sender.closed().await;

        let started_key = started.recv().await.expect("second delivery did not start");
        assert_eq!(started_key, key);
        second_gate_sender.send(()).unwrap();

        let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
        assert_eq!(key_actual, key);
        assert_eq!(value, data);
        // Only the second validation may produce a delivery.
        select! {
            _ = cons_out1.recv() => panic!("unexpected extra event"),
            _ = context.sleep(Duration::from_millis(100)) => {},
        };
    });
}

/// Tests that a rejected delivery blocks the responding peer and the fetch is retried.
///
/// The fetch targets `peers[1]` and `peers[2]`, but only `peers[1]` is linked at first, and its
/// response is rejected. Once `peers[1]` is blocked, a link to `peers[2]` is added and peer set
/// `1` (`peers[0]`, `peers[2]`) is tracked. A second validation then starts and is accepted.
#[test_traced]
fn test_invalid_delivery_retries_and_rearms_slot() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let data = Bytes::from("data for key 1");
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), data.clone());
        let mut prod3 = Producer::default();
        prod3.insert(key.clone(), data.clone());

        // First validation reports invalid, second reports valid.
        let (first_gate_sender, first_gate_receiver) = oneshot::channel();
        let (second_gate_sender, second_gate_receiver) = oneshot::channel();
        let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
            context.child("consumer"),
            vec![(first_gate_receiver, false), (second_gate_receiver, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        mailbox1.fetch_targeted(
            key.clone(),
            non_empty_vec![peers[1].clone(), peers[2].clone()],
        );
        let started_key = started.recv().await.expect("delivery did not start");
        assert_eq!(started_key, key);
        first_gate_sender.send(()).unwrap();
        wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
        oracle.manager().track(
            1,
            Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
        );

        let started_key = started.recv().await.expect("retry delivery did not start");
        assert_eq!(started_key, key);
        second_gate_sender.send(()).unwrap();

        let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
        assert_eq!(key_actual, key);
        assert_eq!(value, data);
    });
}

/// Races a pending validation that will report "invalid" against pruning the fetch.
///
/// - `validation_first == true`: the invalid verdict is released before pruning, so the
///   responding peer (`peers[1]`) must end up as the only blocked peer.
/// - `validation_first == false`: the fetch is pruned first, the validation is aborted (its gate
///   is dropped), and no peer may be blocked.
///
/// In both cases the consumer must receive no delivery.
async fn run_pending_invalid_delivery_race(
    context: &deterministic::Context,
    validation_first: bool,
) {
    let (mut oracle, mut schemes, peers, mut connections) =
        setup_network_and_peers(context, &[1, 2]).await;
    add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

    let key = Key(1);
    let mut prod2 = Producer::default();
    prod2.insert(key.clone(), Bytes::from("data for key 1"));

    let (mut gate_sender, gate_receiver) = oneshot::channel();
    let (cons1, mut cons_out1, mut started) =
        BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, false)]);

    let scheme = schemes.remove(0);
    let mut mailbox1 = setup_and_spawn_actor(
        context,
        oracle.manager(),
        oracle.control(scheme.public_key()),
        scheme,
        connections.remove(0),
        cons1,
        Producer::default(),
    );

    let scheme = schemes.remove(0);
    let _mailbox2 = setup_and_spawn_actor(
        context,
        oracle.manager(),
        oracle.control(scheme.public_key()),
        scheme,
        connections.remove(0),
        dummy_consumer(),
        prod2,
    );

    mailbox1.fetch(key.clone());
    let started_key = started.recv().await.expect("delivery did not start");
    assert_eq!(started_key, key);

    if validation_first {
        gate_sender.send(()).unwrap();
        wait_for_blocked(context, &oracle, &peers[0], &peers[1]).await;
        mailbox1.retain(|_, _| false);
        let blocked = oracle.blocked().await.unwrap();
        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].0, peers[0]);
        assert_eq!(blocked[0].1, peers[1]);
    } else {
        mailbox1.retain(|_, _| false);
        // The aborted validation task drops its gate, closing `gate_sender`.
        gate_sender.closed().await;
        assert!(oracle.blocked().await.unwrap().is_empty());
    }

    select! {
        _ = cons_out1.recv() => panic!("unexpected event"),
        _ = context.sleep(Duration::from_millis(100)) => {},
    };
}

/// Runs [`run_pending_invalid_delivery_race`] with pruning first, then validation first.
#[test_traced]
fn test_retain_pending_invalid_delivery_race() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        run_pending_invalid_delivery_race(&context, false).await;
        run_pending_invalid_delivery_race(&context, true).await;
    });
}

/// Tests that pruning a fetch leaves the consumer untouched.
/// This test initiates a fetch and immediately prunes it, verifying
/// that the consumer does not receive any event.
/// (The network contains only the requester, so nothing could serve the fetch anyway.)
#[test_traced]
fn test_retain_drops_fetch() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (oracle, mut schemes, _peers, mut connections) =
            setup_network_and_peers(&context, &[1]).await;

        let (cons1, mut cons_out1) = consumer();
        let prod1 = Producer::default();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            prod1,
        );

        let key = Key(3);
        mailbox1.fetch(key.clone());
        let canceled = key.clone();
        mailbox1.retain(move |key, _| key != &canceled);

        select! {
            _ = cons_out1.recv() => panic!("unexpected event"),
            _ = context.sleep(Duration::from_millis(100)) => {},
        };
    });
}

/// Tests fetching data from a peer when some peers lack the data.
/// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
/// It verifies that the resolver retries with another peer and successfully
/// delivers the data to Peer 1's consumer.
#[test_traced]
fn test_peer_no_data() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        // Only peers[2] has the key; peers[1] has nothing to serve.
        let prod1 = Producer::default();
        let prod2 = Producer::default();
        let mut prod3 = Producer::default();
        let key = Key(3);
        prod3.insert(key.clone(), Bytes::from("data for key 3"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            prod1,
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        mailbox1.fetch(key.clone());

        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 3"));
    });
}

/// Tests fetching when no other peers are available.
/// This test sets up a single-peer network (only the requester is tracked and registered).
/// It initiates a fetch, waits well beyond the retry timeout, and verifies that the
/// consumer never receives a delivery: the fetch simply stays pending.
#[test_traced] fn test_no_peers_available() { let executor = deterministic::Runner::timed(Duration::from_secs(10)); executor.start(|context| async move { let (oracle, mut schemes, _peers, mut connections) = setup_network_and_peers(&context, &[1]).await; let (cons1, mut cons_out1) = consumer(); let prod1 = Producer::default(); let scheme = schemes.remove(0); let mut mailbox1 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), cons1, prod1, ); mailbox1.fetch(Key(4)); context.sleep(Duration::from_secs(5)).await; // With no peers, no event should arrive select! { _ = cons_out1.recv() => panic!("Fetch should have failed due to no peers"), _ = context.sleep(Duration::from_millis(100)) => {}, }; }); } /// Tests that fetches issued before the first peer set arrives stay pending and complete once /// the initial update is tracked. #[test_traced] fn test_fetch_before_initial_peer_set_waits_for_update() { let executor = deterministic::Runner::timed(Duration::from_secs(10)); executor.start(|context| async move { let (network, mut oracle) = Network::new( context.child("network"), commonware_p2p::simulated::Config { max_size: 1024 * 1024, disconnect_on_block: true, tracked_peer_sets: NZUsize!(1), }, ); network.start(); let mut schemes = [1_u64, 2] .into_iter() .map(PrivateKey::from_seed) .collect::>(); schemes.sort_by_key(|s| s.public_key()); let peers: Vec = schemes.iter().map(|s| s.public_key()).collect(); let mut connections = Vec::new(); for peer in &peers { let (sender, receiver) = oracle .control(peer.clone()) .register(0, Quota::per_second(RATE_LIMIT)) .await .unwrap(); connections.push((sender, receiver)); } add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await; let key = Key(2); let mut prod2 = Producer::default(); prod2.insert(key.clone(), Bytes::from("data for key 2")); let (cons1, mut cons_out1) = consumer(); let scheme = schemes.remove(0); let mut mailbox1 = setup_and_spawn_actor( &context, 
oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), cons1, Producer::default(), ); let scheme = schemes.remove(0); let _mailbox2 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), dummy_consumer(), prod2, ); mailbox1.fetch(key.clone()); select! { event = cons_out1.recv() => { panic!("fetch should wait for the initial peer set, got {event:?}"); }, _ = context.sleep(Duration::from_millis(200)) => {}, }; oracle .manager() .track(0, Set::try_from(peers.clone()).unwrap()); let (key_actual, value) = cons_out1.recv().await.unwrap(); assert_eq!(key_actual, key); assert_eq!(value, Bytes::from("data for key 2")); }); } /// Tests that concurrent fetches are handled correctly. /// Also tests that the peer can recover from having no peers available. /// Also tests that the peer can get data from multiple peers that have different sets of data. #[test_traced] fn test_concurrent_fetch_requests() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (mut oracle, mut schemes, peers, mut connections) = setup_network_and_peers(&context, &[1, 2, 3]).await; let key2 = Key(2); let key3 = Key(3); let mut prod2 = Producer::default(); prod2.insert(key2.clone(), Bytes::from("data for key 2")); let mut prod3 = Producer::default(); prod3.insert(key3.clone(), Bytes::from("data for key 3")); let (cons1, mut cons_out1) = consumer(); let scheme = schemes.remove(0); let mut mailbox1 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), cons1, Producer::default(), ); let scheme = schemes.remove(0); let _mailbox2 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), dummy_consumer(), prod2, ); let scheme = schemes.remove(0); let _mailbox3 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), 
scheme, connections.remove(0), dummy_consumer(), prod3, ); // Add choppy links between the requester and the two producers add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await; add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await; // Run the fetches multiple times to ensure that the peer tries both of its peers for _ in 0..10 { // Initiate concurrent fetches. mailbox1.fetch(key2.clone()); mailbox1.fetch(key3.clone()); // Collect both events without assuming order let mut events = Vec::new(); events.push(cons_out1.recv().await.expect("Consumer channel closed")); events.push(cons_out1.recv().await.expect("Consumer channel closed")); // Check that both keys were successfully fetched let mut found_key2 = false; let mut found_key3 = false; for (key_actual, value) in events { if key_actual == key2 { assert_eq!(value, Bytes::from("data for key 2")); found_key2 = true; } else if key_actual == key3 { assert_eq!(value, Bytes::from("data for key 3")); found_key3 = true; } else { panic!("Unexpected key received"); } } assert!(found_key2 && found_key3,); } }); } /// Tests that pruning an inactive fetch has no effect. /// Prunes a key before, after, and during the fetch process. 
#[test_traced]
fn test_retain_drops_key() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(6);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 6"));

        let (cons1, mut cons_out1) = consumer();

        // Peer 1 is the requester; its own producer is empty.
        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        // Peer 2 is the only producer and holds data for `Key(6)` only.
        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Prune before sending the fetch, expecting no effect.
        let canceled = key.clone();
        mailbox1.retain(move |key, _| key != &canceled);
        select! {
            _ = cons_out1.recv() => {
                panic!("unexpected event");
            },
            _ = context.sleep(Duration::from_millis(100)) => {},
        };

        // Initiate fetch and wait for data to be delivered
        mailbox1.fetch(key.clone());
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 6"));

        // Attempt to prune after data has been delivered, expecting no effect
        let canceled = key.clone();
        mailbox1.retain(move |key, _| key != &canceled);
        select! {
            _ = cons_out1.recv() => {
                panic!("unexpected event");
            },
            _ = context.sleep(Duration::from_millis(100)) => {},
        };

        // Initiate and prune another fetch.
        // NOTE(review): no producer in this test holds data for `Key(7)` (prod2 only
        // has `Key(6)` and the requester's producer is empty), so the check below
        // would pass even if pruning did nothing. Consider inserting data for
        // `Key(7)` into `prod2` so the assertion actually exercises pruning.
        let key = Key(7);
        mailbox1.fetch(key.clone());
        let canceled = key.clone();
        mailbox1.retain(move |key, _| key != &canceled);

        // No event should arrive after pruning.
        select! {
            _ = cons_out1.recv() => panic!("unexpected event"),
            _ = context.sleep(Duration::from_millis(100)) => {},
        };
    });
}

/// Tests that a peer is blocked after delivering invalid data,
/// preventing further fetches from that peer.
///
/// Peer 2 serves invalid data for key A (and the only copy of key B); peer 3 serves
/// valid data for key A. After repeated fetches of key A, peer 2 is expected to be
/// the single blocked peer, so key B can no longer be resolved.
#[test_traced]
fn test_blocking_peer() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
        add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;

        let key_a = Key(1);
        let key_b = Key(2);
        let invalid_data_a = Bytes::from("invalid for A");
        let valid_data_a = Bytes::from("valid for A");
        let valid_data_b = Bytes::from("valid for B");

        // Set up producers
        let mut prod2 = Producer::default();
        prod2.insert(key_a.clone(), invalid_data_a.clone());
        prod2.insert(key_b.clone(), valid_data_b.clone());

        let mut prod3 = Producer::default();
        prod3.insert(key_a.clone(), valid_data_a.clone());

        // Set up consumer for Peer1 with expected values
        let (mut cons1, mut cons_out1) = consumer();
        cons1.add_expected(key_a.clone(), valid_data_a.clone());
        cons1.add_expected(key_b.clone(), valid_data_b.clone());

        // Spawn actors
        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Fetch keyA multiple times to ensure that Peer2 is blocked.
        for _ in 0..20 {
            // Fetch keyA
            mailbox1.fetch(key_a.clone());

            // Wait for success event for keyA
            let (key_actual, value) = cons_out1.recv().await.unwrap();
            assert_eq!(key_actual, key_a);
            assert_eq!(value, valid_data_a);
        }

        // Fetch keyB
        mailbox1.fetch(key_b.clone());

        // Wait for some time (longer than retry timeout)
        context.sleep(Duration::from_secs(5)).await;

        // No success event should be received for keyB since the only peer with valid data is blocked
        select! {
            _ = cons_out1.recv() => panic!("unexpected event"),
            _ = context.sleep(Duration::from_millis(100)) => {},
        };

        // Prune the fetch for keyB.
        let canceled = key_b.clone();
        mailbox1.retain(move |key, _| key != &canceled);

        // Check oracle: exactly one block, issued by peer 1 against peer 2.
        let blocked = oracle.blocked().await.unwrap();
        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].0, peers[0]);
        assert_eq!(blocked[0].1, peers[1]);
    });
}

/// Tests that duplicate fetches for the same key are handled properly.
/// The test verifies that when the same key is fetched multiple times,
/// the data is correctly delivered once without errors.
#[test_traced]
fn test_duplicate_fetch_key() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Send duplicate fetches for the same key.
        mailbox1.fetch(key.clone());
        mailbox1.fetch(key.clone());

        // Should receive the data only once
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 5"));

        // Make sure we don't receive a second event for the duplicate fetch
        select! {
            _ = cons_out1.recv() => {
                panic!("Unexpected second event received for duplicate fetch");
            },
            _ = context.sleep(Duration::from_millis(500)) => {
                // This is expected - no additional events should be produced
            },
        };
    });
}

/// Tests that the resolver adapts when a new peer comes online.
///
/// Peer 1 first fetches `key1` from peer 2. Peer 3's actor is then spawned; once that
/// change has had time to propagate, `key2` (held only by peer 3) is fetched from it.
#[test_traced]
fn test_changing_peer_sets() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        let key1 = Key(1);
        let key2 = Key(2);

        let mut prod2 = Producer::default();
        prod2.insert(key1.clone(), Bytes::from("data from peer 2"));

        let mut prod3 = Producer::default();
        prod3.insert(key2.clone(), Bytes::from("data from peer 3"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Fetch key1 from peer 2
        mailbox1.fetch(key1.clone());

        // Wait for successful fetch
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key1);
        assert_eq!(value, Bytes::from("data from peer 2"));

        // Change peer set to include peer 3
        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Need to wait for the peer set change to propagate
        context.sleep(Duration::from_millis(200)).await;

        // Fetch key2 from peer 3
        mailbox1.fetch(key2.clone());

        // Wait for successful fetch
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key2);
        assert_eq!(value, Bytes::from("data from peer 3"));
    });
}

/// Tests that a targeted fetch survives one bad target.
///
/// Both peer 2 (invalid data) and peer 3 (valid data) are targeted. The fetch must
/// still succeed from peer 3. Peer 2 must end up as the only blocked peer, and exactly
/// one successful fetch must be recorded in the `actor_fetch_total` metric.
#[test_traced]
fn test_fetch_targeted() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        let key = Key(1);
        let invalid_data = Bytes::from("invalid data");
        let valid_data = Bytes::from("valid data");

        // Peer 2 has invalid data, peer 3 has valid data
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), invalid_data.clone());

        let mut prod3 = Producer::default();
        prod3.insert(key.clone(), valid_data.clone());

        // Consumer expects only valid_data
        let (mut cons1, mut cons_out1) = consumer();
        cons1.add_expected(key.clone(), valid_data.clone());

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Wait for peer set to be established
        context.sleep(Duration::from_millis(100)).await;

        // Start fetch with targets for both peer 2 (invalid data) and peer 3 (valid data)
        // When peer 2 returns invalid data, only peer 2 should be removed from targets
        // Peer 3 should still be tried as a target and succeed
        mailbox1.fetch_targeted(
            key.clone(),
            non_empty_vec![peers[1].clone(), peers[2].clone()],
        );

        // Should eventually succeed from peer 3
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, valid_data);

        // Verify peer 2 was blocked (sent invalid data)
        let blocked = oracle.blocked().await.unwrap();
        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].0, peers[0]);
        assert_eq!(blocked[0].1, peers[1]);

        // Verify metrics: 1 successful fetch (from peer 3 after peer 2 was blocked)
        let metrics = context.encode();
        assert_eq!(
            status_metric_total(&metrics, "actor_fetch_total", "Success"),
            1
        );
    });
}

/// Tests that a targeted fetch never falls back to non-target peers.
///
/// Peers 2 and 3 are targeted but hold no data. Peer 4 holds the data but is not a
/// target, so nothing may be delivered within the 3-second observation window.
#[test_traced]
fn test_fetch_targeted_no_fallback() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3, 4]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;

        let key = Key(1);

        // Only peer 4 has the data, peers 2 and 3 don't
        let mut prod4 = Producer::default();
        prod4.insert(key.clone(), Bytes::from("data from peer 4"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            Producer::default(), // no data
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            Producer::default(), // no data
        );

        let scheme = schemes.remove(0);
        let _mailbox4 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod4,
        );

        // Wait for peer set to be established
        context.sleep(Duration::from_millis(100)).await;

        // Start fetch with targets for peers 2 and 3 (both don't have data)
        // Peer 4 has data but is NOT a target - it should NEVER be tried
        mailbox1.fetch_targeted(
            key.clone(),
            non_empty_vec![peers[1].clone(), peers[2].clone()],
        );

        // Wait enough time for targets to fail and retry multiple times
        // The fetch should not succeed because peer 4 (which has data) is not targeted
        select! {
            event = cons_out1.recv() => {
                panic!("Fetch should not succeed, but got: {event:?}");
            },
            _ = context.sleep(Duration::from_secs(3)) => {
                // Expected: no success event because peer 4 is not targeted
            },
        };
    });
}

/// Tests `fetch_all_targeted` alongside a plain `fetch`.
///
/// Two keys are targeted at the peers that hold them (key1 -> peer 2, key2 -> peer 4),
/// and a third key is fetched untargeted (peer 3 holds it). All three values must be
/// delivered, and the `actor_fetch_total` metric must record three successes.
#[test_traced]
fn test_fetch_all_targeted() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3, 4]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;

        let key1 = Key(1);
        let key2 = Key(2);
        let key3 = Key(3);

        // Peer 2 has key1
        let mut prod2 = Producer::default();
        prod2.insert(key1.clone(), Bytes::from("data for key 1"));

        // Peer 3 has key3
        let mut prod3 = Producer::default();
        prod3.insert(key3.clone(), Bytes::from("data for key 3"));

        // Peer 4 has key2
        let mut prod4 = Producer::default();
        prod4.insert(key2.clone(), Bytes::from("data for key 2"));

        // Consumer expects all three keys
        let (mut cons1, mut cons_out1) = consumer();
        cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
        cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
        cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        let scheme = schemes.remove(0);
        let _mailbox4 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod4,
        );

        // Wait for peer set to be established
        context.sleep(Duration::from_millis(100)).await;

        // Fetch keys with mixed targeting:
        // - key1 targeted to peer 2 (has data) -> should succeed from target
        // - key2 targeted to peer 4 (has data) -> should succeed from target
        // - key3 no targeting -> fetched from any peer (peer 3 has it)
        mailbox1.fetch_all_targeted(vec![
            (key1.clone(), non_empty_vec![peers[1].clone()]), // peer 2 has key1
            (key2.clone(), non_empty_vec![peers[3].clone()]), // peer 4 has key2
        ]);
        mailbox1.fetch(key3.clone()); // no targeting for key3

        // Collect all three events (arrival order is not assumed)
        let mut results = HashMap::new();
        for _ in 0..3 {
            let (key, value) = cons_out1.recv().await.unwrap();
            results.insert(key, value);
        }

        // Verify all keys received correct data
        assert_eq!(results.len(), 3);
        assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
        assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
        assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));

        // Verify metrics: 3 successful fetches
        let metrics = context.encode();
        assert_eq!(
            status_metric_total(&metrics, "actor_fetch_total", "Success"),
            3
        );
    });
}

/// Tests that calling fetch() on an in-progress targeted fetch clears the targets,
/// allowing the fetch to succeed from any available peer.
#[test_traced]
fn test_fetch_clears_targets() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        let key = Key(1);
        let valid_data = Bytes::from("valid data");

        // Peer 2 has no data, peer 3 has the data
        let mut prod3 = Producer::default();
        prod3.insert(key.clone(), valid_data.clone());

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            Producer::default(), // no data
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Wait for peer set to be established
        context.sleep(Duration::from_millis(100)).await;

        // Start fetch with target for peer 2 only (who doesn't have data)
        mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);

        // Wait for the targeted fetch to fail a few times
        context.sleep(Duration::from_millis(500)).await;

        // Call fetch() which should clear the targets and allow fallback to any peer
        mailbox1.fetch(key.clone());

        // Should now succeed from peer 3 (who has data but wasn't originally targeted)
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, valid_data);
    });
}

/// Tests that `fetch_targeted` does not restrict an in-progress untargeted fetch.
///
/// The fetch is started with `fetch` (any peer). A later `fetch_targeted` naming only
/// peer 2, which has no data, must not narrow it. The fetch must still succeed from
/// peer 3.
#[test_traced]
fn test_fetch_targeted_does_not_restrict_all() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        let key = Key(1);
        let valid_data = Bytes::from("valid data");

        // Peer 2 has no data, peer 3 has the data
        let mut prod3 = Producer::default();
        prod3.insert(key.clone(), valid_data.clone());

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            Producer::default(), // no data
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Wait for peer set to be established
        context.sleep(Duration::from_millis(100)).await;

        // Start fetch without targets (can try any peer)
        mailbox1.fetch(key.clone());

        // Wait a bit for the fetch to start
        context.sleep(Duration::from_millis(50)).await;

        // Call fetch_targeted with peer 2 only (who doesn't have data)
        // This should NOT restrict the existing "all" fetch
        mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);

        // Should still succeed from peer 3 (who has data but wasn't in the targeted call)
        // because the original fetch was "all" and shouldn't be restricted
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, valid_data);
    });
}

/// Tests that `retain` removes the in-flight entry for a key its predicate excludes.
///
/// After the key is pruned, a later `fetch` of the same key must not be treated as a
/// duplicate, and it must be delivered once a link exists.
#[test_traced]
fn test_retain() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        // No link is added yet, so any fetch stays in-flight.
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Retain before fetching should have no effect
        mailbox1.retain(|_, _| true);
        select! {
            _ = cons_out1.recv() => {
                panic!("unexpected event");
            },
            _ = context.sleep(Duration::from_millis(100)) => {},
        };

        // Start a fetch (no link, so fetch stays in-flight)
        mailbox1.fetch(key.clone());

        // Retain with predicate that excludes the key. This must clean up
        // the in-flight entry for the key.
        let key_clone = key.clone();
        mailbox1.retain(move |key, _| key != &key_clone);

        // Now add link so fetches can complete
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Fetch same key again, if the in-flight entry wasn't cleaned up, this would
        // be treated as a duplicate and silently ignored
        mailbox1.fetch(key.clone());

        // Should succeed
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 5"));
    });
}

/// Tests that `retain` is evaluated per subscriber.
///
/// Two subscribers are attached to the same key, and the predicate keeps only one of
/// them. The fetch must remain active for the kept subscriber, so the key is still
/// delivered once a link is added.
#[test_traced]
fn test_retain_uses_subscribers() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        // No link is added until after `retain`, so the fetch stays in-flight.
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1): (Consumer, _) = Consumer::new();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let dropped_subscriber = SubscriberTag(50);
        let kept_subscriber = SubscriberTag(51);
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: dropped_subscriber,
        });
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: kept_subscriber.clone(),
        });
        context.sleep(Duration::from_millis(100)).await;

        // Keep only `kept_subscriber`; the key must stay in-flight because of it.
        mailbox1.retain(move |_, subscriber| subscriber == &kept_subscriber);
        context.sleep(Duration::from_millis(100)).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // NOTE(review): this receiver yields only `(key, value)`, so the test does not
        // check that `dropped_subscriber` was excluded from the delivery.
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 5"));
    });
}

/// Tests that `Consumer::deliver` receives every subscriber attached to a key.
///
/// The subscribers are attached in the order tag 51, then tag 50. The delivery is
/// expected to list them as `[first_subscriber, second_subscriber]` (tags 50, 51).
#[test_traced]
fn test_deliver_receives_subscribers() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let first_subscriber = SubscriberTag(50);
        let second_subscriber = SubscriberTag(51);
        // Attach in reverse tag order, before any link exists.
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: second_subscriber.clone(),
        });
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: first_subscriber.clone(),
        });

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let (delivery, value) = cons_out1.recv().await.unwrap();
        assert_eq!(
            delivery,
            Delivery {
                key,
                subscribers: non_empty_vec![first_subscriber, second_subscriber],
            }
        );
        assert_eq!(value, Bytes::from("data for key 5"));
    });
}

/// Tests that two subscribers attached to the same key before the link exists are
/// both included in a single delivery.
#[test_traced]
fn test_deliver_receives_multiple_subscribers() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let first_subscriber = SubscriberTag(49);
        let second_subscriber = SubscriberTag(50);
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: first_subscriber.clone(),
        });
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: second_subscriber.clone(),
        });

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let (delivery, value) = cons_out1.recv().await.unwrap();
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![first_subscriber, second_subscriber],
            }
        );
        assert_eq!(value, Bytes::from("data for key 5"));
    });
}

/// Tests that a subscriber added while a response is being validated is delivered
/// that same accepted response locally, instead of triggering a refetch.
///
/// The producer queues two distinct responses for the key. Both deliveries must carry
/// `first_response`, and `second_response` must still be unconsumed at the end.
#[test_traced]
fn test_fetch_during_validation_reuses_response_after_success() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(5);
        let first_response = Bytes::from("data for key 5");
        let second_response = Bytes::from("refetched data for key 5");
        let mut prod2 = SequencedProducer::default();
        prod2.insert(
            key.clone(),
            [first_response.clone(), second_response.clone()],
        );
        // Cloned handle used below to inspect which queued responses remain unserved.
        let prod2_observer = prod2.clone();

        // Each gate holds one `deliver` call open until the test fires it. The
        // paired boolean is presumably the validation verdict returned once the gate
        // opens (TODO confirm against `BlockingSubscriberRecordingConsumer`).
        let (first_gate_sender, first_gate_receiver) = oneshot::channel();
        let (second_gate_sender, second_gate_receiver) = oneshot::channel();
        let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
            context.child("consumer"),
            vec![(first_gate_receiver, true), (second_gate_receiver, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor_with_producer(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let first_subscriber = SubscriberTag(49);
        let second_subscriber = SubscriberTag(50);
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: first_subscriber.clone(),
        });

        // Validation of the first response has started and is held at the first gate.
        let delivery = started.recv().await.expect("delivery did not start");
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![first_subscriber.clone()],
            }
        );

        // Attach a late subscriber while validation is still in progress.
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: second_subscriber.clone(),
        });
        context.sleep(Duration::from_millis(100)).await;
        // The late subscriber must not have caused a new request: only the first
        // queued response has been served so far.
        assert_eq!(
            prod2_observer.remaining(&key),
            vec![second_response.clone()]
        );

        // Release the first validation; the original subscriber gets the response.
        first_gate_sender.send(()).unwrap();
        let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![first_subscriber],
            }
        );
        assert_eq!(value, first_response);

        // The late subscriber must then get its own delivery within 2 seconds.
        let delivery = select! {
            delivery = started.recv() => delivery.expect("second delivery did not start"),
            _ = context.sleep(Duration::from_secs(2)) => {
                panic!("late subscriber was not delivered");
            },
        };
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![second_subscriber.clone()],
            }
        );

        second_gate_sender.send(()).unwrap();
        let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
        assert_eq!(
            delivery,
            Delivery {
                key: key.clone(),
                subscribers: non_empty_vec![second_subscriber],
            }
        );
        // Same bytes as the first delivery, and the second queued response is untouched.
        assert_eq!(value, first_response);
        assert_eq!(prod2_observer.remaining(&key), vec![second_response]);
    });
}

/// Tests that late-subscriber delivery for one key is not held up by an unrelated
/// pending ("armed") fetch.
///
/// Setup:
/// - Peer 2 serves `bad data` for `blocked_key`; the first gate returns `false`, and the
///   test waits until peer 1 has blocked peer 2.
/// - A targeted fetch for `waiting_key` is then aimed only at peer 2, which is blocked
///   and holds no data for it, so it stays pending.
///
/// A subscriber added to `main_key` while its response is being validated must still
/// be delivered within 2 seconds.
#[test_traced]
fn test_late_subscriber_delivery_ignores_unrelated_waiter() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2, 3]).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        let blocked_key = Key(4);
        let waiting_key = Key(5);
        let main_key = Key(6);
        let data = Bytes::from("data for key 6");

        let mut prod2 = Producer::default();
        prod2.insert(blocked_key.clone(), Bytes::from("bad data"));
        let mut prod3 = Producer::default();
        prod3.insert(main_key.clone(), data.clone());

        // First gate: verdict `false` (rejects peer 2's bad data). Second gate:
        // verdict `true` (accepts `main_key`). The verdict meaning of the boolean is
        // presumed; TODO confirm against `BlockingSubscriberRecordingConsumer`.
        let (first_gate_sender, first_gate_receiver) = oneshot::channel();
        let (second_gate_sender, second_gate_receiver) = oneshot::channel();
        let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
            context.child("consumer"),
            vec![(first_gate_receiver, false), (second_gate_receiver, true)],
        );

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Get peer 2 blocked by having it serve bad data that validation rejects.
        mailbox1.fetch(Fetch {
            key: blocked_key.clone(),
            subscriber: SubscriberTag(1),
        });
        started
            .recv()
            .await
            .expect("blocking delivery did not start");
        first_gate_sender.send(()).unwrap();
        wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;

        // Arm an unrelated waiter: a fetch targeted only at the now-blocked peer 2.
        mailbox1.fetch_targeted(
            Fetch {
                key: waiting_key,
                subscriber: SubscriberTag(2),
            },
            non_empty_vec![peers[1].clone()],
        );
        context.sleep(Duration::from_millis(100)).await;

        let first_subscriber = SubscriberTag(3);
        let second_subscriber = SubscriberTag(4);
        mailbox1.fetch(Fetch {
            key: main_key.clone(),
            subscriber: first_subscriber.clone(),
        });

        // Validation of `main_key` has started and is held at the second gate.
        let delivery = started.recv().await.expect("delivery did not start");
        assert_eq!(
            delivery,
            Delivery {
                key: main_key.clone(),
                subscribers: non_empty_vec![first_subscriber.clone()],
            }
        );

        // Attach a late subscriber while `main_key` validation is in progress.
        mailbox1.fetch(Fetch {
            key: main_key.clone(),
            subscriber: second_subscriber.clone(),
        });
        context.sleep(Duration::from_millis(100)).await;

        second_gate_sender.send(()).unwrap();
        let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
        assert_eq!(
            delivery,
            Delivery {
                key: main_key.clone(),
                subscribers: non_empty_vec![first_subscriber],
            }
        );
        assert_eq!(value, data);

        let delivery = select! {
            delivery = started.recv() => delivery.expect("second delivery did not start"),
            _ = context.sleep(Duration::from_secs(2)) => {
                panic!("late subscriber was not delivered while an unrelated waiter was armed");
            },
        };
        assert_eq!(
            delivery,
            Delivery {
                key: main_key,
                subscribers: non_empty_vec![second_subscriber],
            }
        );
    });
}

/// Tests that a subscriber kept by a subscriber-based `retain` predicate is passed
/// through to `deliver` unchanged.
///
/// NOTE(review): despite the name, this test uses the same `SubscriberTag` type as the
/// other subscriber tests in this file.
#[test_traced]
fn test_deliver_receives_distinct_subscriber_type() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let subscriber = SubscriberTag(50);
        let retained = subscriber.clone();
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: subscriber.clone(),
        });
        context.sleep(Duration::from_millis(100)).await;
        mailbox1.retain(move |_, subscriber| subscriber == &retained);
        context.sleep(Duration::from_millis(100)).await;

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let (delivery, value) = cons_out1.recv().await.unwrap();
        assert_eq!(
            delivery,
            Delivery {
                key,
                subscribers: non_empty_vec![subscriber],
            }
        );
        assert_eq!(value, Bytes::from("data for key 5"));
    });
}

/// Tests that a fetch with a single subscriber produces a `Delivery` listing exactly
/// that subscriber.
#[test_traced]
fn test_deliver_receives_single_subscriber() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        let key = Key(5);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 5"));

        let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        let subscriber = SubscriberTag(50);
        mailbox1.fetch(Fetch {
            key: key.clone(),
            subscriber: subscriber.clone(),
        });

        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let (delivery, value) = cons_out1.recv().await.unwrap();
        assert_eq!(
            delivery,
            Delivery {
                key,
                subscribers: non_empty_vec![subscriber],
            }
        );
        assert_eq!(value, Bytes::from("data for key 5"));
    });
}

/// Tests that `retain(|_, _| false)` drops every in-flight fetch.
///
/// Calling it before any fetch has no observable effect. After it prunes an in-flight
/// fetch, the same key can be fetched again and is delivered once a link exists.
#[test_traced]
fn test_retain_drops_all() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        // No link yet - fetch will stay in-flight
        let key = Key(6);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 6"));

        let (cons1, mut cons_out1) = consumer();

        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Pruning before fetching should have no effect.
        mailbox1.retain(|_, _| false);
        select! {
            _ = cons_out1.recv() => {
                panic!("unexpected event");
            },
            _ = context.sleep(Duration::from_millis(100)) => {},
        };

        // Start a fetch (no link, so fetch stays in-flight)
        mailbox1.fetch(key.clone());

        // Prune all fetches.
        mailbox1.retain(|_, _| false);

        // Now add link so fetches can complete
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Fetch same key again, if the in-flight entry wasn't cleaned up, this would
        // be treated as a duplicate and silently ignored
        mailbox1.fetch(key.clone());

        // Should succeed
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, Bytes::from("data for key 6"));
    });
}

/// Tests that when a peer is rate-limited, the fetcher spills over to another peer.
/// With 2 producer peers and a rate limit of 1 request per second per peer, 2 requests
/// issued simultaneously should both complete immediately (one to each peer) without
/// waiting for the rate limit to reset.
#[test_traced] fn test_rate_limit_spillover() { let executor = deterministic::Runner::timed(Duration::from_secs(30)); executor.start(|context| async move { // Use a very restrictive rate limit: 1 request per second per peer let (mut oracle, mut schemes, peers, mut connections) = setup_network_and_peers_with_rate_limit( &context, &[1, 2, 3], Quota::per_second(NZU32!(1)), ) .await; // Add links between peer 1 and both peer 2 and peer 3 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await; add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await; // Both peer 2 and peer 3 have the same data let mut prod2 = Producer::default(); let mut prod3 = Producer::default(); prod2.insert(Key(0), Bytes::from("data for key 0")); prod2.insert(Key(1), Bytes::from("data for key 1")); prod3.insert(Key(0), Bytes::from("data for key 0")); prod3.insert(Key(1), Bytes::from("data for key 1")); let (cons1, mut cons_out1) = consumer(); // Set up peer 1 (the requester) let scheme = schemes.remove(0); let mut mailbox1 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), cons1, Producer::default(), ); // Set up peer 2 (has data) let scheme = schemes.remove(0); let _mailbox2 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), dummy_consumer(), prod2, ); // Set up peer 3 (also has data) let scheme = schemes.remove(0); let _mailbox3 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), dummy_consumer(), prod3, ); // Wait for peer set to be established context.sleep(Duration::from_millis(100)).await; let start = context.current(); // Issue 2 fetches rapidly. 
// With rate limit of 1/sec per peer and 2 peers, both should complete // immediately via spill-over (one request to each peer) mailbox1.fetch(Key(0)); mailbox1.fetch(Key(1)); // Collect results let mut results = HashMap::new(); for _ in 0..2 { let (key, value) = cons_out1.recv().await.unwrap(); results.insert(key.clone(), value); } // Verify both keys were fetched successfully assert_eq!(results.len(), 2); assert_eq!( results.get(&Key(0)).unwrap(), &Bytes::from("data for key 0") ); assert_eq!( results.get(&Key(1)).unwrap(), &Bytes::from("data for key 1") ); // Verify it completed quickly (well under 1 second) - proves spill-over worked // Without spill-over, the second request would wait ~1 second for rate limit reset let elapsed = context.current().duration_since(start).unwrap(); assert!( elapsed < Duration::from_millis(500), "Expected quick completion via spill-over, but took {elapsed:?}" ); }); } /// Tests that rate limiting causes retries to eventually succeed after the rate limit resets. /// This test uses a single peer with a restrictive rate limit and verifies that /// fetches eventually complete after waiting for the rate limit to reset. 
#[test_traced] fn test_rate_limit_retry_after_reset() { let executor = deterministic::Runner::timed(Duration::from_secs(30)); executor.start(|context| async move { // Use a restrictive rate limit: 1 request per second let (mut oracle, mut schemes, peers, mut connections) = setup_network_and_peers_with_rate_limit( &context, &[1, 2], Quota::per_second(NZU32!(1)), ) .await; add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await; // Peer 2 has data for multiple keys let mut prod2 = Producer::default(); prod2.insert(Key(1), Bytes::from("data for key 1")); prod2.insert(Key(2), Bytes::from("data for key 2")); prod2.insert(Key(3), Bytes::from("data for key 3")); let (cons1, mut cons_out1) = consumer(); let scheme = schemes.remove(0); let mut mailbox1 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), cons1, Producer::default(), ); let scheme = schemes.remove(0); let _mailbox2 = setup_and_spawn_actor( &context, oracle.manager(), oracle.control(scheme.public_key()), scheme, connections.remove(0), dummy_consumer(), prod2, ); // Wait for peer set to be established context.sleep(Duration::from_millis(100)).await; let start = context.current(); // Issue 3 fetches to a single peer with rate limit of 1/sec. 
// Only 1 can be sent immediately, the others must wait for rate limit reset mailbox1.fetch(Key(1)); mailbox1.fetch(Key(2)); mailbox1.fetch(Key(3)); // All 3 should eventually succeed (after rate limit resets) let mut results = HashMap::new(); for _ in 0..3 { let (key, value) = cons_out1.recv().await.unwrap(); results.insert(key.clone(), value); } assert_eq!(results.len(), 3); for i in 1..=3 { assert_eq!( results.get(&Key(i)).unwrap(), &Bytes::from(format!("data for key {}", i)) ); } // Verify it took significant time due to rate limiting // With 3 requests at 1/sec to a single peer, requests 2 and 3 must wait // for rate limit resets (~1 second each), so total should be > 2 seconds let elapsed = context.current().duration_since(start).unwrap(); assert!( elapsed > Duration::from_secs(2), "Expected rate limiting to cause delay > 2s, but took {elapsed:?}" ); }); } /// Tests that the resolver never sends fetches to itself (me exclusion). /// Even when the local peer has the data in its producer, it should fetch from /// another peer instead. 
#[test_traced]
fn test_self_exclusion() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let data = Bytes::from("shared data");

        // Both peers have the data - peer 1 (requester) and peer 2
        // NOTE(review): both producers serve the identical `data`, so the assertions at the
        // end cannot distinguish a self-fetch from a fetch to peer 2; they only show that
        // the fetch completes. Confirm whether self-exclusion is checked elsewhere.
        let mut prod1 = Producer::default();
        prod1.insert(key.clone(), data.clone());
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), data.clone());

        let (cons1, mut cons_out1) = consumer();

        // Set up peer 1 with `me` set - it has the data but should NOT fetch from itself
        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            prod1, // peer 1 has the data
        );

        // Set up peer 2 - also has the data
        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Wait for peer set to be established
        context.sleep(Duration::from_millis(100)).await;

        // Fetch the key - should get it from peer 2, not from self
        mailbox1.fetch(key.clone());

        // Should succeed (from peer 2)
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, data);
    });
}

/// Tests that outbound fetches are only sent to the latest primary set: the only peer
/// holding the data (peer 3) is tracked as secondary, so the fetch must not complete
/// within the observation window.
#[test_traced]
fn test_fetch_uses_primary_peers_only() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        // Build the simulated network by hand (instead of via a setup helper) so this test
        // controls the tracked peer sets itself. `tracked_peer_sets` presumably bounds how
        // many tracked sets the simulator retains — confirm against the simulator docs.
        let (network, oracle) = Network::new(
            context.child("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(1),
            },
        );
        network.start();

        let schemes: Vec = [1u64, 2, 3]
            .into_iter()
            .map(PrivateKey::from_seed)
            .collect();
        let peers: Vec = schemes.iter().map(|s| s.public_key()).collect();
        // Rebind as mutable so each actor below can take its key via `remove(0)`.
        let mut schemes = schemes;

        // Register one channel (id 0) per peer; connections[i] belongs to peers[i].
        let mut connections = Vec::new();
        for peer in &peers {
            let (sender, receiver) = oracle
                .control(peer.clone())
                .register(0, Quota::per_second(RATE_LIMIT))
                .await
                .unwrap();
            connections.push((sender, receiver));
        }

        // Topology: peer 1 (requester) linked to peers 2 and 3.
        // Peer 2 is primary (no data), peer 3 is secondary (has data).
        // Fetch should only query primary peers, so the request must time out.
        // NOTE(review): the requester (peers[0]) is not part of the tracked set below, whereas
        // test_fetch_uses_latest_primary_set_only deliberately keeps the requester tracked "so
        // the fetch path itself remains active". If the simulated network drops traffic for
        // untracked peers, this test could pass without exercising the primary-only filter —
        // confirm.
        let mut oracle = oracle;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
        oracle.manager().track(
            1,
            TrackedPeers::new(
                Set::try_from([peers[1].clone()]).unwrap(),
                Set::try_from([peers[2].clone()]).unwrap(),
            ),
        );
        context.sleep(Duration::from_millis(100)).await;

        let key = Key(1);
        let data = Bytes::from("secondary only data");
        let (cons1, mut cons_out1) = consumer();

        // Peer 1: the requester, has no data.
        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        // Peer 2: primary, has no data.
        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            Producer::default(),
        );

        // Peer 3: secondary, has the data. Should not be queried.
        let mut prod3 = Producer::default();
        prod3.insert(key.clone(), data);
        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );

        // Fetch should time out because the only peer with data (peer 3)
        // is secondary and won't be queried.
        mailbox1.fetch(key.clone());
        // Any delivery within 2s is a failure; reaching the sleep arm is the pass condition.
        select! {
            event = cons_out1.recv() => {
                panic!("fetch should not succeed from a secondary peer, got: {event:?}");
            },
            _ = context.sleep(Duration::from_secs(2)) => {},
        }
    });
}

/// Tests that after the primary set advances (peer 2 -> peer 3), neither an untargeted
/// fetch nor a fetch explicitly targeted at the old primary is served by peer 2, even
/// though peer 2 is still retained in the provider's overlap window.
#[test_traced]
fn test_fetch_uses_latest_primary_set_only() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        // Two tracked peer sets, presumably so the older set stays retained (the "overlap
        // window") after the second `track` call below — confirm against the simulator docs.
        let (network, oracle) = Network::new(
            context.child("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(2),
            },
        );
        network.start();

        let schemes: Vec = [1u64, 2, 3]
            .into_iter()
            .map(PrivateKey::from_seed)
            .collect();
        let peers: Vec = schemes.iter().map(|s| s.public_key()).collect();
        // Rebind as mutable so each actor below can take its key via `remove(0)`.
        let mut schemes = schemes;

        // Register one channel (id 0) per peer; connections[i] belongs to peers[i].
        let mut connections = Vec::new();
        for peer in &peers {
            let (sender, receiver) = oracle
                .control(peer.clone())
                .register(0, Quota::per_second(RATE_LIMIT))
                .await
                .unwrap();
            connections.push((sender, receiver));
        }

        let mut oracle = oracle;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        // Keep the requester tracked across the cutover so the fetch path itself remains
        // active, while peer 2 is retained only through the overlap window after peer 3
        // becomes the newest primary set.
        oracle
            .manager()
            .track(
                0,
                Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
            );
        context.sleep(Duration::from_millis(100)).await;

        let key = Key(7);
        let targeted_key = Key(8);
        let data = Bytes::from("old primary data");
        let (cons1, mut cons_out1) = consumer();

        // Peer 1: requester.
        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        // Peer 2: old primary, still retained in `all.primary`, has the data.
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), data.clone());
        prod2.insert(targeted_key.clone(), data);
        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Peer 3: latest primary, has no data.
        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            Producer::default(),
        );
        context.sleep(Duration::from_millis(100)).await;

        // Track peer 3 as the latest primary while keeping the requester in the peer set.
        // Peer 2 remains in the provider's overlap window (`all.primary`), but new resolver traffic
        // should use only `latest.primary`.
        oracle
            .manager()
            .track(
                1,
                Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
            );
        context.sleep(Duration::from_millis(100)).await;

        // Untargeted fetch: any delivery within 1s is a failure.
        mailbox1.fetch(key);
        select! {
            event = cons_out1.recv() => {
                panic!(
                    "fetch should not succeed from an old primary retained only in the overlap window, got: {event:?}"
                );
            },
            _ = context.sleep(Duration::from_secs(1)) => {},
        }

        // Explicit targets still respect the latest-primary filter.
        mailbox1
            .fetch_targeted(targeted_key, non_empty_vec![peers[1].clone()]);
        select! {
            event = cons_out1.recv() => {
                panic!(
                    "targeted fetch should not bypass the latest-primary filter, got: {event:?}"
                );
            },
            _ = context.sleep(Duration::from_secs(1)) => {},
        }
    });
}

/// Tests that a fetch issued after the primary set advances is satisfied by the latest
/// primary (peer 3), and that the overlap-only peer (peer 2, serving data the consumer
/// does not expect) is never queried, so no peer ends up blocked.
#[test_traced]
fn test_fetch_after_cutover_relies_on_latest_primary_history() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(2),
            },
        );
        network.start();

        let schemes: Vec = [1u64, 2, 3]
            .into_iter()
            .map(PrivateKey::from_seed)
            .collect();
        let peers: Vec = schemes.iter().map(|s| s.public_key()).collect();
        // Rebind as mutable so each actor below can take its key via `remove(0)`.
        let mut schemes = schemes;

        // Register one channel (id 0) per peer; connections[i] belongs to peers[i].
        let mut connections = Vec::new();
        for peer in &peers {
            let (sender, receiver) = oracle
                .control(peer.clone())
                .register(0, Quota::per_second(RATE_LIMIT))
                .await
                .unwrap();
            connections.push((sender, receiver));
        }

        let mut oracle = oracle;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        // Keep the requester in the peer set across the cutover while peer 2 remains connected
        // only through the overlap window after the latest primary advances to peer 3.
        oracle.manager().track(
            0,
            Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
        );
        context.sleep(Duration::from_millis(100)).await;

        let key = Key(9);
        let invalid_history = Bytes::from("stale overlap history");
        let valid_history = Bytes::from("latest primary history");
        // Presumably `add_expected` makes the consumer accept only `valid_history` for `key`
        // (rejecting other values) — confirm in the test consumer helper.
        let (mut cons1, mut cons_out1) = consumer();
        cons1.add_expected(key.clone(), valid_history.clone());

        // Peer 1: requester.
        let scheme = schemes.remove(0);
        let mut mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons1,
            Producer::default(),
        );

        // Peer 2: old primary retained only via overlap. If queried, it would be blocked for
        // serving invalid history.
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), invalid_history);
        let scheme = schemes.remove(0);
        let _mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod2,
        );

        // Peer 3: latest primary and the only peer that should satisfy the fetch.
        let mut prod3 = Producer::default();
        prod3.insert(key.clone(), valid_history.clone());
        let scheme = schemes.remove(0);
        let _mailbox3 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod3,
        );
        context.sleep(Duration::from_millis(100)).await;

        // Cutover: the latest primary set now contains the requester and peer 3.
        oracle.manager().track(
            1,
            Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
        );
        context.sleep(Duration::from_millis(100)).await;

        mailbox1.fetch(key.clone());
        let (key_actual, value) = cons_out1.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, valid_history);
        // An empty block list shows peer 2 was never asked (and so never penalized).
        assert!(
            oracle.blocked().await.unwrap().is_empty(),
            "overlap-only peers should not be queried for post-cutover history"
        );
    });
}

/// Tests that a peer answers an inbound request from a peer that is tracked only as
/// secondary; here the secondary peer reaches it with a targeted fetch.
#[test_traced]
fn test_secondary_peer_requests_are_served() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;

        // Topology: peer 1 is primary (has data), peer 2 is secondary (requester).
        // Verifies that a primary peer serves requests from secondary peers
        // (i.e. secondary peers can't fetch via broadcast, but their direct
        // requests are still answered).
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        oracle.manager().track(
            1,
            TrackedPeers::new(
                Set::try_from([peers[0].clone()]).unwrap(),
                Set::try_from([peers[1].clone()]).unwrap(),
            ),
        );
        context.sleep(Duration::from_millis(100)).await;

        let key = Key(9);
        let data = Bytes::from("served to secondary");

        // Peer 1: primary, has the data.
        let mut prod1 = Producer::default();
        prod1.insert(key.clone(), data.clone());
        let scheme = schemes.remove(0);
        let _mailbox1 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            dummy_consumer(),
            prod1,
        );

        // Peer 2: secondary, uses fetch_targeted to explicitly request from peer 1.
        let (mut cons2, mut cons_out2) = consumer();
        cons2.add_expected(key.clone(), data.clone());
        let scheme = schemes.remove(0);
        let mut mailbox2 = setup_and_spawn_actor(
            &context,
            oracle.manager(),
            oracle.control(scheme.public_key()),
            scheme,
            connections.remove(0),
            cons2,
            Producer::default(),
        );

        mailbox2.fetch_targeted(key.clone(), non_empty_vec![peers[0].clone()]);
        let (key_actual, value) = cons_out2.recv().await.unwrap();
        assert_eq!(key_actual, key);
        assert_eq!(value, data);
    });
}

/// Tests that aborting both engines while the consumer is still blocked on a delivery
/// drops that pending delivery and leaves no tasks running under the `actor` label.
#[test_traced]
fn test_shutdown_aborts_pending_delivery_without_leaked_tasks() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, mut schemes, peers, mut connections) =
            setup_network_and_peers(&context, &[1, 2]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let data = Bytes::from("data for key 1");
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), data);

        // The consumer's first delivery waits on `gate_receiver`; the gate is never
        // released, so the delivery stays pending until shutdown. (The `true` is
        // presumably the verdict returned once the gate opens — confirm in BlockingConsumer.)
        let (mut gate_sender, gate_receiver) = oneshot::channel();
        let (cons1, mut cons_out1, mut started) =
            BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, true)]);

        // Both engines are spawned under the `actor` label so `count_running_tasks` can
        // observe them.
        let actor_context = context.child("actor");
        let scheme = schemes.remove(0);
        let public_key = scheme.public_key();
        let (engine, mut mailbox1): (_, Mailbox) = Engine::new(
            actor_context.child("peer").with_attribute("index", 0),
            Config {
                peer_provider: oracle.manager(),
                blocker: oracle.control(public_key.clone()),
                consumer: cons1,
                producer: Producer::::default(),
                mailbox_size: MAILBOX_SIZE,
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        let handle1 = engine.start(connections.remove(0));

        let scheme = schemes.remove(0);
        let public_key = scheme.public_key();
        let (engine, _mailbox2): (_, Mailbox) = Engine::new(
            actor_context.child("peer").with_attribute("index", 1),
            Config {
                peer_provider: oracle.manager(),
                blocker: oracle.control(public_key.clone()),
                consumer: dummy_consumer(),
                producer: prod2,
                mailbox_size: MAILBOX_SIZE,
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        let handle2 = engine.start(connections.remove(0));

        // Wait until the consumer reports that delivery of `key` has begun.
        mailbox1.fetch(key.clone());
        let started_key = started.recv().await.expect("delivery did not start");
        assert_eq!(started_key, key);
        assert!(count_running_tasks(&context, "actor") > 0);

        handle1.abort();
        handle2.abort();
        context.sleep(Duration::from_millis(100)).await;

        // The gate's sender observes closure once the pending delivery (holding the
        // receiver) is dropped; timing out means the delivery survived the abort.
        select! {
            _ = gate_sender.closed() => {},
            _ = context.sleep(Duration::from_secs(2)) => {
                panic!("pending delivery was not aborted");
            },
        };
        // After shutdown the consumer output must stay silent or report closure (`None`).
        select! {
            event = cons_out1.recv() => assert!(event.is_none(), "unexpected event"),
            _ = context.sleep(Duration::from_millis(100)) => {},
        };

        let running_after = count_running_tasks(&context, "actor");
        assert_eq!(
            running_after, 0,
            "all actor tasks should be stopped, but {running_after} still running"
        );
    });
}

/// Spawns one engine per `(scheme, connection, consumer, producer)` tuple under the
/// `actor` context label, with `me` set to each scheme's own public key.
///
/// Returns the mailboxes and task handles in input order. The inputs are combined with
/// `zip`, so if the vectors differ in length only the shortest common prefix is spawned.
///
/// NOTE(review): several generic arguments in this signature (e.g. `Vec>`) appear to be
/// missing in this copy of the file — confirm against the original source.
// The nested tuple/vector parameter and return types trip clippy's `type_complexity` lint.
#[allow(clippy::type_complexity)]
fn spawn_actors_with_handles(
    context: &deterministic::Context,
    oracle: &Oracle,
    schemes: Vec,
    connections: Vec<(
        Sender,
        Receiver,
    )>,
    consumers: Vec>,
    producers: Vec>,
) -> (
    Vec>,
    Vec>,
) {
    let actor_context = context.child("actor");
    let mut mailboxes = Vec::new();
    let mut handles = Vec::new();

    for (idx, ((scheme, conn), (consumer, producer))) in schemes
        .into_iter()
        .zip(connections)
        .zip(consumers.into_iter().zip(producers))
        .enumerate()
    {
        let ctx = actor_context.child("peer").with_attribute("index", idx);
        let public_key = scheme.public_key();
        let (engine, mailbox) = Engine::new(
            ctx,
            Config {
                peer_provider: oracle.manager(),
                blocker: oracle.control(public_key.clone()),
                consumer,
                producer,
                mailbox_size: MAILBOX_SIZE,
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        handles.push(engine.start(conn));
        mailboxes.push(mailbox);
    }

    (mailboxes, handles)
}

/// Tests that `fetch`, `retain`, and `fetch_targeted` on a mailbox whose engine has been
/// aborted return without panicking.
#[test_traced]
fn test_operations_after_shutdown_do_not_panic() {
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[1, 2]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 1"));

        let (cons1, mut cons_out1) = consumer();
        let (mut mailboxes, handles) = spawn_actors_with_handles(
            &context,
            &oracle,
            schemes,
            connections,
            vec![cons1, dummy_consumer()],
            vec![Producer::default(), prod2],
        );

        // Fetch to verify network is functional
        mailboxes[0].fetch(key.clone());
        let (_, value) = cons_out1.recv().await.unwrap();
        assert_eq!(value, Bytes::from("data for key 1"));

        // Abort all actors
        for handle in handles {
            handle.abort();
        }
        context.sleep(Duration::from_millis(100)).await;

        // All operations should not panic after shutdown

        // Fetch should not panic
        let key2 = Key(2);
        mailboxes[0].fetch(key2.clone());

        // Retain can prune a single key after shutdown without panicking.
        let canceled = key2;
        mailboxes[0].retain(move |key, _| key != &canceled);

        // Retain should not panic
        mailboxes[0].retain(|_, _| true);

        // Fetch targeted should not panic
        mailboxes[0].fetch_targeted(Key(3), non_empty_vec![peers[1].clone()]);
    });
}

/// Runs one seeded scenario: spawns two engines, confirms a fetch succeeds, aborts both
/// engines, and asserts no task remains under the `actor` label.
fn clean_shutdown(seed: u64) {
    let cfg = deterministic::Config::default()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(30)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[1, 2]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        let key = Key(1);
        let mut prod2 = Producer::default();
        prod2.insert(key.clone(), Bytes::from("data for key 1"));

        let (cons1, mut cons_out1) = consumer();
        let (mut mailboxes, handles) = spawn_actors_with_handles(
            &context,
            &oracle,
            schemes,
            connections,
            vec![cons1, dummy_consumer()],
            vec![Producer::default(), prod2],
        );

        // Allow tasks to start
        context.sleep(Duration::from_millis(100)).await;

        // Count running tasks under the actor prefix
        let running_before = count_running_tasks(&context, "actor");
        assert!(
            running_before > 0,
            "at least one actor task should be running"
        );

        // Verify network is functional
        mailboxes[0].fetch(key.clone());
        let (_, value) = cons_out1.recv().await.unwrap();
        assert_eq!(value, Bytes::from("data for key 1"));

        // Abort all actors
        for handle in handles {
            handle.abort();
        }
        context.sleep(Duration::from_millis(100)).await;

        // Verify all actor tasks are stopped
        let running_after = count_running_tasks(&context, "actor");
        assert_eq!(
            running_after, 0,
            "all actor tasks should be stopped, but {running_after} still running"
        );
    });
}

/// Repeats `clean_shutdown` for seeds 0 through 24 so shutdown is exercised under
/// several deterministic schedules.
#[test]
fn test_clean_shutdown() {
    for seed in 0..25 {
        clean_shutdown(seed);
    }
}
// Closes the enclosing module, which is opened earlier in the file.
}