use arbitrary::Arbitrary; use bytes::Bytes; use commonware_codec::codec::FixedSize; use commonware_cryptography::{ed25519, Signer}; use commonware_p2p::{ authenticated::{ discovery, lookup::{self, Network as LookupNetwork}, }, Address, Blocker, Channel, Manager, Receiver, Recipients, Sender, }; use commonware_runtime::{ deterministic::{self, Context}, Clock, Handle, Metrics, Quota, Runner, }; use commonware_utils::{ ordered::{Map, Set}, TryCollect, NZU32, }; use rand::{seq::SliceRandom, Rng}; use std::{ collections::{HashMap, HashSet, VecDeque}, future::Future, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc, time::Duration, }; /// Stable identifier for a peer (index in the topology). pub type PeerId = u8; /// Immutable topology describing the entire peer cluster. pub struct Topology { /// All peers in the network. pub peers: Vec, /// Base port for peer addressing. pub base_port: u16, /// Number of tracked peer sets. pub tracked_peer_sets: usize, /// Map from public key to peer ID. pub pk_to_id: HashMap, } /// Per-peer view of the network. pub struct PeerCtx { /// This peer's ID. pub id: PeerId, /// This peer's info (keys and address). pub info: PeerInfo, /// Shared topology. pub topo: Arc, } const MAX_OPERATIONS: usize = 30; const MAX_PEERS: usize = 8; const MIN_PEERS: usize = 4; const MAX_MSG_SIZE: u32 = 1024 * 1024; // 1MB const MAX_INDEX: u8 = 10; const TRACKED_PEER_SETS: usize = 5; const DEFAULT_MESSAGE_BACKLOG: usize = 128; const MAX_SLEEP_DURATION_MS: u64 = 1000; /// Operations that can be performed on the p2p network during fuzzing. #[derive(Debug, Arbitrary)] pub enum NetworkOperation { /// Send a message from one peer to one or more recipients. SendMessage { /// Index of the peer sending the message. from_idx: u8, /// Number of recipients to send to (randomly selected). num_recipients: u8, /// Size of the message payload in bytes. msg_size: usize, }, /// Attempt to receive messages from any peers with pending messages. ReceiveMessages, /// Register a subset of peers under a specific index. RegisterPeers { /// Index of the peer whose oracle will register the subset. peer_idx: u8, /// The index to register the peer subset under. index: u8, /// Number of peers to include in the subset. peer_set_size: u8, }, /// Block a target peer from sending messages to the blocking peer. BlockPeer { /// Index of the peer that will block the target. peer_idx: u8, /// Index of the peer to be blocked. target_idx: u8, }, } /// Input generated by the fuzzer for testing p2p networks. #[derive(Debug)] pub struct FuzzInput { /// Random seed for deterministic execution. pub seed: u64, /// Sequence of operations to execute on the network. /// /// Length is in the range [1, MAX_OPERATIONS]. pub operations: Vec, /// Number of peers to create in the network. /// /// Must be in the range [MIN_PEERS, MAX_PEERS]. pub peers: u8, } impl<'a> Arbitrary<'a> for FuzzInput { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let seed = u.arbitrary()?; let num_operations = u.int_in_range(1..=MAX_OPERATIONS)?; let mut operations = Vec::with_capacity(num_operations); for _ in 0..num_operations { operations.push(u.arbitrary()?); } let peers = u.int_in_range(MIN_PEERS..=MAX_PEERS)? as u8; Ok(FuzzInput { seed, operations, peers, }) } } /// Information about a peer in the network. #[derive(Clone)] pub struct PeerInfo { /// The peer's private key. pub private_key: ed25519::PrivateKey, /// The peer's public key. pub public_key: ed25519::PublicKey, /// The network address where this peer can be reached. pub address: SocketAddr, } /// Per-peer network stack. pub struct PeerNetwork { /// Sender for sending messages to other peers. pub sender: S, /// Receiver for receiving messages from other peers. pub receiver: R, /// Oracle for registering peers, blocking peers, etc. pub oracle: O, /// Handle to the peer task. pub handle: Handle<()>, } /// All information and components for a single peer. /// /// The `network` field holds this peer's own networking stack (sender/receiver/oracle/handle). /// It is not the global network; each peer has a `PeerNetwork` instance. pub struct Peer { /// Peer information (keys and address). pub info: PeerInfo, /// Peer network stack. pub network: PeerNetwork, } /// Trait for abstracting over different p2p network implementations (Discovery, Lookup) during fuzzing. /// /// This allows the same fuzzing logic to test multiple network implementations by providing /// a common interface for network creation and peer registration. pub trait NetworkScheme: Send + 'static { /// The sender type for this network implementation. type Sender: Sender + Send; /// The receiver type for this network implementation. type Receiver: Receiver + Send; /// The oracle type for this network implementation. type Oracle: Blocker + Send; /// Creates and initializes a network instance for a single peer. /// /// # Parameters /// /// * `context` - The deterministic runtime context for this peer /// * `peer` - Per-peer view including ID, info, and shared topology /// /// # Returns /// /// A network instance for interacting with the network. fn create_network( context: Context, peer: &PeerCtx, ) -> impl Future>; /// Registers a peer set with the oracle. /// /// # Parameters /// /// * `oracle` - The network oracle to register peers with /// * `index` - The index to register this peer set under /// * `topo` - The shared topology /// * `peer_ids` - The peer IDs to register fn register_peers<'a>( oracle: &'a mut Self::Oracle, index: u64, topo: &'a Topology, peer_ids: &'a [PeerId], ) -> impl Future; } /// Discovery network implementation for fuzzing. pub struct Discovery; impl NetworkScheme for Discovery { type Sender = discovery::Sender; type Receiver = discovery::Receiver; type Oracle = discovery::Oracle; async fn create_network( mut context: Context, peer: &PeerCtx, ) -> PeerNetwork { // Collect all peer public keys for potential discovery let peer_pks = peer .topo .peers .iter() .map(|p| p.public_key.clone()) .collect::>(); // Set up bootstrappers: all peers except peer 0 bootstrap from peer 0 let bootstrappers = if peer.id > 0 { vec![( peer.topo.peers[0].public_key.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), peer.topo.base_port).into(), )] } else { Vec::new() }; // Create config with recommended defaults let mut config = discovery::Config::recommended( peer.info.private_key.clone(), b"fuzz_namespace", peer.info.address, peer.info.address, bootstrappers, MAX_MSG_SIZE, ); // Override some settings for fuzzing environment config.mailbox_size = 100; // Small mailbox to encourage backpressure config.allow_private_ips = true; // Required for localhost testing config.tracked_peer_sets = peer.topo.tracked_peer_sets; // Create the network and oracle for controlling it let (mut network, mut oracle) = discovery::Network::new(context.with_label("fuzzed_discovery_network"), config); // Pre-register some peer subsets to seed the network // Each index gets a randomized subset of 3 peers for index in 0..peer.topo.tracked_peer_sets { let mut addrs = peer_pks.clone(); addrs.shuffle(&mut context); let subset: Set<_> = addrs[..3] .iter() .cloned() .try_collect() .expect("public keys are unique"); oracle.update(index as u64, subset).await; } let quota = Quota::per_second(NZU32!(100)); let (sender, receiver) = network.register(0, quota, DEFAULT_MESSAGE_BACKLOG); // Start the network background task let handle = network.start(); PeerNetwork { sender, receiver, oracle, handle, } } async fn register_peers<'a>( oracle: &'a mut Self::Oracle, index: u64, topo: &'a Topology, peer_ids: &'a [PeerId], ) { // Discovery only needs public keys (addresses discovered via protocol) let peer_pks: Set<_> = peer_ids .iter() .map(|&id| topo.peers[id as usize].public_key.clone()) .try_collect() .expect("public keys are unique"); let _ = oracle.update(index, peer_pks).await; } } /// Lookup network implementation for fuzzing. pub struct Lookup; impl NetworkScheme for Lookup { type Sender = lookup::Sender; type Receiver = lookup::Receiver; type Oracle = lookup::Oracle; async fn create_network( mut context: Context, peer: &PeerCtx, ) -> PeerNetwork { // Create lookup config - no bootstrappers needed since we register addresses directly let mut config = lookup::Config::recommended( peer.info.private_key.clone(), b"fuzz_namespace", peer.info.address, MAX_MSG_SIZE, ); config.allow_private_ips = true; // Required for localhost testing config.tracked_peer_sets = 2 * peer.topo.tracked_peer_sets; // Create the network and oracle let (mut network, mut oracle) = LookupNetwork::new(context.with_label("fuzzed_lookup_network"), config); // For lookup, we must provide both public keys AND addresses // (unlike discovery which finds addresses through the protocol) let peer_list: Vec<_> = peer .topo .peers .iter() .map(|p| (p.public_key.clone(), p.address.into())) .collect(); // Register multiple peer sets to seed the network // Register all peers for indices 0..TRACKED_PEER_SETS for index in 0..peer.topo.tracked_peer_sets { oracle .update( index as u64, peer_list .clone() .try_into() .expect("public keys are unique"), ) .await; } // Register randomized subsets of 3 peers for indices TRACKED_PEER_SETS..2*TRACKED_PEER_SETS for index in peer.topo.tracked_peer_sets..(peer.topo.tracked_peer_sets * 2) { let mut peers = peer_list.clone(); peers.shuffle(&mut context); let subset: Map<_, _> = peers[..3] .iter() .cloned() .try_collect() .expect("public keys are unique"); oracle.update(index as u64, subset).await; } let quota = Quota::per_second(NZU32!(100)); let (sender, receiver) = network.register(0, quota, DEFAULT_MESSAGE_BACKLOG); // Start the network background task let handle = network.start(); PeerNetwork { sender, receiver, oracle, handle, } } async fn register_peers<'a>( oracle: &'a mut Self::Oracle, index: u64, topo: &'a Topology, peer_ids: &'a [PeerId], ) { // Lookup needs both public keys and addresses let peer_list: Map<_, Address> = peer_ids .iter() .map(|&id| { let p = &topo.peers[id as usize]; (p.public_key.clone(), p.address.into()) }) .try_collect() .expect("public keys are unique"); let _ = oracle.update(index, peer_list).await; } } /// Main fuzzing entry point for testing p2p network implementations. /// /// Creates a deterministic network of peers and executes a sequence of random operations /// (sending messages, receiving messages, registering peer subsets, blocking peers). /// /// # Type Parameters /// /// * `N` - The network scheme to test ([Discovery] or [Lookup]) /// /// # Parameters /// /// * `input` - The fuzz input containing the seed, number of peers, and operations to execute /// /// # Behavior /// /// 1. Creates N peers with deterministic keys and addresses /// 2. Initializes a network instance for each peer /// 3. Executes the sequence of operations from the fuzzer /// 4. Tracks expected messages and verifies delivery /// 5. Cleans up network handles when done pub fn fuzz(input: FuzzInput) { // Create a deterministic executor with the provided seed let executor = deterministic::Runner::seeded(input.seed); executor.start(|mut context| async move { let base_port = 63000; // Arbitrary starting port for localhost testing // Build peer_infos and reverse lookup map together let mut pk_to_id = HashMap::new(); let mut peer_infos = Vec::new(); for i in 0..input.peers { let private_key = ed25519::PrivateKey::from_seed(context.gen()); let public_key = private_key.public_key(); let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16); pk_to_id.insert(public_key.clone(), i); peer_infos.push(PeerInfo { private_key, public_key, address, }); } // Build topology once let topology = Arc::new(Topology { peers: peer_infos, base_port, tracked_peer_sets: TRACKED_PEER_SETS, pk_to_id, }); // Create network instances and peer states let mut peers = Vec::new(); for id in 0..topology.peers.len() { let peer_ctx = PeerCtx { id: id as PeerId, info: topology.peers[id].clone(), topo: Arc::clone(&topology), }; // Create network instance for this peer let peer_context = context.with_label(&format!("peer_{id}")); let network = N::create_network(peer_context, &peer_ctx).await; // Create and store peer state peers.push(Peer { info: peer_ctx.info, network, }); } // Execute fuzzer operations and track message expectations // Track expected messages: (to_idx, from_idx) -> queue of messages // Messages are sent with the same priority, ensuring FIFO delivery per sender-receiver pair let mut expected_msgs: HashMap<(u8, u8), VecDeque> = HashMap::new(); // Track which receivers have pending messages from which senders // Receiver index -> set of sender indices that have pending messages for this receiver let mut pending_by_receiver: HashMap> = HashMap::new(); for op in input.operations.into_iter() { match op { NetworkOperation::SendMessage { from_idx, num_recipients, msg_size, } => { // Normalize sender index to valid range let from_idx = (from_idx as usize) % peers.len(); // Clamp message size to not exceed max (accounting for channel overhead) let msg_size = msg_size.clamp(0, MAX_MSG_SIZE as usize - Channel::SIZE); // Generate random message payload let mut bytes = vec![0u8; msg_size]; context.fill(&mut bytes[..]); let message = Bytes::from(bytes); // Select random recipients (excluding sender) let mut available: Vec<_> = (0..peers.len()) .filter(|&i| i != from_idx) .collect(); if available.is_empty() { continue; // Only 1 peer (self) in network, can't send anywhere } // Randomly select num_recipients peers (at least 1, at most all available) available.shuffle(&mut context); let count = (num_recipients as usize).clamp(1, available.len()); let selected = &available[..count]; // Build Recipients based on count let recipients = if count == available.len() { Recipients::All } else if count == 1 { Recipients::One(peers[selected[0]].info.public_key.clone()) } else { let pks: Vec<_> = selected .iter() .map(|&idx| peers[idx].info.public_key.clone()) .collect(); Recipients::Some(pks) }; // Attempt to send the message // Note: Always use low priority (false) to ensure FIFO ordering let send_result = peers[from_idx] .network .sender .send(recipients, message.clone(), false) .await; // Track message as expected only if send was accepted if let Ok(accepted_recipients) = send_result { // Map accepted recipient public keys to indices for pk in accepted_recipients.into_iter() { if let Some(&to_idx) = topology.pk_to_id.get(&pk) { // Add message to the queue for this (receiver, sender) pair expected_msgs .entry((to_idx, from_idx as u8)) .or_default() .push_back(message.clone()); // Track that this receiver has pending messages from this sender pending_by_receiver .entry(to_idx) .or_default() .insert(from_idx as u8); } } } } NetworkOperation::ReceiveMessages => { // Attempt to receive one message from each receiver with pending messages let receivers_with_pending: Vec = pending_by_receiver.keys().cloned().collect(); for to_idx in receivers_with_pending { let receiver = &mut peers[to_idx as usize].network.receiver; // Try to receive one message with timeout commonware_macros::select! { result = receiver.recv() => { let Ok((sender_pk, message)) = result else { continue; // Receive error }; // Identify the sender by their public key let Some(&from_idx) = topology.pk_to_id.get(&sender_pk) else { panic!("Received message from unknown sender: {sender_pk:?}"); }; // Find the expected queue for this sender-receiver pair let expected_msgs_key = (to_idx, from_idx); let Some(queue) = expected_msgs.get_mut(&expected_msgs_key) else { panic!("No expected messages for this sender-receiver pair"); }; // Find message in expected queue if let Some(pos) = queue.iter().position(|m| m == &message) { // Remove all messages up to and including this one // Messages before it were implicitly dropped, this one is received for _ in 0..=pos { queue.pop_front(); } // Clean up empty queue and update pending tracking if queue.is_empty() { expected_msgs.remove(&expected_msgs_key); if let Some(senders) = pending_by_receiver.get_mut(&to_idx) { senders.remove(&from_idx); if senders.is_empty() { pending_by_receiver.remove(&to_idx); } } } } else { panic!( "Unexpected message from sender {} to receiver {}. Message len: {}", from_idx, to_idx, message.len() ); } }, _ = context.sleep(Duration::from_millis(MAX_SLEEP_DURATION_MS)) => { continue; // Timeout - message may not have arrived yet }, } } } NetworkOperation::RegisterPeers { peer_idx, index, peer_set_size, } => { // Register a subset of peers under the given index // Normalize parameters to valid ranges let peer_idx = (peer_idx as usize) % peers.len(); let index = index % MAX_INDEX; let peer_set_size = (peer_set_size as usize).clamp(1, topology.peers.len()); // Build a random subset of peer IDs (using a set to avoid duplicates) let mut peer_ids = HashSet::new(); for _ in 0..peer_set_size { let id = context.gen::() % topology.peers.len(); peer_ids.insert(id as PeerId); } let peer_ids: Vec<_> = peer_ids.into_iter().collect(); // Register this peer set with the oracle N::register_peers( &mut peers[peer_idx].network.oracle, index as u64, &topology, &peer_ids, ) .await; } NetworkOperation::BlockPeer { peer_idx, target_idx } => { // Block a peer from sending messages to another peer // Normalize indices to valid ranges let blocker_idx = (peer_idx as usize) % peers.len(); let blocked_idx = (target_idx as usize) % peers.len(); // Block the target peer on the blocker's oracle let blocked_pk = peers[blocked_idx].info.public_key.clone(); let _ = peers[blocker_idx].network.oracle.block(blocked_pk).await; } } } }); }