//! Implementation of a simulated p2p network.
use super::{
ingress::{self, Oracle},
metrics,
transmitter::{self, Completion},
Error,
};
use crate::{
utils::limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
Channel, Message, Recipients, UnlimitedSender as _,
};
use bytes::Bytes;
use commonware_codec::{DecodeExt, FixedSize};
use commonware_cryptography::PublicKey;
use commonware_macros::{select, select_loop};
use commonware_runtime::{
spawn_cell, Clock, ContextCell, Handle, Listener as _, Metrics, Network as RNetwork, Quota,
Spawner,
};
use commonware_stream::utils::codec::{recv_frame, send_frame};
use commonware_utils::{channels::ring, ordered::Set, NZUsize, TryCollect};
use either::Either;
use futures::{
channel::{mpsc, oneshot},
future, SinkExt, StreamExt,
};
use prometheus_client::metrics::{counter::Counter, family::Family};
use rand::Rng;
use rand_distr::{Distribution, Normal};
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};
/// Task type representing a message to be sent within the network.
type Task<P> = (Channel, P, Recipients<P>, Bytes, oneshot::Sender<Vec<P>>);
/// Target for a message in a split receiver.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
None,
Primary,
Secondary,
Both,
}
/// Origin of a message in a split sender.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
Primary,
Secondary,
}
/// A function that forwards messages from [SplitOrigin] to [Recipients].
pub trait SplitForwarder<P: PublicKey>:
Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}
impl<P: PublicKey, F> SplitForwarder<P> for F where
F: Fn(SplitOrigin, &Recipients<P>, &Bytes) -> Option<Recipients<P>>
+ Send
+ Sync
+ Clone
+ 'static
{
}
/// A function that routes incoming [Message]s to a [SplitTarget].
pub trait SplitRouter<P: PublicKey>:
Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}
impl<P: PublicKey, F> SplitRouter<P> for F where
F: Fn(&Message<P>) -> SplitTarget + Send + Sync + 'static
{
}
/// Configuration for the simulated network.
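///
/// A minimal sketch of a test configuration (the values are illustrative, mirroring the tests
/// below):
///
/// ```ignore
/// let cfg = Config {
///     max_size: 1024 * 1024,
///     disconnect_on_block: true,
///     tracked_peer_sets: Some(3),
/// };
/// ```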
pub struct Config {
/// Maximum size of a message that can be sent over the network.
pub max_size: u32,
/// True if peers should disconnect upon being blocked. While production networking would
/// typically disconnect, for testing purposes it may be useful to keep peers connected,
/// allowing byzantine actors the ability to continue sending messages.
pub disconnect_on_block: bool,
/// The maximum number of peer sets to track. When a new peer set is registered and this
/// limit is exceeded, the oldest peer set is removed. Peers that are no longer in any
/// tracked peer set will have their links removed and messages to them will be dropped.
///
/// If [None], peer sets are not considered.
pub tracked_peer_sets: Option<usize>,
}
/// Implementation of a simulated network.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
context: ContextCell<E>,
// Maximum size of a message that can be sent over the network
max_size: u32,
// True if peers should disconnect upon being blocked.
// While production networking would typically disconnect, for testing purposes it may be useful
// to keep peers connected, allowing byzantine actors the ability to continue sending messages.
disconnect_on_block: bool,
// Next socket address to assign to a new peer
// Incremented for each new peer
next_addr: SocketAddr,
// Channel to receive messages from the oracle
ingress: mpsc::UnboundedReceiver<ingress::Message<P>>,
// Sender to the oracle channel (passed to Senders for PeerSource subscriptions)
oracle_sender: mpsc::UnboundedSender<ingress::Message<P>>,
// A channel to receive tasks from peers
// The sender is cloned and given to each peer
// The receiver is polled in the main loop
sender: mpsc::UnboundedSender<Task<P>>,
receiver: mpsc::UnboundedReceiver<Task<P>>,
// A map from a pair of public keys (from, to) to a link between the two peers
links: HashMap<(P, P), Link>,
// A map from a public key to a peer
peers: BTreeMap<P, Peer<P>>,
// Peer sets indexed by their ID
peer_sets: BTreeMap<u64, Set<P>>,
// Reference count for each peer (number of peer sets they belong to)
peer_refs: BTreeMap<P, usize>,
// Maximum number of peer sets to track
tracked_peer_sets: Option<usize>,
// A map of peers blocking each other
blocks: HashSet<(P, P)>,
// State of the transmitter
transmitter: transmitter::State<P>,
// Subscribers to peer set updates (used by Manager::subscribe())
#[allow(clippy::type_complexity)]
subscribers: Vec<mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>>,
// Subscribers to tracked peer list updates (used by PeerSource for LimitedSender)
peer_subscribers: Vec<ring::Sender<Vec<P>>>,
// Metrics for received and sent messages
received_messages: Family<metrics::Message, Counter>,
sent_messages: Family<metrics::Message, Counter>,
}
impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
/// Create a new simulated network with a given runtime and configuration.
///
/// Returns a tuple containing the network instance and the oracle that can
/// be used to modify the state of the network while it runs.
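///
/// A minimal usage sketch (mirroring the tests below; `context` and `cfg` are assumed to be a
/// runtime context and a [Config]):
///
/// ```ignore
/// let (network, mut oracle) = Network::new(context.with_label("network"), cfg);
/// // No messages flow until the network task is started.
/// let handle = network.start();
/// // Use `oracle` to register peer sets, channels, and links.
/// ```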
pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P>) {
let (sender, receiver) = mpsc::unbounded();
let (oracle_sender, oracle_receiver) = mpsc::unbounded();
let sent_messages = Family::<metrics::Message, Counter>::default();
let received_messages = Family::<metrics::Message, Counter>::default();
context.register("messages_sent", "messages sent", sent_messages.clone());
context.register(
"messages_received",
"messages received",
received_messages.clone(),
);
// Start with a pseudo-random IP address to assign sockets to for new peers
let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
(
Self {
context: ContextCell::new(context),
max_size: cfg.max_size,
disconnect_on_block: cfg.disconnect_on_block,
tracked_peer_sets: cfg.tracked_peer_sets,
next_addr,
ingress: oracle_receiver,
oracle_sender: oracle_sender.clone(),
sender,
receiver,
links: HashMap::new(),
peers: BTreeMap::new(),
peer_sets: BTreeMap::new(),
peer_refs: BTreeMap::new(),
blocks: HashSet::new(),
transmitter: transmitter::State::new(),
subscribers: Vec::new(),
peer_subscribers: Vec::new(),
received_messages,
sent_messages,
},
Oracle::new(oracle_sender),
)
}
/// Returns (and increments) the next available socket address.
///
/// The port number is incremented for each call, and the IP address is incremented if the port
/// number overflows.
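///
/// For example, if the current address is `255.0.255.255:65535`, that address is returned and
/// the next one becomes `255.1.0.0:0` (see `test_get_next_socket` below).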
fn get_next_socket(&mut self) -> SocketAddr {
let result = self.next_addr;
// Increment the port number, or the IP address if the port number overflows.
// The IP address itself is allowed to overflow (wrapping).
match self.next_addr.port().checked_add(1) {
Some(port) => {
self.next_addr.set_port(port);
}
None => {
let ip = match self.next_addr.ip() {
IpAddr::V4(ipv4) => ipv4,
_ => unreachable!(),
};
let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
}
}
result
}
/// Handle an ingress message.
///
/// This method is called when a message is received from the oracle.
async fn handle_ingress(&mut self, message: ingress::Message<P>) {
// It is important to ensure that no failed receipt of a message will cause us to exit.
// This could happen if the caller drops the `Oracle` after updating the network topology.
// Thus, we create a helper function to send the result to the oracle and log any errors.
fn send_result<T: Debug>(
result: oneshot::Sender<Result<T, Error>>,
value: Result<T, Error>,
) {
let success = value.is_ok();
if let Err(e) = result.send(value) {
error!(?e, "failed to send result to oracle (ok = {})", success);
}
}
match message {
ingress::Message::Update { id, peers } => {
let Some(tracked_peer_sets) = self.tracked_peer_sets else {
warn!("attempted to register peer set when tracking is disabled");
return;
};
// Check if peer set already exists
if self.peer_sets.contains_key(&id) {
warn!(id, "peer set already exists");
return;
}
// Ensure that peer set is monotonically increasing
if let Some((last, _)) = self.peer_sets.last_key_value() {
if id <= *last {
warn!(
new_id = id,
old_id = last,
"attempted to register peer set with non-monotonically increasing ID"
);
return;
}
}
// Create and store new peer set
for public_key in peers.iter() {
// Create peer if it doesn't exist
self.ensure_peer_exists(public_key).await;
// Increment reference count
*self.peer_refs.entry(public_key.clone()).or_insert(0) += 1;
}
self.peer_sets.insert(id, peers.clone());
// Remove oldest peer set if we exceed the limit
while self.peer_sets.len() > tracked_peer_sets {
let (id, set) = self.peer_sets.pop_first().unwrap();
debug!(id, "removed oldest peer set");
// Decrement reference counts and clean up peers/links
for public_key in set.iter() {
let refs = self.peer_refs.get_mut(public_key).unwrap();
*refs = refs.checked_sub(1).expect("reference count underflow");
// If the peer is no longer in any tracked set, drop its reference count entry. We explicitly
// keep the peer in `self.peers` to keep its network alive, in case it re-joins a future peer set.
if *refs == 0 {
self.peer_refs.remove(public_key);
debug!(?public_key, "removed peer no longer in any tracked set");
}
}
}
// Notify all subscribers about the new peer set
let all = self.all_tracked_peers();
let notification = (id, peers, all);
self.subscribers
.retain(|subscriber| subscriber.unbounded_send(notification.clone()).is_ok());
// Broadcast updated peer list to LimitedSender subscribers
self.broadcast_peer_list().await;
}
ingress::Message::Register {
channel,
public_key,
quota,
result,
} => {
// If peer does not exist, then create it.
let (_, is_new) = self.ensure_peer_exists(&public_key).await;
// When not using peer sets, broadcast updated peer list to subscribers
if is_new && self.peer_sets.is_empty() {
self.broadcast_peer_list().await;
}
// Get clock for the rate limiter
let clock = self
.context
.with_label(&format!("rate_limiter_{channel}_{public_key}"))
.take();
// Create a sender that allows sending messages to the network for a certain channel
let (sender, handle) = Sender::new(
self.context.with_label("sender"),
public_key.clone(),
channel,
self.max_size,
self.sender.clone(),
self.oracle_sender.clone(),
clock,
quota,
);
// Create a receiver that allows receiving messages from the network for a certain channel
let peer = self.peers.get_mut(&public_key).unwrap();
let receiver = match peer.register(channel, handle).await {
Ok(receiver) => Receiver { receiver },
Err(err) => return send_result(result, Err(err)),
};
send_result(result, Ok((sender, receiver)))
}
ingress::Message::PeerSet { id, response } => {
if self.peer_sets.is_empty() {
// Return all peers if no peer sets are registered.
let _ = response.send(Some(
self.peers
.keys()
.cloned()
.try_collect()
.expect("BTreeMap keys are unique"),
));
} else {
// Return the peer set at the given index
let _ = response.send(self.peer_sets.get(&id).cloned());
}
}
ingress::Message::Subscribe { sender } => {
// Send the latest peer set upon subscription
if let Some((index, peers)) = self.peer_sets.last_key_value() {
let all = self.all_tracked_peers();
let notification = (*index, peers.clone(), all);
let _ = sender.unbounded_send(notification);
}
self.subscribers.push(sender);
}
ingress::Message::SubscribeConnected { response } => {
// Create a ring channel for the subscriber
let (mut sender, receiver) = ring::channel(NZUsize!(1));
// Send current peer list immediately
let peer_list: Vec<P> = self.all_tracked_peers().into_iter().collect();
let _ = sender.send(peer_list).await;
// Store sender for future broadcasts
self.peer_subscribers.push(sender);
// Return the receiver to the subscriber
let _ = response.send(receiver);
}
ingress::Message::LimitBandwidth {
public_key,
egress_cap,
ingress_cap,
result,
} => {
// If peer does not exist, then create it.
let (_, is_new) = self.ensure_peer_exists(&public_key).await;
// When not using peer sets, broadcast updated peer list to subscribers
if is_new && self.peer_sets.is_empty() {
self.broadcast_peer_list().await;
}
// Update bandwidth limits
let now = self.context.current();
let completions = self
.transmitter
.limit(now, &public_key, egress_cap, ingress_cap);
self.process_completions(completions);
// Notify the caller that the bandwidth update has been applied
let _ = result.send(());
}
ingress::Message::AddLink {
sender,
receiver,
sampler,
success_rate,
result,
} => {
// If sender or receiver does not exist, then create it.
let (_, sender_is_new) = self.ensure_peer_exists(&sender).await;
let (receiver_socket, receiver_is_new) = self.ensure_peer_exists(&receiver).await;
// When not using peer sets, broadcast updated peer list to subscribers
if (sender_is_new || receiver_is_new) && self.peer_sets.is_empty() {
self.broadcast_peer_list().await;
}
// Require link to not already exist
let key = (sender.clone(), receiver.clone());
if self.links.contains_key(&key) {
return send_result(result, Err(Error::LinkExists));
}
let link = Link::new(
&mut self.context,
sender,
receiver,
receiver_socket,
sampler,
success_rate,
self.max_size,
self.received_messages.clone(),
);
self.links.insert(key, link);
send_result(result, Ok(()))
}
ingress::Message::RemoveLink {
sender,
receiver,
result,
} => {
match self.links.remove(&(sender, receiver)) {
Some(_) => (),
None => return send_result(result, Err(Error::LinkMissing)),
}
send_result(result, Ok(()))
}
ingress::Message::Block { from, to } => {
self.blocks.insert((from, to));
}
ingress::Message::Blocked { result } => {
send_result(result, Ok(self.blocks.iter().cloned().collect()))
}
}
}
/// Ensure a peer exists, creating it if necessary.
///
/// Returns the socket address of the peer and a boolean indicating if a new peer was created.
async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
if !self.peers.contains_key(public_key) {
// Create peer
let socket = self.get_next_socket();
let peer = Peer::new(
self.context.with_label("peer"),
public_key.clone(),
socket,
self.max_size,
)
.await;
// Once ready, add to peers
self.peers.insert(public_key.clone(), peer);
(socket, true)
} else {
(self.peers.get(public_key).unwrap().socket, false)
}
}
/// Broadcast updated peer list to all peer subscribers.
///
/// This is called when the peer list changes (either from peer set updates
/// or from new peers being added when not using peer sets).
///
/// Subscribers whose receivers have been dropped are removed to prevent
/// memory leaks.
async fn broadcast_peer_list(&mut self) {
let peer_list = self.all_tracked_peers().into_iter().collect::<Vec<_>>();
let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
for mut subscriber in self.peer_subscribers.drain(..) {
if subscriber.send(peer_list.clone()).await.is_ok() {
live_subscribers.push(subscriber);
}
}
self.peer_subscribers = live_subscribers;
}
/// Get all tracked peers as an ordered set.
///
/// When peer sets are registered, returns only the peers from those sets.
/// Otherwise, returns all registered peers (for compatibility with tests
/// that don't use peer sets).
fn all_tracked_peers(&self) -> Set<P> {
if self.peer_sets.is_empty() && self.tracked_peer_sets.is_none() {
self.peers
.keys()
.cloned()
.try_collect()
.expect("BTreeMap keys are unique")
} else {
self.peer_refs
.keys()
.cloned()
.try_collect()
.expect("BTreeMap keys are unique")
}
}
}
impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
/// Process completions from the transmitter.
fn process_completions(&mut self, completions: Vec<Completion<P>>) {
for completion in completions {
// If there is no message to deliver, then skip
let Some(deliver_at) = completion.deliver_at else {
trace!(
origin = ?completion.origin,
recipient = ?completion.recipient,
"message dropped before delivery",
);
continue;
};
// Send message to link
let key = (completion.origin.clone(), completion.recipient.clone());
let Some(link) = self.links.get_mut(&key) else {
// This can happen if the link is removed before the message is delivered
trace!(
origin = ?completion.origin,
recipient = ?completion.recipient,
"missing link for completion",
);
continue;
};
if let Err(err) = link.send(completion.channel, completion.message, deliver_at) {
error!(?err, "failed to send");
}
}
}
/// Handle a task.
///
/// This method is called when a task is received from the sender, which can come from
/// any peer in the network.
fn handle_task(&mut self, task: Task<P>) {
// If peer sets are enabled and we are not in one, ignore the message (we are disconnected from all)
let (channel, origin, recipients, message, reply) = task;
if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&origin) {
warn!(
?origin,
reason = "not in tracked peer set",
"dropping message"
);
if let Err(err) = reply.send(Vec::new()) {
error!(?err, "failed to send ack");
}
return;
}
// Collect recipients
let recipients = match recipients {
Recipients::All => {
// If peer sets have been registered, send only to tracked peers
// Otherwise, send to all registered peers (compatibility
// with tests that do not register peer sets.)
if self.peer_sets.is_empty() {
self.peers.keys().cloned().collect()
} else {
self.peer_refs.keys().cloned().collect()
}
}
Recipients::Some(keys) => keys,
Recipients::One(key) => vec![key],
};
// Send to all recipients
let now = self.context.current();
let mut sent = Vec::new();
for recipient in recipients {
// Skip self
if recipient == origin {
trace!(?recipient, reason = "self", "dropping message");
continue;
}
// If tracking peer sets, ensure recipient and sender are in a tracked peer set
if self.tracked_peer_sets.is_some() && !self.peer_refs.contains_key(&recipient) {
trace!(
?origin,
?recipient,
reason = "not in tracked peer set",
"dropping message"
);
continue;
}
// Determine if the sender or recipient has blocked the other
let o_r = (origin.clone(), recipient.clone());
let r_o = (recipient.clone(), origin.clone());
if self.disconnect_on_block
&& (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
{
trace!(?origin, ?recipient, reason = "blocked", "dropping message");
continue;
}
// Determine if there is a link between the origin and recipient
let Some(link) = self.links.get_mut(&o_r) else {
trace!(?origin, ?recipient, reason = "no link", "dropping message");
continue;
};
// Note: Rate limiting is handled by the Sender before messages reach here.
// The Sender filters recipients via LimitedSender::check() or in Sender::send().
// Record sent message as soon as we determine there is a link with recipient (approximates
// having an open connection)
self.sent_messages
.get_or_create(&metrics::Message::new(&origin, &recipient, channel))
.inc();
// Sample latency
let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);
// Determine if the message should be delivered
let should_deliver = self.context.gen_bool(link.success_rate);
// Enqueue message for delivery
let completions = self.transmitter.enqueue(
now,
origin.clone(),
recipient.clone(),
channel,
message.clone(),
latency,
should_deliver,
);
self.process_completions(completions);
sent.push(recipient);
}
// Alert application of sent messages
if let Err(err) = reply.send(sent) {
error!(?err, "failed to send ack");
}
}
/// Run the simulated network.
///
/// It is not necessary to invoke this method before modifying the network topology; however,
/// no messages will be sent until this method is called.
pub fn start(mut self) -> Handle<()> {
spawn_cell!(self.context, self.run().await)
}
async fn run(mut self) {
loop {
let tick = match self.transmitter.next() {
Some(when) => Either::Left(self.context.sleep_until(when)),
None => Either::Right(future::pending()),
};
select! {
_ = tick => {
let now = self.context.current();
let completions = self.transmitter.advance(now);
self.process_completions(completions);
},
message = self.ingress.next() => {
// If ingress is closed, exit
let message = match message {
Some(message) => message,
None => break,
};
self.handle_ingress(message).await;
},
task = self.receiver.next() => {
// If receiver is closed, exit
let task = match task {
Some(task) => task,
None => break,
};
self.handle_task(task);
},
}
}
}
}
/// Provides online peers from the simulated network.
///
/// Implements [`crate::utils::limited::Connected`] to provide peer list updates
/// to [`crate::utils::limited::LimitedSender`].
pub struct ConnectedPeerProvider<P: PublicKey> {
sender: mpsc::UnboundedSender<ingress::Message<P>>,
}
impl<P: PublicKey> Clone for ConnectedPeerProvider<P> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl<P: PublicKey> ConnectedPeerProvider<P> {
const fn new(sender: mpsc::UnboundedSender<ingress::Message<P>>) -> Self {
Self { sender }
}
}
impl<P: PublicKey> Connected for ConnectedPeerProvider<P> {
type PublicKey = P;
async fn subscribe(&mut self) -> ring::Receiver<Vec<P>> {
let (response_tx, response_rx) = oneshot::channel();
let _ = self
.sender
.unbounded_send(ingress::Message::SubscribeConnected {
response: response_tx,
});
// If the network is closed, return an empty receiver
response_rx.await.unwrap_or_else(|_| {
let (_sender, receiver) = ring::channel(NZUsize!(1));
receiver
})
}
}
/// Implementation of a [crate::Sender] for the simulated network without rate limiting.
///
/// This is the inner sender used by [`Sender`] which wraps it with rate limiting.
#[derive(Clone)]
pub struct UnlimitedSender<P: PublicKey> {
me: P,
channel: Channel,
max_size: u32,
high: mpsc::UnboundedSender<Task<P>>,
low: mpsc::UnboundedSender<Task<P>>,
}
impl<P: PublicKey> crate::UnlimitedSender for UnlimitedSender<P> {
type Error = Error;
type PublicKey = P;
async fn send(
&mut self,
recipients: Recipients<P>,
message: Bytes,
priority: bool,
) -> Result<Vec<P>, Error> {
// Check message size
if message.len() > self.max_size as usize {
return Err(Error::MessageTooLarge(message.len()));
}
// Send message
let (sender, receiver) = oneshot::channel();
let channel = if priority { &self.high } else { &self.low };
channel
.unbounded_send((self.channel, self.me.clone(), recipients, message, sender))
.map_err(|_| Error::NetworkClosed)?;
receiver.await.map_err(|_| Error::NetworkClosed)
}
}
/// Implementation of a [crate::Sender] for the simulated network.
///
/// Also implements [crate::LimitedSender] to support rate-limit checking
/// before sending messages.
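///
/// A sketch of the rate-limit-aware flow (assuming `sender` was obtained by registering a
/// channel through the oracle, as in the tests below). The `Err` value returned by `check` is
/// read here as the earliest time to retry, which is an assumption about [crate::LimitedSender]:
///
/// ```ignore
/// match sender.check(Recipients::All).await {
///     Ok(checked) => {
///         // Capacity is available: perform the actual send.
///         let _delivered_to = checked.send(message, false).await?;
///     }
///     Err(retry_at) => {
///         // Rate limited: wait until `retry_at` before trying again.
///     }
/// }
/// ```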
pub struct Sender<P: PublicKey, E: Clock> {
limited_sender: LimitedSender<E, UnlimitedSender<P>, ConnectedPeerProvider<P>>,
}
impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
fn clone(&self) -> Self {
Self {
limited_sender: self.limited_sender.clone(),
}
}
}
impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Sender").finish_non_exhaustive()
}
}
impl<P: PublicKey, E: Clock> Sender<P, E> {
#[allow(clippy::too_many_arguments)]
fn new(
context: impl Spawner + Metrics,
me: P,
channel: Channel,
max_size: u32,
mut sender: mpsc::UnboundedSender<Task<P>>,
oracle_sender: mpsc::UnboundedSender<ingress::Message<P>>,
clock: E,
quota: Quota,
) -> (Self, Handle<()>) {
// Listen for messages
let (high, mut high_receiver) = mpsc::unbounded();
let (low, mut low_receiver) = mpsc::unbounded();
let processor = context.with_label("processor").spawn(move |_| async move {
loop {
// Wait for task
let task;
select! {
high_task = high_receiver.next() => {
task = match high_task {
Some(task) => task,
None => break,
};
},
low_task = low_receiver.next() => {
task = match low_task {
Some(task) => task,
None => break,
};
}
}
// Send task
if let Err(err) = sender.send(task).await {
error!(?err, channel, "failed to send task");
}
}
});
let unlimited_sender = UnlimitedSender {
me,
channel,
max_size,
high,
low,
};
let peer_source = ConnectedPeerProvider::new(oracle_sender);
let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);
(Self { limited_sender }, processor)
}
/// Split this [Sender] into a [SplitOrigin::Primary] and [SplitOrigin::Secondary] sender.
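///
/// A sketch mirroring `test_split_channel_single` below, where the primary half only reaches
/// `peer_a` and the secondary half only reaches `peer_b` (illustrative keys):
///
/// ```ignore
/// let (primary, secondary) = sender.split_with(move |origin, _recipients, _message| match origin {
///     SplitOrigin::Primary => Some(Recipients::One(peer_a.clone())),
///     SplitOrigin::Secondary => Some(Recipients::One(peer_b.clone())),
/// });
/// ```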
pub fn split_with<F: SplitForwarder<P>>(
self,
forwarder: F,
) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
(
SplitSender {
replica: SplitOrigin::Primary,
inner: self.clone(),
forwarder: forwarder.clone(),
},
SplitSender {
replica: SplitOrigin::Secondary,
inner: self,
forwarder,
},
)
}
}
impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
type PublicKey = P;
type Checked<'a>
= crate::utils::limited::CheckedSender<'a, UnlimitedSender<P>>
where
Self: 'a;
async fn check(
&mut self,
recipients: Recipients<P>,
) -> Result<Self::Checked<'_>, SystemTime> {
self.limited_sender.check(recipients).await
}
}
/// A sender that routes recipients per message via a user-provided function.
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
replica: SplitOrigin,
inner: Sender<P, E>,
forwarder: F,
}
impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
fn clone(&self) -> Self {
Self {
replica: self.replica,
inner: self.inner.clone(),
forwarder: self.forwarder.clone(),
}
}
}
impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SplitSender")
.field("replica", &self.replica)
.field("inner", &self.inner)
.finish()
}
}
impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
type PublicKey = P;
type Checked<'a> = SplitCheckedSender<'a, P, E, F>;
async fn check(
&mut self,
recipients: Recipients<P>,
) -> Result<Self::Checked<'_>, SystemTime> {
Ok(SplitCheckedSender {
// Perform a rate limit check with the entire set of original recipients although
// the forwarder may filter these (based on message content) during send.
checked: self.inner.limited_sender.check(recipients.clone()).await?,
replica: self.replica,
forwarder: self.forwarder.clone(),
recipients,
_phantom: std::marker::PhantomData,
})
}
}
/// A checked sender for [`SplitSender`] that defers the forwarder call to send time.
///
/// This is necessary because [`SplitForwarder`] may examine message content to determine
/// routing, but the message is not available at [`LimitedSender::check`] time.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
checked: LimitedCheckedSender<'a, UnlimitedSender<P>>,
replica: SplitOrigin,
forwarder: F,
recipients: Recipients<P>,
_phantom: std::marker::PhantomData<E>,
}
impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
for SplitCheckedSender<'a, P, E, F>
{
type PublicKey = P;
type Error = Error;
async fn send(
self,
message: Bytes,
priority: bool,
) -> Result<Vec<P>, Self::Error> {
// Determine the set of recipients that will receive the message
let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else {
return Ok(Vec::new());
};
// Extract the inner sender and send directly with the new recipients
//
// While SplitForwarder does not enforce any relationship between the original recipients
// and the new recipients, it is typically some subset of the original recipients. This
// means we may over-rate limit some recipients (who are never actually sent a message here) but
// we prefer this to not providing feedback at all (we would have to skip check entirely).
self.checked
.into_inner()
.send(recipients, message, priority)
.await
}
}
type MessageReceiver<P> = mpsc::UnboundedReceiver<Message<P>>;
/// Implementation of a [crate::Receiver] for the simulated network.
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
receiver: MessageReceiver<P>,
}
impl<P: PublicKey> crate::Receiver for Receiver<P> {
type Error = Error;
type PublicKey = P;
async fn recv(&mut self) -> Result<Message<P>, Error> {
self.receiver.next().await.ok_or(Error::NetworkClosed)
}
}
impl<P: PublicKey> Receiver<P> {
/// Split this [Receiver] into a [SplitTarget::Primary] and [SplitTarget::Secondary] receiver.
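///
/// A sketch mirroring `test_split_channel_single` below, routing messages from `peer_a` to the
/// primary receiver and everything else to the secondary (illustrative routing):
///
/// ```ignore
/// let (primary_rx, secondary_rx) = receiver.split_with(
///     context.with_label("split_receiver"),
///     move |(sender, _)| {
///         if sender == &peer_a {
///             SplitTarget::Primary
///         } else {
///             SplitTarget::Secondary
///         }
///     },
/// );
/// ```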
pub fn split_with<E: Spawner, R: SplitRouter<P>>(
mut self,
context: E,
router: R,
) -> (Self, Self) {
let (mut primary_tx, primary_rx) = mpsc::unbounded();
let (mut secondary_tx, secondary_rx) = mpsc::unbounded();
context.spawn(move |_| async move {
while let Some(message) = self.receiver.next().await {
// Route message to the appropriate target
let direction = router(&message);
match direction {
SplitTarget::None => {}
SplitTarget::Primary => {
if let Err(err) = primary_tx.send(message).await {
error!(?err, "failed to send message to primary");
}
}
SplitTarget::Secondary => {
if let Err(err) = secondary_tx.send(message).await {
error!(?err, "failed to send message to secondary");
}
}
SplitTarget::Both => {
if let Err(err) = primary_tx.send(message.clone()).await {
error!(?err, "failed to send message to primary");
}
if let Err(err) = secondary_tx.send(message).await {
error!(?err, "failed to send message to secondary");
}
}
}
// Exit if both channels are closed
if primary_tx.is_closed() && secondary_tx.is_closed() {
break;
}
}
});
(
Self {
receiver: primary_rx,
},
Self {
receiver: secondary_rx,
},
)
}
}
/// A peer in the simulated network.
///
/// The peer can register channels, which allows it to receive messages sent to the channel from other peers.
struct Peer<P: PublicKey> {
// Socket address that the peer is listening on
socket: SocketAddr,
// Control to register new channels
control: mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>)>,
}
impl<P: PublicKey> Peer<P> {
/// Create and return a new peer.
///
/// The peer will listen for incoming connections on the given `socket` address.
/// `max_size` is the maximum size of a message that can be sent to the peer.
async fn new<E: RNetwork + Spawner + Metrics>(
context: E,
public_key: P,
socket: SocketAddr,
max_size: u32,
) -> Self {
// The control is used to register channels.
// There is exactly one mailbox created for each channel that the peer is registered for.
let (control_sender, mut control_receiver) = mpsc::unbounded();
// Whenever a message is received from a peer, it is placed in the inbox.
// The router polls the inbox and forwards the message to the appropriate mailbox.
let (inbox_sender, mut inbox_receiver) = mpsc::unbounded();
// Spawn router
context.with_label("router").spawn(|context| async move {
// Map of channels to mailboxes (senders to particular channels)
let mut mailboxes = HashMap::new();
// Continually listen for control messages and outbound messages
select_loop! {
context,
on_stopped => {},
// Listen for control messages, which are used to register channels
control = control_receiver.next() => {
// If control is closed, exit
let (channel, sender, result_tx): (Channel, Handle<()>, oneshot::Sender<MessageReceiver<P>>) = match control {
Some(control) => control,
None => break,
};
// Register channel
let (receiver_tx, receiver_rx) = mpsc::unbounded();
if let Some((_, existing_sender)) = mailboxes.insert(channel, (receiver_tx, sender)) {
warn!(?public_key, ?channel, "overwriting existing channel");
existing_sender.abort();
}
result_tx.send(receiver_rx).unwrap();
},
// Listen for messages from the inbox, which are forwarded to the appropriate mailbox
inbox = inbox_receiver.next() => {
// If inbox is closed, exit
let (channel, message) = match inbox {
Some(message) => message,
None => break,
};
// Send message to mailbox
match mailboxes.get_mut(&channel) {
Some((receiver_tx, _)) => {
if let Err(err) = receiver_tx.send(message).await {
debug!(?err, "failed to send message to mailbox");
}
}
None => {
trace!(
recipient = ?public_key,
channel,
reason = "missing channel",
"dropping message",
);
}
}
},
}
});
// Spawn a task that accepts new connections and spawns a task for each connection
let (ready_tx, ready_rx) = oneshot::channel();
context
.with_label("listener")
.spawn(move |context| async move {
// Initialize listener
let mut listener = context.bind(socket).await.unwrap();
let _ = ready_tx.send(());
// Continually accept new connections
while let Ok((_, _, mut stream)) = listener.accept().await {
// New connection accepted. Spawn a task for this connection
context.with_label("receiver").spawn({
let mut inbox_sender = inbox_sender.clone();
move |_| async move {
// Receive dialer's public key as a handshake
let dialer = match recv_frame(&mut stream, max_size).await {
Ok(data) => data,
Err(_) => {
error!("failed to receive public key from dialer");
return;
}
};
let Ok(dialer) = P::decode(dialer.as_ref()) else {
error!("received public key is invalid");
return;
};
// Continually receive messages from the dialer and send them to the inbox
while let Ok(data) = recv_frame(&mut stream, max_size).await {
let channel = Channel::from_be_bytes(
data[..Channel::SIZE].try_into().unwrap(),
);
let message = data.slice(Channel::SIZE..);
if let Err(err) = inbox_sender
.send((channel, (dialer.clone(), message)))
.await
{
debug!(?err, "failed to send message to mailbox");
break;
}
}
}
});
}
});
// Wait for listener to start before returning
let _ = ready_rx.await;
// Return peer
Self {
socket,
control: control_sender,
}
}
/// Register a channel with the peer.
///
/// This allows the peer to receive messages sent to the channel.
/// Returns a receiver that can be used to receive messages sent to the channel.
async fn register(
&mut self,
channel: Channel,
sender: Handle<()>,
) -> Result<MessageReceiver<P>, Error> {
let (result_tx, result_rx) = oneshot::channel();
self.control
.send((channel, sender, result_tx))
.await
.map_err(|_| Error::NetworkClosed)?;
result_rx.await.map_err(|_| Error::NetworkClosed)
}
}
// A unidirectional link between two peers.
// Messages can be sent over the link with a given latency, jitter, and success rate.
struct Link {
sampler: Normal<f64>,
success_rate: f64,
// Messages with their receive time for ordered delivery
inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>,
}
impl Link {
#[allow(clippy::too_many_arguments)]
fn new<E: RNetwork + Spawner + Clock + Metrics, P: PublicKey>(
context: &mut E,
dialer: P,
receiver: P,
socket: SocketAddr,
sampler: Normal<f64>,
success_rate: f64,
max_size: u32,
received_messages: Family<metrics::Message, Counter>,
) -> Self {
// Spawn a task that will wait for messages to be sent to the link and then send them
// over the network.
let (inbox, mut outbox) = mpsc::unbounded::<(Channel, Bytes, SystemTime)>();
context.with_label("link").spawn(move |context| async move {
// Dial the peer and handshake by sending it the dialer's public key
let (mut sink, _) = context.dial(socket).await.unwrap();
if let Err(err) = send_frame(&mut sink, &dialer, max_size).await {
error!(?err, "failed to send public key to listener");
return;
}
// Process messages in order, waiting for their receive time
while let Some((channel, message, receive_complete_at)) = outbox.next().await {
// Wait until the message should arrive at receiver
context.sleep_until(receive_complete_at).await;
// Send the message
let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len());
data.extend_from_slice(&channel.to_be_bytes());
data.extend_from_slice(&message);
let data = data.freeze();
let _ = send_frame(&mut sink, &data, max_size).await;
// Bump received messages metric
received_messages
.get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
.inc();
}
});
Self {
sampler,
success_rate,
inbox,
}
}
// Send a message over the link with receive timing.
fn send(
&mut self,
channel: Channel,
message: Bytes,
receive_complete_at: SystemTime,
) -> Result<(), Error> {
self.inbox
.unbounded_send((channel, message, receive_complete_at))
.map_err(|_| Error::NetworkClosed)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Manager, Receiver as _, Recipients, Sender as _};
use bytes::Bytes;
use commonware_cryptography::{ed25519, Signer as _};
use commonware_runtime::{deterministic, Quota, Runner as _};
use futures::FutureExt;
use std::num::NonZeroU32;
const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
/// Default rate limit set high enough to not interfere with normal operation
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
#[test]
fn test_register_and_link() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let network_context = context.with_label("network");
let (network, mut oracle) = Network::new(network_context.clone(), cfg);
network_context.spawn(|_| network.run());
// Create two public keys
let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
// Register the peer set
let mut manager = oracle.manager();
manager
.update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
.await;
let mut control = oracle.control(pk1.clone());
control.register(0, TEST_QUOTA).await.unwrap();
control.register(1, TEST_QUOTA).await.unwrap();
let mut control = oracle.control(pk2.clone());
control.register(0, TEST_QUOTA).await.unwrap();
control.register(1, TEST_QUOTA).await.unwrap();
// Overwrite if registering again
control.register(1, TEST_QUOTA).await.unwrap();
// Add link
let link = ingress::Link {
latency: Duration::from_millis(2),
jitter: Duration::from_millis(1),
success_rate: 0.9,
};
oracle
.add_link(pk1.clone(), pk2.clone(), link.clone())
.await
.unwrap();
// Expect error when adding link again
assert!(matches!(
oracle.add_link(pk1, pk2, link).await,
Err(Error::LinkExists)
));
});
}
#[test]
fn test_split_channel_single() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let network_context = context.with_label("network");
let (network, mut oracle) = Network::new(network_context.clone(), cfg);
network_context.spawn(|_| network.run());
// Create a "twin" node that will be split, plus two normal peers
let twin = ed25519::PrivateKey::from_seed(20).public_key();
let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
let peer_b = ed25519::PrivateKey::from_seed(22).public_key();
// Register all peers
let mut manager = oracle.manager();
manager
.update(
0,
[twin.clone(), peer_a.clone(), peer_b.clone()]
.try_into()
.unwrap(),
)
.await;
// Register normal peers
let (mut peer_a_sender, mut peer_a_recv) = oracle
.control(peer_a.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (mut peer_b_sender, mut peer_b_recv) = oracle
.control(peer_b.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
// Register and split the twin's channel:
// - Primary sends only to peer_a
// - Secondary sends only to peer_b
// - Messages from peer_a go to primary receiver
// - Messages from peer_b go to secondary receiver
let (twin_sender, twin_receiver) = oracle
.control(twin.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let peer_a_for_router = peer_a.clone();
let peer_b_for_router = peer_b.clone();
let (mut twin_primary_sender, mut twin_secondary_sender) =
twin_sender.split_with(move |origin, _, _| match origin {
SplitOrigin::Primary => Some(Recipients::One(peer_a_for_router.clone())),
SplitOrigin::Secondary => Some(Recipients::One(peer_b_for_router.clone())),
});
let peer_a_for_recv = peer_a.clone();
let peer_b_for_recv = peer_b.clone();
let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver.split_with(
context.with_label("split_receiver"),
move |(sender, _)| {
if sender == &peer_a_for_recv {
SplitTarget::Primary
} else if sender == &peer_b_for_recv {
SplitTarget::Secondary
} else {
panic!("unexpected sender");
}
},
);
// Establish bidirectional links
let link = ingress::Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(peer_a.clone(), twin.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(twin.clone(), peer_a.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(peer_b.clone(), twin.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(twin.clone(), peer_b.clone(), link.clone())
.await
.unwrap();
// Send messages in both directions
let msg_a_to_twin = Bytes::from_static(b"from_a");
let msg_b_to_twin = Bytes::from_static(b"from_b");
let msg_primary_out = Bytes::from_static(b"primary_out");
let msg_secondary_out = Bytes::from_static(b"secondary_out");
peer_a_sender
.send(Recipients::One(twin.clone()), msg_a_to_twin.clone(), false)
.await
.unwrap();
peer_b_sender
.send(Recipients::One(twin.clone()), msg_b_to_twin.clone(), false)
.await
.unwrap();
twin_primary_sender
.send(Recipients::All, msg_primary_out.clone(), false)
.await
.unwrap();
twin_secondary_sender
.send(Recipients::All, msg_secondary_out.clone(), false)
.await
.unwrap();
// Verify routing: peer_a messages go to primary, peer_b to secondary
let (sender, payload) = twin_primary_recv.recv().await.unwrap();
assert_eq!(sender, peer_a);
assert_eq!(payload, msg_a_to_twin);
let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
assert_eq!(sender, peer_b);
assert_eq!(payload, msg_b_to_twin);
// Verify routing: primary sends to peer_a, secondary to peer_b
let (sender, payload) = peer_a_recv.recv().await.unwrap();
assert_eq!(sender, twin);
assert_eq!(payload, msg_primary_out);
let (sender, payload) = peer_b_recv.recv().await.unwrap();
assert_eq!(sender, twin);
assert_eq!(payload, msg_secondary_out);
});
}
#[test]
fn test_split_channel_both() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let network_context = context.with_label("network");
let (network, mut oracle) = Network::new(network_context.clone(), cfg);
network_context.spawn(|_| network.run());
// Create a "twin" node that will be split, plus a third peer
let twin = ed25519::PrivateKey::from_seed(30).public_key();
let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
// Register all peers
let mut manager = oracle.manager();
manager
.update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
.await;
// Register normal peer
let (mut peer_c_sender, _peer_c_recv) = oracle
.control(peer_c.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
// Register and split the twin's channel with a router that sends to Both
let (twin_sender, twin_receiver) = oracle
.control(twin.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_twin_primary_sender, _twin_secondary_sender) =
twin_sender.split_with(|_origin, recipients, _| Some(recipients.clone()));
let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
.split_with(context.with_label("split_receiver_both"), |_| {
SplitTarget::Both
});
// Establish bidirectional links
let link = ingress::Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(peer_c.clone(), twin.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(twin.clone(), peer_c.clone(), link)
.await
.unwrap();
// Send a message from peer_c to twin
let msg_both = Bytes::from_static(b"to_both");
peer_c_sender
.send(Recipients::One(twin.clone()), msg_both.clone(), false)
.await
.unwrap();
// Verify both receivers get the message
let (sender, payload) = twin_primary_recv.recv().await.unwrap();
assert_eq!(sender, peer_c);
assert_eq!(payload, msg_both);
let (sender, payload) = twin_secondary_recv.recv().await.unwrap();
assert_eq!(sender, peer_c);
assert_eq!(payload, msg_both);
});
}
#[test]
fn test_split_channel_none() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let network_context = context.with_label("network");
let (network, mut oracle) = Network::new(network_context.clone(), cfg);
network_context.spawn(|_| network.run());
// Create a "twin" node that will be split, plus a third peer
let twin = ed25519::PrivateKey::from_seed(30).public_key();
let peer_c = ed25519::PrivateKey::from_seed(31).public_key();
// Register all peers
let mut manager = oracle.manager();
manager
.update(0, [twin.clone(), peer_c.clone()].try_into().unwrap())
.await;
// Register normal peer
let (mut peer_c_sender, _peer_c_recv) = oracle
.control(peer_c.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
// Register and split the twin's channel with a forwarder and router that drop everything
let (twin_sender, twin_receiver) = oracle
.control(twin.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (mut twin_primary_sender, mut twin_secondary_sender) =
twin_sender.split_with(|_origin, _, _| None);
let (mut twin_primary_recv, mut twin_secondary_recv) = twin_receiver
.split_with(context.with_label("split_receiver_both"), |_| {
SplitTarget::None
});
// Establish bidirectional links
let link = ingress::Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(peer_c.clone(), twin.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(twin.clone(), peer_c.clone(), link)
.await
.unwrap();
// Send a message from peer_c to twin
let msg_both = Bytes::from_static(b"to_both");
let sent = peer_c_sender
.send(Recipients::One(twin.clone()), msg_both.clone(), false)
.await
.unwrap();
assert_eq!(sent.len(), 1);
assert_eq!(sent[0], twin);
// Verify neither receiver gets the message
context.sleep(Duration::from_millis(100)).await;
assert!(twin_primary_recv.recv().now_or_never().is_none());
assert!(twin_secondary_recv.recv().now_or_never().is_none());
// Send a message from twin to peer_c
let msg_both = Bytes::from_static(b"to_both");
let sent = twin_primary_sender
.send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
.await
.unwrap();
assert_eq!(sent.len(), 0);
// Send a message from twin to peer_c
let msg_both = Bytes::from_static(b"to_both");
let sent = twin_secondary_sender
.send(Recipients::One(peer_c.clone()), msg_both.clone(), false)
.await
.unwrap();
assert_eq!(sent.len(), 0);
});
}
#[test]
fn test_unordered_peer_sets() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let network_context = context.with_label("network");
let (network, oracle) = Network::new(network_context.clone(), cfg);
network_context.spawn(|_| network.run());
// Create two public keys
let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
// Subscribe to peer sets
let mut manager = oracle.manager();
let mut subscription = manager.subscribe().await;
// Register initial peer set
manager
.update(10, [pk1.clone(), pk2.clone()].try_into().unwrap())
.await;
let (id, new, all) = subscription.next().await.unwrap();
assert_eq!(id, 10);
assert_eq!(new.len(), 2);
assert_eq!(all.len(), 2);
// Register old peer sets (ignored)
let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
manager.update(9, [pk3.clone()].try_into().unwrap()).await;
// Add new peer set
let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
manager.update(11, [pk4.clone()].try_into().unwrap()).await;
let (id, new, all) = subscription.next().await.unwrap();
assert_eq!(id, 11);
assert_eq!(new, [pk4.clone()].try_into().unwrap());
assert_eq!(all, [pk1, pk2, pk4].try_into().unwrap());
});
}
#[test]
fn test_get_next_socket() {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: None,
};
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type PublicKey = ed25519::PublicKey;
let (mut network, _) =
Network::<deterministic::Context, PublicKey>::new(context.clone(), cfg);
// Test that the next socket address is incremented correctly
let mut original = network.next_addr;
let next = network.get_next_socket();
assert_eq!(next, original);
let next = network.get_next_socket();
original.set_port(1);
assert_eq!(next, original);
// Test that the port number overflows correctly
let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
network.next_addr = max_addr;
let next = network.get_next_socket();
assert_eq!(next, max_addr);
let next = network.get_next_socket();
assert_eq!(
next,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
);
});
}
#[test]
fn test_fifo_burst_same_recipient() {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let runner = deterministic::Runner::default();
runner.start(|context| async move {
let (network, mut oracle) = Network::new(context.with_label("network"), cfg);
let network_handle = network.start();
let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();
let mut manager = oracle.manager();
manager
.update(
0,
[sender_pk.clone(), recipient_pk.clone()]
.try_into()
.unwrap(),
)
.await;
let (mut sender, _sender_recv) = oracle
.control(sender_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_sender2, mut receiver) = oracle
.control(recipient_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.limit_bandwidth(sender_pk.clone(), Some(5_000), None)
.await
.unwrap();
oracle
.limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
.await
.unwrap();
oracle
.add_link(
sender_pk.clone(),
recipient_pk.clone(),
ingress::Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
},
)
.await
.unwrap();
const COUNT: usize = 50;
let mut expected = Vec::with_capacity(COUNT);
for i in 0..COUNT {
let msg = Bytes::from(vec![i as u8; 64]);
sender
.send(Recipients::One(recipient_pk.clone()), msg.clone(), false)
.await
.unwrap();
expected.push(msg);
}
for expected_msg in expected {
let (_pk, bytes) = receiver.recv().await.unwrap();
assert_eq!(bytes, expected_msg);
}
drop(oracle);
drop(sender);
network_handle.abort();
});
}
#[test]
fn test_broadcast_respects_transmit_latency() {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: Some(3),
};
let runner = deterministic::Runner::default();
runner.start(|context| async move {
let (network, mut oracle) = Network::new(context.with_label("network"), cfg);
let network_handle = network.start();
let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
let mut manager = oracle.manager();
manager
.update(
0,
[sender_pk.clone(), recipient_a.clone(), recipient_b.clone()]
.try_into()
.unwrap(),
)
.await;
let (mut sender, _recv_sender) = oracle
.control(sender_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_sender2, mut recv_a) = oracle
.control(recipient_a.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_sender3, mut recv_b) = oracle
.control(recipient_b.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.limit_bandwidth(sender_pk.clone(), Some(1_000), None)
.await
.unwrap();
oracle
.limit_bandwidth(recipient_a.clone(), None, Some(1_000))
.await
.unwrap();
oracle
.limit_bandwidth(recipient_b.clone(), None, Some(1_000))
.await
.unwrap();
let link = ingress::Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(sender_pk.clone(), recipient_b.clone(), link)
.await
.unwrap();
let big_msg = Bytes::from(vec![7u8; 10_000]);
let start = context.current();
sender
.send(Recipients::All, big_msg.clone(), false)
.await
.unwrap();
let (_pk, received_a) = recv_a.recv().await.unwrap();
assert_eq!(received_a, big_msg);
let elapsed_a = context.current().duration_since(start).unwrap();
assert!(elapsed_a >= Duration::from_secs(20));
let (_pk, received_b) = recv_b.recv().await.unwrap();
assert_eq!(received_b, big_msg);
let elapsed_b = context.current().duration_since(start).unwrap();
assert!(elapsed_b >= Duration::from_secs(20));
// Because bandwidth is shared, the two messages should take about the same time
assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
drop(oracle);
drop(sender);
network_handle.abort();
});
}
}