//! Implementation of a simulated p2p network. use super::{ ingress::{self, Oracle}, metrics, transmitter::{self, Completion}, Error, }; use crate::{Channel, Message, Recipients}; use bytes::Bytes; use commonware_codec::{DecodeExt, FixedSize}; use commonware_cryptography::PublicKey; use commonware_macros::select; use commonware_runtime::{ spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Spawner, }; use commonware_stream::utils::codec::{recv_frame, send_frame}; use commonware_utils::set::Ordered; use either::Either; use futures::{ channel::{mpsc, oneshot}, future, SinkExt, StreamExt, }; use prometheus_client::metrics::{counter::Counter, family::Family}; use rand::Rng; use rand_distr::{Distribution, Normal}; use std::{ collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, time::{Duration, SystemTime}, }; use tracing::{debug, error, trace, warn}; /// Task type representing a message to be sent within the network. type Task

= (Channel, P, Recipients

, Bytes, oneshot::Sender>); /// Configuration for the simulated network. pub struct Config { /// Maximum size of a message that can be sent over the network. pub max_size: usize, /// True if peers should disconnect upon being blocked. While production networking would /// typically disconnect, for testing purposes it may be useful to keep peers connected, /// allowing byzantine actors the ability to continue sending messages. pub disconnect_on_block: bool, /// The maximum number of peer sets to track. When a new peer set is registered and this /// limit is exceeded, the oldest peer set is removed. Peers that are no longer in any /// tracked peer set will have their links removed and messages to them will be dropped. /// /// If [None], peer sets are not considered. pub tracked_peer_sets: Option, } /// Implementation of a simulated network. pub struct Network { context: ContextCell, // Maximum size of a message that can be sent over the network max_size: usize, // True if peers should disconnect upon being blocked. // While production networking would typically disconnect, for testing purposes it may be useful // to keep peers connected, allowing byzantine actors the ability to continue sending messages. disconnect_on_block: bool, // Next socket address to assign to a new peer // Incremented for each new peer next_addr: SocketAddr, // Channel to receive messages from the oracle ingress: mpsc::UnboundedReceiver>, // A channel to receive tasks from peers // The sender is cloned and given to each peer // The receiver is polled in the main loop sender: mpsc::UnboundedSender>, receiver: mpsc::UnboundedReceiver>, // A map from a pair of public keys (from, to) to a link between the two peers links: HashMap<(P, P), Link>, // A map from a public key to a peer peers: BTreeMap>, // Peer sets indexed by their ID peer_sets: BTreeMap>, // Reference count for each peer (number of peer sets they belong to) peer_refs: BTreeMap, // Maximum number of peer sets to track tracked_peer_sets: Option, // A map of peers blocking each other blocks: HashSet<(P, P)>, // State of the transmitter transmitter: transmitter::State

