#![no_main] use arbitrary::Arbitrary; use commonware_broadcast::{ buffered::{Config, Engine, Mailbox}, Broadcaster, }; use commonware_codec::{Encode, RangeCfg, ReadRangeExt}; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, sha256::Digest, Digestible, Hasher, Sha256, Signer, }; use commonware_p2p::{simulated::Network, Recipients}; use commonware_runtime::{deterministic, Buf, BufMut, Clock, Quota, Runner, Supervisor as _}; use commonware_utils::{channel::oneshot, futures::Pool, vec::Bounded, NZUsize}; use futures::FutureExt as _; use libfuzzer_sys::fuzz_target; use rand::{seq::SliceRandom, SeedableRng}; use std::{collections::BTreeMap, num::NonZeroU32, time::Duration}; /// Default rate limit set high enough to not interfere with normal operation const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX); /// Maximum sleep duration in milliseconds for fuzz tests. /// /// Capped to avoid overflow in governor rate limiter which uses nanoseconds internally /// and can only represent durations up to ~584 years. const MAX_SLEEP_DURATION_MS: u64 = 1000; /// Maximum number of recently broadcast digests tracked for `Recent` lookups. /// /// Indexed by `u8` modulo length, so a larger buffer would leave entries past 255 /// unreachable via `Source::Recent`. const MAX_RECENT_DIGESTS: usize = (u8::MAX as usize) + 1; /// Subscription result paired with the digest requested so completions can be validated. type Subscription = (Digest, Result); #[derive(Clone, Debug, Arbitrary)] pub enum RecipientPattern { All, Some(u64), One(u64), } #[derive(Debug, Clone, PartialEq, Eq, Arbitrary)] pub struct FuzzMessage { pub commitment: Vec, pub content: Vec, } impl Digestible for FuzzMessage { type Digest = Digest; fn digest(&self) -> Self::Digest { Sha256::hash(&self.encode()) } } impl commonware_codec::Write for FuzzMessage { fn write(&self, buf: &mut impl BufMut) { self.commitment.write(buf); self.content.write(buf); } } impl commonware_codec::EncodeSize for FuzzMessage { fn encode_size(&self) -> usize { self.commitment.encode_size() + self.content.encode_size() } } impl commonware_codec::Read for FuzzMessage { type Cfg = RangeCfg; fn read_cfg(buf: &mut impl Buf, range: &Self::Cfg) -> Result { let commitment = Vec::::read_range(buf, *range)?; let content = Vec::::read_range(buf, *range)?; Ok(Self { commitment, content, }) } } /// Source for the digest used by `Subscribe`/`Get` actions. #[derive(Clone, Debug, Arbitrary)] enum Source { /// A random message. Random(FuzzMessage), /// Index into messages broadcast earlier in this run. Recent(u8), } impl Source { /// Resolve to a concrete digest. Returns `None` when `Recent` is selected /// before any messages have been broadcast. fn resolve(self, recent: &Bounded) -> Option { match self { Source::Random(message) => Some(message.digest()), Source::Recent(_) if recent.is_empty() => None, Source::Recent(idx) => recent.get((idx as usize) % recent.len()).cloned(), } } } #[derive(Clone, Debug)] enum BroadcastAction { SendMessage { peer_index: usize, recipients: RecipientPattern, message: FuzzMessage, }, Subscribe { peer_index: usize, digest: Source, }, Get { peer_index: usize, digest: Source, }, Sleep { duration_ms: u64, }, } impl<'a> Arbitrary<'a> for BroadcastAction { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let variant = u.int_in_range(0..=3)?; match variant { 0 => Ok(BroadcastAction::SendMessage { peer_index: u.arbitrary()?, recipients: u.arbitrary()?, message: u.arbitrary()?, }), 1 => Ok(BroadcastAction::Subscribe { peer_index: u.arbitrary()?, digest: u.arbitrary()?, }), 2 => Ok(BroadcastAction::Get { peer_index: u.arbitrary()?, digest: u.arbitrary()?, }), _ => Ok(BroadcastAction::Sleep { duration_ms: u.int_in_range(0..=MAX_SLEEP_DURATION_MS)?, }), } } } #[derive(Debug)] pub struct FuzzInput { peer_seeds: Vec, network_success_rate: f64, network_latency_ms: u64, network_jitter_ms: u64, cache_size: usize, actions: Vec, } impl<'a> arbitrary::Arbitrary<'a> for FuzzInput { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let num_peers = u.int_in_range(1..=5)?; let peer_seeds = (0..num_peers).collect::>(); // avoid duplicate seeds let network_success_rate = u.int_in_range(30..=100)? as f64 / 100.0; let network_latency_ms = u.int_in_range(1..=100)?; let network_jitter_ms = u.int_in_range(0..=50)?; let cache_size = u.int_in_range(5..=10)?; let num_actions = u.int_in_range(1..=10)?; let actions = (0..num_actions) .map(|_| BroadcastAction::arbitrary(u)) .collect::, _>>()?; Ok(FuzzInput { peer_seeds, network_success_rate, network_latency_ms, network_jitter_ms, cache_size, actions, }) } } fn resolve_recipients(pattern: &RecipientPattern, peers: &[PublicKey]) -> Recipients { match pattern { RecipientPattern::All => Recipients::All, RecipientPattern::One(seed) => { let index = (*seed as usize) % peers.len(); Recipients::One(peers[index].clone()) } RecipientPattern::Some(seed) => { let mut rng = rand::rngs::StdRng::seed_from_u64(*seed); let mut shuffled_peers = peers.to_vec(); shuffled_peers.shuffle(&mut rng); let count = ((seed % peers.len() as u64) as usize).max(1); let peer_slice = shuffled_peers.into_iter().take(count).collect(); Recipients::Some(peer_slice) } } } // Keep subscriptions alive without spawning one task per receiver. Ready // subscriptions are validated, while unresolved ones remain pending. fn drain_ready_subscriptions(pending: &mut Pool) { while let Some((digest, result)) = pending.next_completed().now_or_never() { if let Ok(message) = result { assert_eq!(message.digest(), digest); } } } fn fuzz(input: FuzzInput) { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Generate peer identities before building the network so the initial // peer set can be seeded through the constructor. let peers = input .peer_seeds .iter() .map(|&seed| PrivateKey::from_seed(seed).public_key()) .collect::>(); // Create network let (network, oracle) = Network::::new_with_peers( context.child("network"), commonware_p2p::simulated::Config { max_size: 1024 * 1024, disconnect_on_block: false, tracked_peer_sets: NZUsize!(1), }, peers.clone(), ) .await; network.start(); // Create peers let mut mailboxes: BTreeMap> = BTreeMap::new(); for (i, public_key) in peers.iter().cloned().enumerate() { // Create channel let (sender, receiver) = oracle .control(public_key.clone()) .register(0, TEST_QUOTA) .await .unwrap(); // Create mailbox let config = Config { public_key: public_key.clone(), mailbox_size: NZUsize!(1024), deque_size: input.cache_size, priority: false, codec_config: RangeCfg::from(..), peer_provider: oracle.manager(), }; // Create engine let engine_context = context.child("peer").with_attribute("index", i); let (engine, mailbox) = Engine::<_, PublicKey, FuzzMessage, _>::new(engine_context, config); mailboxes.insert(public_key.clone(), mailbox); engine.start((sender, receiver)); } // Add links between peers let link = commonware_p2p::simulated::Link { latency: Duration::from_millis(input.network_latency_ms), jitter: Duration::from_millis(input.network_jitter_ms), success_rate: input.network_success_rate, }; for p1 in &peers { for p2 in &peers { if p1 != p2 { let _ = oracle.add_link(p1.clone(), p2.clone(), link.clone()).await; } } } // Execute fuzzed actions let mut recent_digests = Bounded::new(NZUsize!(MAX_RECENT_DIGESTS)); let mut pending_subscriptions = Pool::default(); for action in input.actions { match action { BroadcastAction::SendMessage { peer_index, recipients, message, } => { let clamped_peer_idx = peer_index % peers.len(); let peer = peers[clamped_peer_idx].clone(); if let Some(mailbox) = mailboxes.get(&peer).cloned() { let resolved_recipients = resolve_recipients(&recipients, &peers); recent_digests.push(message.digest()); let _ = mailbox.broadcast(resolved_recipients, message); } } BroadcastAction::Subscribe { peer_index, digest: source, } => { let Some(digest) = source.resolve(&recent_digests) else { continue; }; let clamped_peer_idx = peer_index % peers.len(); let peer = peers[clamped_peer_idx].clone(); if let Some(mailbox) = mailboxes.get(&peer).cloned() { let receiver = mailbox.subscribe(digest); pending_subscriptions.push(async move { (digest, receiver.await) }); } } BroadcastAction::Get { peer_index, digest: source, } => { let Some(digest) = source.resolve(&recent_digests) else { continue; }; let clamped_peer_idx = peer_index % peers.len(); let peer = peers[clamped_peer_idx].clone(); if let Some(mailbox) = mailboxes.get(&peer).cloned() { if let Some(message) = mailbox.get(digest).await { assert_eq!(message.digest(), digest); } } } BroadcastAction::Sleep { duration_ms } => { context.sleep(Duration::from_millis(duration_ms)).await; } } drain_ready_subscriptions(&mut pending_subscriptions); } }); } fuzz_target!(|input: FuzzInput| { fuzz(input); });