//! Implementation of a simulated p2p network. use super::{ bandwidth, ingress::{self, Oracle}, metrics, Error, }; use crate::{Channel, Message, Recipients}; use bytes::Bytes; use commonware_codec::{DecodeExt, FixedSize}; use commonware_cryptography::PublicKey; use commonware_macros::select; use commonware_runtime::{Clock, Handle, Listener as _, Metrics, Network as RNetwork, Spawner}; use commonware_stream::utils::codec::{recv_frame, send_frame}; use futures::{ channel::{mpsc, oneshot}, SinkExt, StreamExt, }; use prometheus_client::metrics::{counter::Counter, family::Family}; use rand::Rng; use rand_distr::{Distribution, Normal}; use std::{ collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, time::{Duration, SystemTime}, }; use tracing::{error, trace}; /// Task type representing a message to be sent within the network. type Task

= (Channel, P, Recipients

, Bytes, oneshot::Sender>); /// Configuration for the simulated network. pub struct Config { /// Maximum size of a message that can be sent over the network. pub max_size: usize, } /// Implementation of a simulated network. pub struct Network { context: E, // Maximum size of a message that can be sent over the network max_size: usize, // Next socket address to assign to a new peer // Incremented for each new peer next_addr: SocketAddr, // Channel to receive messages from the oracle ingress: mpsc::UnboundedReceiver>, // A channel to receive tasks from peers // The sender is cloned and given to each peer // The receiver is polled in the main loop sender: mpsc::UnboundedSender>, receiver: mpsc::UnboundedReceiver>, // A map from a pair of public keys (from, to) to a link between the two peers links: HashMap<(P, P), Link>, // A map from a public key to a peer peers: BTreeMap>, // A map of peers blocking each other blocks: HashSet<(P, P)>, // Metrics for received and sent messages received_messages: Family, sent_messages: Family, } impl Network { /// Create a new simulated network with a given runtime and configuration. /// /// Returns a tuple containing the network instance and the oracle that can /// be used to modify the state of the network during context. pub fn new(context: E, cfg: Config) -> (Self, Oracle

) { let (sender, receiver) = mpsc::unbounded(); let (oracle_sender, oracle_receiver) = mpsc::unbounded(); let sent_messages = Family::::default(); let received_messages = Family::::default(); context.register("messages_sent", "messages sent", sent_messages.clone()); context.register( "messages_received", "messages received", received_messages.clone(), ); // Start with a pseudo-random IP address to assign sockets to for new peers let next_addr = SocketAddr::new( IpAddr::V4(Ipv4Addr::from_bits(context.clone().next_u32())), 0, ); ( Self { context, max_size: cfg.max_size, next_addr, ingress: oracle_receiver, sender, receiver, links: HashMap::new(), peers: BTreeMap::new(), blocks: HashSet::new(), received_messages, sent_messages, }, Oracle::new(oracle_sender.clone()), ) } /// Returns (and increments) the next available socket address. /// /// The port number is incremented for each call, and the IP address is incremented if the port /// number overflows. fn get_next_socket(&mut self) -> SocketAddr { let result = self.next_addr; // Increment the port number, or the IP address if the port number overflows. // Allows the ip address to overflow (wrapping). match self.next_addr.port().checked_add(1) { Some(port) => { self.next_addr.set_port(port); } None => { let ip = match self.next_addr.ip() { IpAddr::V4(ipv4) => ipv4, _ => unreachable!(), }; let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1); self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0); } } result } /// Handle an ingress message. /// /// This method is called when a message is received from the oracle. async fn handle_ingress(&mut self, message: ingress::Message

) { // It is important to ensure that no failed receipt of a message will cause us to exit. // This could happen if the caller drops the `Oracle` after updating the network topology. // Thus, we create a helper function to send the result to the oracle and log any errors. fn send_result( result: oneshot::Sender>, value: Result, ) { let success = value.is_ok(); if let Err(e) = result.send(value) { error!(?e, "failed to send result to oracle (ok = {})", success); } } match message { ingress::Message::Register { public_key, channel, result, } => { // If peer does not exist, then create it. if !self.peers.contains_key(&public_key) { let peer = Peer::new( &mut self.context.clone(), public_key.clone(), self.get_next_socket(), usize::MAX, usize::MAX, self.max_size, ); self.peers.insert(public_key.clone(), peer); } // Create a receiver that allows receiving messages from the network for a certain channel let peer = self.peers.get_mut(&public_key).unwrap(); let receiver = match peer.register(channel).await { Ok(receiver) => Receiver { receiver }, Err(err) => return send_result(result, Err(err)), }; // Create a sender that allows sending messages to the network for a certain channel let sender = Sender::new( self.context.clone(), public_key, channel, self.max_size, self.sender.clone(), ); send_result(result, Ok((sender, receiver))) } ingress::Message::SetBandwidth { public_key, egress_bps, ingress_bps, result, } => match self.peers.get_mut(&public_key) { Some(peer) => { peer.set_bandwidth(egress_bps, ingress_bps); send_result(result, Ok(())); } None => send_result(result, Err(Error::PeerMissing)), }, ingress::Message::AddLink { sender, receiver, sampler, success_rate, result, } => { // Require both peers to be registered if !self.peers.contains_key(&sender) { return send_result(result, Err(Error::PeerMissing)); } let peer = match self.peers.get(&receiver) { Some(peer) => peer, None => return send_result(result, Err(Error::PeerMissing)), }; // Require link to not already exist let key = (sender.clone(), receiver.clone()); if self.links.contains_key(&key) { return send_result(result, Err(Error::LinkExists)); } let link = Link::new( &mut self.context, sender, receiver, peer.socket, sampler, success_rate, self.max_size, self.received_messages.clone(), ); self.links.insert(key, link); send_result(result, Ok(())) } ingress::Message::RemoveLink { sender, receiver, result, } => { match self.links.remove(&(sender, receiver)) { Some(_) => (), None => return send_result(result, Err(Error::LinkMissing)), } send_result(result, Ok(())) } ingress::Message::Block { from, to } => { self.blocks.insert((from, to)); } ingress::Message::Blocked { result } => { send_result(result, Ok(self.blocks.iter().cloned().collect())) } } } } impl Network { /// Schedule a transmission respecting bandwidth limits on both sender and receiver. fn schedule_transmission( &mut self, sender: &P, receiver: &P, data_size: usize, now: SystemTime, should_deliver: bool, ) -> SystemTime { // Prune and get used bandwidth for sender and receiver let sender_used = { let sender_peer = self.peers.get_mut(sender).expect("sender not found"); sender_peer.egress.prune_and_get_usage(now) }; let receiver_used = if should_deliver && sender != receiver { let receiver_peer = self.peers.get_mut(receiver).expect("receiver not found"); Some(receiver_peer.ingress.prune_and_get_usage(now)) } else { None }; let sender_schedule = { let sender = self.peers.get(sender).expect("sender not found"); (&sender.egress, sender_used) }; let receiver_schedule = if let Some(used) = receiver_used { let receiver_peer = self.peers.get(receiver).expect("receiver not found"); Some((&receiver_peer.ingress, used)) } else { None }; // Now calculate reservations let (reservations, completion_time) = bandwidth::calculate_reservations(data_size, now, sender_schedule, receiver_schedule); // Apply reservations to sender if !reservations.is_empty() { let sender_peer = self.peers.get_mut(sender).expect("sender not found"); for reservation in &reservations { sender_peer.egress.add_reservation( reservation.start, reservation.end, reservation.bandwidth, ); } // Apply to receiver if delivering if receiver_used.is_some() { let receiver_peer = self.peers.get_mut(receiver).expect("receiver not found"); for reservation in &reservations { receiver_peer.ingress.add_reservation( reservation.start, reservation.end, reservation.bandwidth, ); } } } completion_time } /// Handle a task. /// /// This method is called when a task is received from the sender, which can come from /// any peer in the network. fn handle_task(&mut self, task: Task

) { // Collect recipients let (channel, origin, recipients, message, reply) = task; let recipients = match recipients { Recipients::All => self.peers.keys().cloned().collect(), Recipients::Some(keys) => keys, Recipients::One(key) => vec![key], }; // Send to all recipients let mut sent = Vec::new(); let (acquired_sender, mut acquired_receiver) = mpsc::channel(recipients.len()); for recipient in recipients { // Skip self if recipient == origin { trace!(?recipient, reason = "self", "dropping message",); continue; } // Determine if the sender or recipient has blocked the other let o_r = (origin.clone(), recipient.clone()); let r_o = (recipient.clone(), origin.clone()); if self.blocks.contains(&o_r) || self.blocks.contains(&r_o) { trace!(?origin, ?recipient, reason = "blocked", "dropping message"); continue; } // Determine if there is a link between the sender and recipient let link = match self.links.get(&o_r) { Some(link) => link, None => { trace!(?origin, ?recipient, reason = "no link", "dropping message",); continue; } }; // Record sent message as soon as we determine there is a link with recipient (approximates // having an open connection) self.sent_messages .get_or_create(&metrics::Message::new(&origin, &recipient, channel)) .inc(); // Check bandwidth constraints and determine if the message should be delivered let (sender_has_bandwidth, should_deliver) = { let sender_peer = self.peers.get(&origin).expect("sender must exist"); let receiver_peer = self.peers.get(&recipient).expect("receiver must exist"); let sender_has_bandwidth = sender_peer.egress.bps > 0; let receiver_has_bandwidth = receiver_peer.ingress.bps > 0; let should_deliver = self.context.gen_bool(link.success_rate); ( sender_has_bandwidth, // If the receiver has no bandwidth then we treat it as if the message // is never delivered. Still consume sender-side bandwidth. should_deliver && receiver_has_bandwidth, ) }; if !sender_has_bandwidth { // Sender has no bandwidth, skip this recipient trace!( ?origin, ?recipient, "sender has zero bandwidth, skipping recipient" ); continue; } // Sample latency and get current time let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64); let now = self.context.current(); // Schedule the transmission let transmission_complete_at = self.schedule_transmission(&origin, &recipient, message.len(), now, should_deliver); // If the message should be delivered, queue it immediately on the // link to preserve ordering if should_deliver { let link = self.links.get_mut(&o_r).unwrap(); // The final arrival time includes the per-message latency let receive_complete_at = transmission_complete_at + latency; if let Err(err) = link.send(channel, message.clone(), receive_complete_at) { // This can only happen if the receiver exited. error!(?origin, ?recipient, ?err, "failed to send"); continue; } } let transmission_duration = transmission_complete_at .duration_since(now) .unwrap_or(Duration::ZERO); trace!( ?origin, ?recipient, transmission_duration_ms = transmission_duration.as_millis(), latency_ms = latency.as_millis(), delivered = should_deliver, "sending message", ); // Spawn task to handle sender timing self.context.with_label("sender-timing").spawn({ let recipient = recipient.clone(); let mut acquired_sender = acquired_sender.clone(); move |context| async move { // Wait for transmission to complete context.sleep_until(transmission_complete_at).await; // Mark as sent once transmission completes acquired_sender.send(()).await.unwrap(); if !should_deliver { trace!( ?recipient, reason = "random link failure", "dropping message", ); } } }); sent.push(recipient); } // Notify sender of successful sends self.context .clone() .with_label("notifier") .spawn(|_| async move { // Wait for semaphore to be acquired on all sends for _ in 0..sent.len() { acquired_receiver.next().await.unwrap(); } // Notify sender of successful sends if let Err(err) = reply.send(sent) { // This can only happen if the sender exited. error!(?err, "failed to send ack"); } }); } /// Run the simulated network. /// /// It is not necessary to invoke this method before modifying the network topology, however, /// no messages will be sent until this method is called. pub fn start(mut self) -> Handle<()> { self.context.spawn_ref()(self.run()) } async fn run(mut self) { loop { select! { message = self.ingress.next() => { // If ingress is closed, exit let message = match message { Some(message) => message, None => break, }; self.handle_ingress(message).await; }, task = self.receiver.next() => { // If receiver is closed, exit let task = match task { Some(task) => task, None => break, }; self.handle_task(task); } } } } } /// Implementation of a [crate::Sender] for the simulated network. #[derive(Clone, Debug)] pub struct Sender { me: P, channel: Channel, max_size: usize, high: mpsc::UnboundedSender>, low: mpsc::UnboundedSender>, } impl Sender

{ fn new( context: impl Spawner + Metrics, me: P, channel: Channel, max_size: usize, mut sender: mpsc::UnboundedSender>, ) -> Self { // Listen for messages let (high, mut high_receiver) = mpsc::unbounded(); let (low, mut low_receiver) = mpsc::unbounded(); context.with_label("sender").spawn(move |_| async move { loop { // Wait for task let task; select! { high_task = high_receiver.next() => { task = match high_task { Some(task) => task, None => break, }; }, low_task = low_receiver.next() => { task = match low_task { Some(task) => task, None => break, }; } } // Send task if let Err(err) = sender.send(task).await { error!(?err, channel, "failed to send task"); } } }); // Return sender Self { me, channel, max_size, high, low, } } } impl crate::Sender for Sender

{ type Error = Error; type PublicKey = P; async fn send( &mut self, recipients: Recipients

, message: Bytes, priority: bool, ) -> Result, Error> { // Check message size if message.len() > self.max_size { return Err(Error::MessageTooLarge(message.len())); } // Send message let (sender, receiver) = oneshot::channel(); let mut channel = if priority { &self.high } else { &self.low }; channel .send((self.channel, self.me.clone(), recipients, message, sender)) .await .map_err(|_| Error::NetworkClosed)?; receiver.await.map_err(|_| Error::NetworkClosed) } } type MessageReceiver

= mpsc::UnboundedReceiver>; type MessageReceiverResult

= Result, Error>; /// Implementation of a [crate::Receiver] for the simulated network. #[derive(Debug)] pub struct Receiver { receiver: MessageReceiver

, } impl crate::Receiver for Receiver

{ type Error = Error; type PublicKey = P; async fn recv(&mut self) -> Result, Error> { self.receiver.next().await.ok_or(Error::NetworkClosed) } } /// A peer in the simulated network. /// /// The peer can register channels, which allows it to receive messages sent to the channel from other peers. struct Peer { // Socket address that the peer is listening on socket: SocketAddr, // Control to register new channels control: mpsc::UnboundedSender<(Channel, oneshot::Sender>)>, // Bandwidth schedules for egress and ingress egress: bandwidth::Schedule, ingress: bandwidth::Schedule, } impl Peer

{ /// Create and return a new peer. /// /// The peer will listen for incoming connections on the given `socket` address. /// `max_size` is the maximum size of a message that can be sent to the peer. fn new( context: &mut E, public_key: P, socket: SocketAddr, egress_bps: usize, ingress_bps: usize, max_size: usize, ) -> Self { // The control is used to register channels. // There is exactly one mailbox created for each channel that the peer is registered for. let (control_sender, mut control_receiver) = mpsc::unbounded(); // Whenever a message is received from a peer, it is placed in the inbox. // The router polls the inbox and forwards the message to the appropriate mailbox. let (inbox_sender, mut inbox_receiver) = mpsc::unbounded(); // Spawn router context.with_label("router").spawn(|_| async move { // Map of channels to mailboxes (senders to particular channels) let mut mailboxes = HashMap::new(); // Continually listen for control messages and outbound messages loop { select! { // Listen for control messages, which are used to register channels control = control_receiver.next() => { // If control is closed, exit let (channel, result): (Channel, oneshot::Sender>) = match control { Some(control) => control, None => break, }; // Check if channel is registered if mailboxes.contains_key(&channel) { result.send(Err(Error::ChannelAlreadyRegistered(channel))).unwrap(); continue; } // Register channel let (sender, receiver) = mpsc::unbounded(); mailboxes.insert(channel, sender); result.send(Ok(receiver)).unwrap(); }, // Listen for messages from the inbox, which are forwarded to the appropriate mailbox inbox = inbox_receiver.next() => { // If inbox is closed, exit let (channel, message) = match inbox { Some(message) => message, None => break, }; // Send message to mailbox match mailboxes.get_mut(&channel) { Some(mailbox) => { if let Err(err) = mailbox.send(message).await { error!(?err, "failed to send message to mailbox"); } } None => { trace!( recipient = ?public_key, channel, reason = "missing channel", "dropping message", ); } } }, } } }); // Spawn a task that accepts new connections and spawns a task for each connection context.with_label("listener").spawn({ let inbox_sender = inbox_sender.clone(); move |context| async move { // Initialize listener let mut listener = context.bind(socket).await.unwrap(); // Continually accept new connections while let Ok((_, _, mut stream)) = listener.accept().await { // New connection accepted. Spawn a task for this connection context.with_label("receiver").spawn({ let mut inbox_sender = inbox_sender.clone(); move |_| async move { // Receive dialer's public key as a handshake let dialer = match recv_frame(&mut stream, max_size).await { Ok(data) => data, Err(_) => { error!("failed to receive public key from dialer"); return; } }; let Ok(dialer) = P::decode(dialer.as_ref()) else { error!("received public key is invalid"); return; }; // Continually receive messages from the dialer and send them to the inbox while let Ok(data) = recv_frame(&mut stream, max_size).await { let channel = Channel::from_be_bytes( data[..Channel::SIZE].try_into().unwrap(), ); let message = data.slice(Channel::SIZE..); if let Err(err) = inbox_sender .send((channel, (dialer.clone(), message))) .await { error!(?err, "failed to send message to mailbox"); break; } } } }); } } }); // Return peer Self { socket, control: control_sender, egress: bandwidth::Schedule::new(egress_bps), ingress: bandwidth::Schedule::new(ingress_bps), } } /// Register a channel with the peer. /// /// This allows the peer to receive messages sent to the channel. /// Returns a receiver that can be used to receive messages sent to the channel. async fn register(&mut self, channel: Channel) -> MessageReceiverResult

{ let (sender, receiver) = oneshot::channel(); self.control .send((channel, sender)) .await .map_err(|_| Error::NetworkClosed)?; receiver.await.map_err(|_| Error::NetworkClosed)? } /// Set bandwidth limits for the peer. /// /// Bandwidth is specified for the peer's egress (upload) and ingress /// (download) rates in bytes per second. Use `usize::MAX` for effectively /// unlimited bandwidth. fn set_bandwidth(&mut self, egress_bps: usize, ingress_bps: usize) { self.egress.bps = egress_bps; self.ingress.bps = ingress_bps; } } // A unidirectional link between two peers. // Messages can be sent over the link with a given latency, jitter, and success rate. #[derive(Clone)] struct Link { sampler: Normal, success_rate: f64, // Messages with their receive time for ordered delivery inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>, } impl Link { #[allow(clippy::too_many_arguments)] fn new( context: &mut E, dialer: P, receiver: P, socket: SocketAddr, sampler: Normal, success_rate: f64, max_size: usize, received_messages: Family, ) -> Self { let (inbox, mut outbox) = mpsc::unbounded(); let result = Self { sampler, success_rate, inbox, }; // Spawn a task that will wait for messages to be sent to the link and then send them // over the network. context .clone() .with_label("link") .spawn(move |context| async move { // Dial the peer and handshake by sending it the dialer's public key let (mut sink, _) = context.dial(socket).await.unwrap(); if let Err(err) = send_frame(&mut sink, &dialer, max_size).await { error!(?err, "failed to send public key to listener"); return; } // Process messages in order, waiting for their receive time while let Some((channel, message, receive_complete_at)) = outbox.next().await { // Wait until the message should arrive at receiver context.sleep_until(receive_complete_at).await; // Send the message let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len()); data.extend_from_slice(&channel.to_be_bytes()); data.extend_from_slice(&message); let data = data.freeze(); send_frame(&mut sink, &data, max_size).await.unwrap(); // Bump received messages metric received_messages .get_or_create(&metrics::Message::new(&dialer, &receiver, channel)) .inc(); } }); result } // Send a message over the link with receive timing. fn send( &mut self, channel: Channel, message: Bytes, receive_complete_at: SystemTime, ) -> Result<(), Error> { self.inbox .unbounded_send((channel, message, receive_complete_at)) .map_err(|_| Error::NetworkClosed)?; Ok(()) } } #[cfg(test)] mod tests { use super::*; use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _}; use commonware_runtime::{deterministic, Runner}; const MAX_MESSAGE_SIZE: usize = 1024 * 1024; #[test] fn test_register_and_link() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let cfg = Config { max_size: MAX_MESSAGE_SIZE, }; let network_context = context.with_label("network"); let (network, mut oracle) = Network::new(network_context.clone(), cfg); network_context.spawn(|_| network.run()); // Create two public keys let pk1 = ed25519::PrivateKey::from_seed(1).public_key(); let pk2 = ed25519::PrivateKey::from_seed(2).public_key(); // Register oracle.register(pk1.clone(), 0).await.unwrap(); oracle.register(pk1.clone(), 1).await.unwrap(); oracle.register(pk2.clone(), 0).await.unwrap(); oracle.register(pk2.clone(), 1).await.unwrap(); // Expect error when registering again assert!(matches!( oracle.register(pk1.clone(), 1).await, Err(Error::ChannelAlreadyRegistered(_)) )); // Add link let link = ingress::Link { latency: Duration::from_millis(2), jitter: Duration::from_millis(1), success_rate: 0.9, }; oracle .add_link(pk1.clone(), pk2.clone(), link.clone()) .await .unwrap(); // Expect error when adding link again assert!(matches!( oracle.add_link(pk1, pk2, link).await, Err(Error::LinkExists) )); }); } #[test] fn test_get_next_socket() { let cfg = Config { max_size: MAX_MESSAGE_SIZE, }; let runner = deterministic::Runner::default(); runner.start(|context| async move { type PublicKey = ed25519::PublicKey; let (mut network, _) = Network::::new(context.clone(), cfg); // Test that the next socket address is incremented correctly let mut original = network.next_addr; let next = network.get_next_socket(); assert_eq!(next, original); let next = network.get_next_socket(); original.set_port(1); assert_eq!(next, original); // Test that the port number overflows correctly let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535); network.next_addr = max_addr; let next = network.get_next_socket(); assert_eq!(next, max_addr); let next = network.get_next_socket(); assert_eq!( next, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0) ); }); } }