, // Subscribers to peer set updates #[allow(clippy::type_complexity)] subscribers: Vec, Ordered

)>>, // Metrics for received and sent messages received_messages: Family, sent_messages: Family, } impl Network { /// Create a new simulated network with a given runtime and configuration. /// /// Returns a tuple containing the network instance and the oracle that can /// be used to modify the state of the network during context. pub fn new(mut context: E, cfg: Config) -> (Self, Oracle

) { let (sender, receiver) = mpsc::unbounded(); let (oracle_sender, oracle_receiver) = mpsc::unbounded(); let sent_messages = Family::::default(); let received_messages = Family::::default(); context.register("messages_sent", "messages sent", sent_messages.clone()); context.register( "messages_received", "messages received", received_messages.clone(), ); // Start with a pseudo-random IP address to assign sockets to for new peers let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0); ( Self { context: ContextCell::new(context), max_size: cfg.max_size, disconnect_on_block: cfg.disconnect_on_block, tracked_peer_sets: cfg.tracked_peer_sets, next_addr, ingress: oracle_receiver, sender, receiver, links: HashMap::new(), peers: BTreeMap::new(), peer_sets: BTreeMap::new(), peer_refs: BTreeMap::new(), blocks: HashSet::new(), transmitter: transmitter::State::new(), subscribers: Vec::new(), received_messages, sent_messages, }, Oracle::new(oracle_sender.clone()), ) } /// Returns (and increments) the next available socket address. /// /// The port number is incremented for each call, and the IP address is incremented if the port /// number overflows. fn get_next_socket(&mut self) -> SocketAddr { let result = self.next_addr; // Increment the port number, or the IP address if the port number overflows. // Allows the ip address to overflow (wrapping). match self.next_addr.port().checked_add(1) { Some(port) => { self.next_addr.set_port(port); } None => { let ip = match self.next_addr.ip() { IpAddr::V4(ipv4) => ipv4, _ => unreachable!(), }; let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1); self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0); } } result } /// Handle an ingress message. /// /// This method is called when a message is received from the oracle. async fn handle_ingress(&mut self, message: ingress::Message

) { // It is important to ensure that no failed receipt of a message will cause us to exit. // This could happen if the caller drops the `Oracle` after updating the network topology. // Thus, we create a helper function to send the result to the oracle and log any errors. fn send_result( result: oneshot::Sender>, value: Result, ) { let success = value.is_ok(); if let Err(e) = result.send(value) { error!(?e, "failed to send result to oracle (ok = {})", success); } } match message { ingress::Message::Update { peer_set, peers } => { let Some(tracked_peer_sets) = self.tracked_peer_sets else { warn!("attempted to register peer set when tracking is disabled"); return; }; // Check if peer set already exists if self.peer_sets.contains_key(&peer_set) { warn!(index = peer_set, "peer set already exists"); return; } // Ensure that peer set is monotonically increasing if let Some((last, _)) = self.peer_sets.last_key_value() { if peer_set <= *last { warn!( new_id = peer_set, old_id = last, "attempted to register peer set with non-monotonically increasing ID" ); return; } } // Create and store new peer set for public_key in peers.iter() { // Create peer if it doesn't exist if !self.peers.contains_key(public_key) { let peer = Peer::new( self.context.with_label("peer"), public_key.clone(), self.get_next_socket(), self.max_size, ); self.peers.insert(public_key.clone(), peer); } // Increment reference count *self.peer_refs.entry(public_key.clone()).or_insert(0) += 1; } self.peer_sets.insert(peer_set, peers.clone()); // Remove oldest peer set if we exceed the limit while self.peer_sets.len() > tracked_peer_sets { let (index, set) = self.peer_sets.pop_first().unwrap(); debug!(index, "removed oldest peer set"); // Decrement reference counts and clean up peers/links for public_key in set.iter() { let refs = self.peer_refs.get_mut(public_key).unwrap(); *refs = refs.checked_sub(1).expect("reference count underflow"); // If peer is no longer in any tracked set, remove it if *refs == 0 { self.peer_refs.remove(public_key); self.peers.remove(public_key); debug!(?public_key, "removed peer no longer in any tracked set"); } } } // Notify all subscribers about the new peer set let all = self.peer_refs.keys().cloned().collect(); let notification = (peer_set, peers, all); self.subscribers .retain(|subscriber| subscriber.unbounded_send(notification.clone()).is_ok()); } ingress::Message::Register { channel, public_key, result, } => { // If peer does not exist, then create it. if !self.peers.contains_key(&public_key) { let peer = Peer::new( self.context.with_label("peer"), public_key.clone(), self.get_next_socket(), self.max_size, ); self.peers.insert(public_key.clone(), peer); } // Create a receiver that allows receiving messages from the network for a certain channel let peer = self.peers.get_mut(&public_key).unwrap(); let receiver = match peer.register(channel).await { Ok(receiver) => Receiver { receiver }, Err(err) => return send_result(result, Err(err)), }; // Create a sender that allows sending messages to the network for a certain channel let sender = Sender::new( self.context.with_label("sender"), public_key, channel, self.max_size, self.sender.clone(), ); send_result(result, Ok((sender, receiver))) } ingress::Message::PeerSet { index, response } => { if self.peer_sets.is_empty() { // Return all peers if no peer sets are registered. let _ = response.send(Some(self.peers.keys().cloned().collect())); } else { // Return the peer set at the given index let _ = response.send(self.peer_sets.get(&index).cloned()); } } ingress::Message::Subscribe { response } => { // Create a new subscription channel let (sender, receiver) = mpsc::unbounded(); // Send the latest peer set upon subscription if let Some((index, peers)) = self.peer_sets.last_key_value() { let all = self.peer_refs.keys().cloned().collect(); let notification = (*index, peers.clone(), all); let _ = sender.unbounded_send(notification); } self.subscribers.push(sender); // Return the receiver to the caller let _ = response.send(receiver); } ingress::Message::LimitBandwidth { public_key, egress_cap, ingress_cap, result, } => match self.peers.contains_key(&public_key) { true => { // Update bandwidth limits let now = self.context.current(); let completions = self.transmitter .limit(now, &public_key, egress_cap, ingress_cap); self.process_completions(completions); // Alert application of update send_result(result, Ok(())); } false => send_result(result, Err(Error::PeerMissing)), }, ingress::Message::AddLink { sender, receiver, sampler, success_rate, result, } => { // Require both peers to be registered if !self.peers.contains_key(&sender) { return send_result(result, Err(Error::PeerMissing)); } let peer = match self.peers.get(&receiver) { Some(peer) => peer, None => return send_result(result, Err(Error::PeerMissing)), }; // Require link to not already exist let key = (sender.clone(), receiver.clone()); if self.links.contains_key(&key) { return send_result(result, Err(Error::LinkExists)); } let link = Link::new( &mut self.context, sender, receiver, peer.socket, sampler, success_rate, self.max_size, self.received_messages.clone(), ); self.links.insert(key, link); send_result(result, Ok(())) } ingress::Message::RemoveLink { sender, receiver, result, } => { match self.links.remove(&(sender, receiver)) { Some(_) => (), None => return send_result(result, Err(Error::LinkMissing)), } send_result(result, Ok(())) } ingress::Message::Block { from, to } => { self.blocks.insert((from, to)); } ingress::Message::Blocked { result } => { send_result(result, Ok(self.blocks.iter().cloned().collect())) } } } } impl Network { /// Process completions from the transmitter. fn process_completions(&mut self, completions: Vec>) { for completion in completions { // If there is no message to deliver, then skip let Some(deliver_at) = completion.deliver_at else { trace!( origin = ?completion.origin, recipient = ?completion.recipient, "message dropped before delivery", ); continue; }; // Send message to link let key = (completion.origin.clone(), completion.recipient.clone()); let Some(link) = self.links.get_mut(&key) else { // This can happen if the link is removed before the message is delivered trace!( origin = ?completion.origin, recipient = ?completion.recipient, "missing link for completion", ); continue; }; if let Err(err) = link.send(completion.channel, completion.message, deliver_at) { error!(?err, "failed to send"); } } } /// Handle a task. /// /// This method is called when a task is received from the sender, which can come from /// any peer in the network. fn handle_task(&mut self, task: Task

) { // If peer sets are enabled and we are not in one, ignore the message (we are disconnected from all) let (channel, origin, recipients, message, reply) = task; if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) { warn!( ?origin, reason = "not in tracked peer set", "dropping message" ); return; } // Collect recipients let recipients = match recipients { Recipients::All => { // If peer sets have been registered, send only to tracked peers // Otherwise, send to all registered peers (compatibility // with tests that do not register peer sets.) if self.peer_sets.is_empty() { self.peers.keys().cloned().collect() } else { self.peer_refs.keys().cloned().collect() } } Recipients::Some(keys) => keys, Recipients::One(key) => vec![key], }; // Send to all recipients let now = self.context.current(); let mut sent = Vec::new(); for recipient in recipients { // Skip self if recipient == origin { trace!(?recipient, reason = "self", "dropping message"); continue; } // If tracking peer sets, ensure recipient and sender are in a tracked peer set if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) { trace!( ?origin, ?recipient, reason = "not in tracked peer set", "dropping message" ); continue; } // Determine if the sender or recipient has blocked the other let o_r = (origin.clone(), recipient.clone()); let r_o = (recipient.clone(), origin.clone()); if self.disconnect_on_block && (self.blocks.contains(&o_r) || self.blocks.contains(&r_o)) { trace!(?origin, ?recipient, reason = "blocked", "dropping message"); continue; } // Determine if there is a link between the origin and recipient let Some(link) = self.links.get_mut(&o_r) else { trace!(?origin, ?recipient, reason = "no link", "dropping message"); continue; }; // Record sent message as soon as we determine there is a link with recipient (approximates // having an open connection) self.sent_messages .get_or_create(&metrics::Message::new(&origin, &recipient, channel)) .inc(); // Sample latency let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64); // Determine if the message should be delivered let should_deliver = self.context.gen_bool(link.success_rate); // Enqueue message for delivery let completions = self.transmitter.enqueue( now, origin.clone(), recipient.clone(), channel, message.clone(), latency, should_deliver, ); self.process_completions(completions); sent.push(recipient); } // Alert application of sent messages if let Err(err) = reply.send(sent) { error!(?err, "failed to send ack"); } } /// Run the simulated network. /// /// It is not necessary to invoke this method before modifying the network topology, however, /// no messages will be sent until this method is called. pub fn start(mut self) -> Handle<()> { spawn_cell!(self.context, self.run().await) } async fn run(mut self) { loop { let tick = match self.transmitter.next() { Some(when) => Either::Left(self.context.sleep_until(when)), None => Either::Right(future::pending()), }; select! { _ = tick => { let now = self.context.current(); let completions = self.transmitter.advance(now); self.process_completions(completions); }, message = self.ingress.next() => { // If ingress is closed, exit let message = match message { Some(message) => message, None => break, }; self.handle_ingress(message).await; }, task = self.receiver.next() => { // If receiver is closed, exit let task = match task { Some(task) => task, None => break, }; self.handle_task(task); }, } } } } /// Implementation of a [crate::Sender] for the simulated network. #[derive(Clone, Debug)] pub struct Sender { me: P, channel: Channel, max_size: usize, high: mpsc::UnboundedSender>, low: mpsc::UnboundedSender>, } impl Sender

{ fn new( context: impl Spawner + Metrics, me: P, channel: Channel, max_size: usize, mut sender: mpsc::UnboundedSender>, ) -> Self { // Listen for messages let (high, mut high_receiver) = mpsc::unbounded(); let (low, mut low_receiver) = mpsc::unbounded(); context.with_label("sender").spawn(move |_| async move { loop { // Wait for task let task; select! { high_task = high_receiver.next() => { task = match high_task { Some(task) => task, None => break, }; }, low_task = low_receiver.next() => { task = match low_task { Some(task) => task, None => break, }; } } // Send task if let Err(err) = sender.send(task).await { error!(?err, channel, "failed to send task"); } } }); // Return sender Self { me, channel, max_size, high, low, } } } impl crate::Sender for Sender

{ type Error = Error; type PublicKey = P; async fn send( &mut self, recipients: Recipients

, message: Bytes, priority: bool, ) -> Result, Error> { // Check message size if message.len() > self.max_size { return Err(Error::MessageTooLarge(message.len())); } // Send message let (sender, receiver) = oneshot::channel(); let mut channel = if priority { &self.high } else { &self.low }; channel .send((self.channel, self.me.clone(), recipients, message, sender)) .await .map_err(|_| Error::NetworkClosed)?; receiver.await.map_err(|_| Error::NetworkClosed) } } type MessageReceiver

= mpsc::UnboundedReceiver>; type MessageReceiverResult

= Result, Error>; /// Implementation of a [crate::Receiver] for the simulated network. #[derive(Debug)] pub struct Receiver { receiver: MessageReceiver

, } impl crate::Receiver for Receiver

{ type Error = Error; type PublicKey = P; async fn recv(&mut self) -> Result, Error> { self.receiver.next().await.ok_or(Error::NetworkClosed) } } /// A peer in the simulated network. /// /// The peer can register channels, which allows it to receive messages sent to the channel from other peers. struct Peer { // Socket address that the peer is listening on socket: SocketAddr, // Control to register new channels control: mpsc::UnboundedSender<(Channel, oneshot::Sender>)>, } impl Peer

{ /// Create and return a new peer. /// /// The peer will listen for incoming connections on the given `socket` address. /// `max_size` is the maximum size of a message that can be sent to the peer. fn new( context: E, public_key: P, socket: SocketAddr, max_size: usize, ) -> Self { // The control is used to register channels. // There is exactly one mailbox created for each channel that the peer is registered for. let (control_sender, mut control_receiver) = mpsc::unbounded(); // Whenever a message is received from a peer, it is placed in the inbox. // The router polls the inbox and forwards the message to the appropriate mailbox. let (inbox_sender, mut inbox_receiver) = mpsc::unbounded(); // Spawn router context.with_label("router").spawn(|_| async move { // Map of channels to mailboxes (senders to particular channels) let mut mailboxes = HashMap::new(); // Continually listen for control messages and outbound messages loop { select! { // Listen for control messages, which are used to register channels control = control_receiver.next() => { // If control is closed, exit let (channel, result): (Channel, oneshot::Sender>) = match control { Some(control) => control, None => break, }; // Check if channel is registered if mailboxes.contains_key(&channel) { result.send(Err(Error::ChannelAlreadyRegistered(channel))).unwrap(); continue; } // Register channel let (sender, receiver) = mpsc::unbounded(); mailboxes.insert(channel, sender); result.send(Ok(receiver)).unwrap(); }, // Listen for messages from the inbox, which are forwarded to the appropriate mailbox inbox = inbox_receiver.next() => { // If inbox is closed, exit let (channel, message) = match inbox { Some(message) => message, None => break, }; // Send message to mailbox match mailboxes.get_mut(&channel) { Some(mailbox) => { if let Err(err) = mailbox.send(message).await { error!(?err, "failed to send message to mailbox"); } } None => { trace!( recipient = ?public_key, channel, reason = "missing channel", "dropping message", ); } } }, } } }); // Spawn a task that accepts new connections and spawns a task for each connection context.with_label("listener").spawn({ let inbox_sender = inbox_sender.clone(); move |context| async move { // Initialize listener let mut listener = context.bind(socket).await.unwrap(); // Continually accept new connections while let Ok((_, _, mut stream)) = listener.accept().await { // New connection accepted. Spawn a task for this connection context.with_label("receiver").spawn({ let mut inbox_sender = inbox_sender.clone(); move |_| async move { // Receive dialer's public key as a handshake let dialer = match recv_frame(&mut stream, max_size).await { Ok(data) => data, Err(_) => { error!("failed to receive public key from dialer"); return; } }; let Ok(dialer) = P::decode(dialer.as_ref()) else { error!("received public key is invalid"); return; }; // Continually receive messages from the dialer and send them to the inbox while let Ok(data) = recv_frame(&mut stream, max_size).await { let channel = Channel::from_be_bytes( data[..Channel::SIZE].try_into().unwrap(), ); let message = data.slice(Channel::SIZE..); if let Err(err) = inbox_sender .send((channel, (dialer.clone(), message))) .await { error!(?err, "failed to send message to mailbox"); break; } } } }); } } }); // Return peer Self { socket, control: control_sender, } } /// Register a channel with the peer. /// /// This allows the peer to receive messages sent to the channel. /// Returns a receiver that can be used to receive messages sent to the channel. async fn register(&mut self, channel: Channel) -> MessageReceiverResult

{ let (sender, receiver) = oneshot::channel(); self.control .send((channel, sender)) .await .map_err(|_| Error::NetworkClosed)?; receiver.await.map_err(|_| Error::NetworkClosed)? } } // A unidirectional link between two peers. // Messages can be sent over the link with a given latency, jitter, and success rate. struct Link { sampler: Normal, success_rate: f64, // Messages with their receive time for ordered delivery inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>, } /// Buffered payload waiting for earlier messages on the same link to complete. impl Link { #[allow(clippy::too_many_arguments)] fn new( context: &mut E, dialer: P, receiver: P, socket: SocketAddr, sampler: Normal, success_rate: f64, max_size: usize, received_messages: Family, ) -> Self { // Spawn a task that will wait for messages to be sent to the link and then send them // over the network. let (inbox, mut outbox) = mpsc::unbounded::<(Channel, Bytes, SystemTime)>(); context.with_label("link").spawn(move |context| async move { // Dial the peer and handshake by sending it the dialer's public key let (mut sink, _) = context.dial(socket).await.unwrap(); if let Err(err) = send_frame(&mut sink, &dialer, max_size).await { error!(?err, "failed to send public key to listener"); return; } // Process messages in order, waiting for their receive time while let Some((channel, message, receive_complete_at)) = outbox.next().await { // Wait until the message should arrive at receiver context.sleep_until(receive_complete_at).await; // Send the message let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len()); data.extend_from_slice(&channel.to_be_bytes()); data.extend_from_slice(&message); let data = data.freeze(); send_frame(&mut sink, &data, max_size).await.unwrap(); // Bump received messages metric received_messages .get_or_create(&metrics::Message::new(&dialer, &receiver, channel)) .inc(); } }); Self { sampler, success_rate, inbox, } } // Send a message over the link with receive timing. fn send( &mut self, channel: Channel, message: Bytes, receive_complete_at: SystemTime, ) -> Result<(), Error> { self.inbox .unbounded_send((channel, message, receive_complete_at)) .map_err(|_| Error::NetworkClosed)?; Ok(()) } } #[cfg(test)] mod tests { use super::*; use crate::{Manager, Receiver as _, Recipients, Sender as _}; use bytes::Bytes; use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _}; use commonware_runtime::{deterministic, Runner as _}; const MAX_MESSAGE_SIZE: usize = 1024 * 1024; #[test] fn test_register_and_link() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let cfg = Config { max_size: MAX_MESSAGE_SIZE, disconnect_on_block: true, tracked_peer_sets: Some(3), }; let network_context = context.with_label("network"); let (network, mut oracle) = Network::new(network_context.clone(), cfg); network_context.spawn(|_| network.run()); // Create two public keys let pk1 = ed25519::PrivateKey::from_seed(1).public_key(); let pk2 = ed25519::PrivateKey::from_seed(2).public_key(); // Register the peer set oracle.update(0, [pk1.clone(), pk2.clone()].into()).await; let mut control = oracle.control(pk1.clone()); control.register(0).await.unwrap(); control.register(1).await.unwrap(); let mut control = oracle.control(pk2.clone()); control.register(0).await.unwrap(); control.register(1).await.unwrap(); // Expect error when registering again assert!(matches!( control.register(1).await, Err(Error::ChannelAlreadyRegistered(_)) )); // Add link let link = ingress::Link { latency: Duration::from_millis(2), jitter: Duration::from_millis(1), success_rate: 0.9, }; oracle .add_link(pk1.clone(), pk2.clone(), link.clone()) .await .unwrap(); // Expect error when adding link again assert!(matches!( oracle.add_link(pk1, pk2, link).await, Err(Error::LinkExists) )); }); } #[test] fn test_get_next_socket() { let cfg = Config { max_size: MAX_MESSAGE_SIZE, disconnect_on_block: true, tracked_peer_sets: None, }; let runner = deterministic::Runner::default(); runner.start(|context| async move { type PublicKey = ed25519::PublicKey; let (mut network, _) = Network::::new(context.clone(), cfg); // Test that the next socket address is incremented correctly let mut original = network.next_addr; let next = network.get_next_socket(); assert_eq!(next, original); let next = network.get_next_socket(); original.set_port(1); assert_eq!(next, original); // Test that the port number overflows correctly let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535); network.next_addr = max_addr; let next = network.get_next_socket(); assert_eq!(next, max_addr); let next = network.get_next_socket(); assert_eq!( next, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0) ); }); } #[test] fn test_fifo_burst_same_recipient() { let cfg = Config { max_size: MAX_MESSAGE_SIZE, disconnect_on_block: true, tracked_peer_sets: Some(3), }; let runner = deterministic::Runner::default(); runner.start(|context| async move { let (network, mut oracle) = Network::new(context.with_label("network"), cfg); let network_handle = network.start(); let sender_pk = ed25519::PrivateKey::from_seed(10).public_key(); let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key(); oracle .update(0, [sender_pk.clone(), recipient_pk.clone()].into()) .await; let (mut sender, _sender_recv) = oracle.control(sender_pk.clone()).register(0).await.unwrap(); let (_sender2, mut receiver) = oracle .control(recipient_pk.clone()) .register(0) .await .unwrap(); oracle .limit_bandwidth(sender_pk.clone(), Some(5_000), None) .await .unwrap(); oracle .limit_bandwidth(recipient_pk.clone(), None, Some(5_000)) .await .unwrap(); oracle .add_link( sender_pk.clone(), recipient_pk.clone(), ingress::Link { latency: Duration::from_millis(0), jitter: Duration::from_millis(0), success_rate: 1.0, }, ) .await .unwrap(); const COUNT: usize = 50; let mut expected = Vec::with_capacity(COUNT); for i in 0..COUNT { let msg = Bytes::from(vec![i as u8; 64]); sender .send(Recipients::One(recipient_pk.clone()), msg.clone(), false) .await .unwrap(); expected.push(msg); } for expected_msg in expected { let (_pk, bytes) = receiver.recv().await.unwrap(); assert_eq!(bytes, expected_msg); } drop(oracle); drop(sender); network_handle.abort(); }); } #[test] fn test_broadcast_respects_transmit_latency() { let cfg = Config { max_size: MAX_MESSAGE_SIZE, disconnect_on_block: true, tracked_peer_sets: Some(3), }; let runner = deterministic::Runner::default(); runner.start(|context| async move { let (network, mut oracle) = Network::new(context.with_label("network"), cfg); let network_handle = network.start(); let sender_pk = ed25519::PrivateKey::from_seed(42).public_key(); let recipient_a = ed25519::PrivateKey::from_seed(43).public_key(); let recipient_b = ed25519::PrivateKey::from_seed(44).public_key(); oracle .update( 0, [sender_pk.clone(), recipient_a.clone(), recipient_b.clone()].into(), ) .await; let (mut sender, _recv_sender) = oracle.control(sender_pk.clone()).register(0).await.unwrap(); let (_sender2, mut recv_a) = oracle .control(recipient_a.clone()) .register(0) .await .unwrap(); let (_sender3, mut recv_b) = oracle .control(recipient_b.clone()) .register(0) .await .unwrap(); oracle .limit_bandwidth(sender_pk.clone(), Some(1_000), None) .await .unwrap(); oracle .limit_bandwidth(recipient_a.clone(), None, Some(1_000)) .await .unwrap(); oracle .limit_bandwidth(recipient_b.clone(), None, Some(1_000)) .await .unwrap(); let link = ingress::Link { latency: Duration::from_millis(0), jitter: Duration::from_millis(0), success_rate: 1.0, }; oracle .add_link(sender_pk.clone(), recipient_a.clone(), link.clone()) .await .unwrap(); oracle .add_link(sender_pk.clone(), recipient_b.clone(), link) .await .unwrap(); let big_msg = Bytes::from(vec![7u8; 10_000]); let start = context.current(); sender .send(Recipients::All, big_msg.clone(), false) .await .unwrap(); let (_pk, received_a) = recv_a.recv().await.unwrap(); assert_eq!(received_a, big_msg); let elapsed_a = context.current().duration_since(start).unwrap(); assert!(elapsed_a >= Duration::from_secs(20)); let (_pk, received_b) = recv_b.recv().await.unwrap(); assert_eq!(received_b, big_msg); let elapsed_b = context.current().duration_since(start).unwrap(); assert!(elapsed_b >= Duration::from_secs(20)); // Because bandwidth is shared, the two messages should take about the same time assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1)); drop(oracle); drop(sender); network_handle.abort(); }); } }