//! Implementation of a simulated p2p network.
use super::{
bandwidth,
ingress::{self, Oracle},
metrics, Error,
};
use crate::{Channel, Message, Recipients};
use bytes::Bytes;
use commonware_codec::{DecodeExt, FixedSize};
use commonware_cryptography::PublicKey;
use commonware_macros::select;
use commonware_runtime::{Clock, Handle, Listener as _, Metrics, Network as RNetwork, Spawner};
use commonware_stream::utils::codec::{recv_frame, send_frame};
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use prometheus_client::metrics::{counter::Counter, family::Family};
use rand::Rng;
use rand_distr::{Distribution, Normal};
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{Duration, SystemTime},
};
use tracing::{error, trace};
/// Task type representing a message to be sent within the network.
///
/// The tuple fields are, in order: the channel the message was sent on, the origin peer, the
/// requested recipients, the payload, and a one-shot reply sender. `handle_task` uses the reply
/// sender to return the recipients the message was actually sent to.
type Task
= (Channel, P, Recipients
, Bytes, oneshot::Sender>);
/// Configuration for the simulated network.
#[derive(Clone, Debug)]
pub struct Config {
    /// Maximum size, in bytes, of a message payload that can be sent over the network.
    ///
    /// Payloads larger than this are rejected with [Error::MessageTooLarge] when sent.
    pub max_size: usize,
}
/// Implementation of a simulated network.
///
/// Owns every simulated [Peer] and [Link], applies topology changes requested through the
/// [Oracle], and routes the [Task]s that each peer's [Sender] submits. No task is processed
/// until [Network::start] is called.
pub struct Network {
// Runtime context used to spawn tasks, sample randomness, and read the clock
context: E,
// Maximum size of a message that can be sent over the network
max_size: usize,
// Next socket address to assign to a new peer
// Incremented for each new peer
next_addr: SocketAddr,
// Channel to receive messages from the oracle
ingress: mpsc::UnboundedReceiver>,
// A channel to receive tasks from peers
// The sender is cloned and given to each peer
// The receiver is polled in the main loop
sender: mpsc::UnboundedSender>,
receiver: mpsc::UnboundedReceiver>,
// A map from a pair of public keys (from, to) to a link between the two peers
links: HashMap<(P, P), Link>,
// A map from a public key to a peer
peers: BTreeMap>,
// A map of peers blocking each other: (a, b) means `a` blocked `b`; routing checks both
// orderings, so a block in either direction drops traffic between the pair
blocks: HashSet<(P, P)>,
// Metrics for received and sent messages
received_messages: Family,
sent_messages: Family,
}
impl Network {
/// Create a new simulated network with a given runtime and configuration.
///
/// Returns a tuple containing the network instance and the oracle that can
/// be used to modify the state of the network during context.
pub fn new(context: E, cfg: Config) -> (Self, Oracle) {
let (sender, receiver) = mpsc::unbounded();
let (oracle_sender, oracle_receiver) = mpsc::unbounded();
let sent_messages = Family::::default();
let received_messages = Family::::default();
context.register("messages_sent", "messages sent", sent_messages.clone());
context.register(
"messages_received",
"messages received",
received_messages.clone(),
);
// Start with a pseudo-random IP address to assign sockets to for new peers
let next_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::from_bits(context.clone().next_u32())),
0,
);
(
Self {
context,
max_size: cfg.max_size,
next_addr,
ingress: oracle_receiver,
sender,
receiver,
links: HashMap::new(),
peers: BTreeMap::new(),
blocks: HashSet::new(),
received_messages,
sent_messages,
},
Oracle::new(oracle_sender.clone()),
)
}
/// Returns (and increments) the next available socket address.
///
/// The port number is incremented for each call, and the IP address is incremented if the port
/// number overflows.
fn get_next_socket(&mut self) -> SocketAddr {
let result = self.next_addr;
// Increment the port number, or the IP address if the port number overflows.
// Allows the ip address to overflow (wrapping).
match self.next_addr.port().checked_add(1) {
Some(port) => {
self.next_addr.set_port(port);
}
None => {
let ip = match self.next_addr.ip() {
IpAddr::V4(ipv4) => ipv4,
_ => unreachable!(),
};
let next_ip = Ipv4Addr::to_bits(ip).wrapping_add(1);
self.next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(next_ip)), 0);
}
}
result
}
/// Handle an ingress message.
///
/// This method is called when a message is received from the oracle.
async fn handle_ingress(&mut self, message: ingress::Message) {
// It is important to ensure that no failed receipt of a message will cause us to exit.
// This could happen if the caller drops the `Oracle` after updating the network topology.
// Thus, we create a helper function to send the result to the oracle and log any errors.
fn send_result(
result: oneshot::Sender>,
value: Result,
) {
let success = value.is_ok();
if let Err(e) = result.send(value) {
error!(?e, "failed to send result to oracle (ok = {})", success);
}
}
match message {
ingress::Message::Register {
public_key,
channel,
result,
} => {
// If peer does not exist, then create it.
if !self.peers.contains_key(&public_key) {
let peer = Peer::new(
&mut self.context.clone(),
public_key.clone(),
self.get_next_socket(),
usize::MAX,
usize::MAX,
self.max_size,
);
self.peers.insert(public_key.clone(), peer);
}
// Create a receiver that allows receiving messages from the network for a certain channel
let peer = self.peers.get_mut(&public_key).unwrap();
let receiver = match peer.register(channel).await {
Ok(receiver) => Receiver { receiver },
Err(err) => return send_result(result, Err(err)),
};
// Create a sender that allows sending messages to the network for a certain channel
let sender = Sender::new(
self.context.clone(),
public_key,
channel,
self.max_size,
self.sender.clone(),
);
send_result(result, Ok((sender, receiver)))
}
ingress::Message::SetBandwidth {
public_key,
egress_bps,
ingress_bps,
result,
} => match self.peers.get_mut(&public_key) {
Some(peer) => {
peer.set_bandwidth(egress_bps, ingress_bps);
send_result(result, Ok(()));
}
None => send_result(result, Err(Error::PeerMissing)),
},
ingress::Message::AddLink {
sender,
receiver,
sampler,
success_rate,
result,
} => {
// Require both peers to be registered
if !self.peers.contains_key(&sender) {
return send_result(result, Err(Error::PeerMissing));
}
let peer = match self.peers.get(&receiver) {
Some(peer) => peer,
None => return send_result(result, Err(Error::PeerMissing)),
};
// Require link to not already exist
let key = (sender.clone(), receiver.clone());
if self.links.contains_key(&key) {
return send_result(result, Err(Error::LinkExists));
}
let link = Link::new(
&mut self.context,
sender,
receiver,
peer.socket,
sampler,
success_rate,
self.max_size,
self.received_messages.clone(),
);
self.links.insert(key, link);
send_result(result, Ok(()))
}
ingress::Message::RemoveLink {
sender,
receiver,
result,
} => {
match self.links.remove(&(sender, receiver)) {
Some(_) => (),
None => return send_result(result, Err(Error::LinkMissing)),
}
send_result(result, Ok(()))
}
ingress::Message::Block { from, to } => {
self.blocks.insert((from, to));
}
ingress::Message::Blocked { result } => {
send_result(result, Ok(self.blocks.iter().cloned().collect()))
}
}
}
}
impl Network {
/// Schedule a transmission respecting bandwidth limits on both sender and receiver.
///
/// Steps:
/// 1. Prune the relevant bandwidth schedules at `now`.
/// 2. Ask [bandwidth::calculate_reservations] for the reservations needed to move
///    `data_size` bytes.
/// 3. Record those reservations on the sender's egress schedule. When `should_deliver` is set
///    and the two peers differ, also record them on the receiver's ingress schedule.
///
/// Returns the time at which the transmission completes.
///
/// # Panics
///
/// Panics if `sender` is not a registered peer, or if `receiver` is not one when its ingress
/// schedule is consulted.
fn schedule_transmission(
&mut self,
sender: &P,
receiver: &P,
data_size: usize,
now: SystemTime,
should_deliver: bool,
) -> SystemTime {
// Prune and get used bandwidth for sender and receiver
let sender_used = {
let sender_peer = self.peers.get_mut(sender).expect("sender not found");
sender_peer.egress.prune_and_get_usage(now)
};
// The receiver's ingress only matters if the message will actually arrive
let receiver_used = if should_deliver && sender != receiver {
let receiver_peer = self.peers.get_mut(receiver).expect("receiver not found");
Some(receiver_peer.ingress.prune_and_get_usage(now))
} else {
None
};
// Re-borrow immutably now that pruning (which needed `&mut`) is done
let sender_schedule = {
let sender = self.peers.get(sender).expect("sender not found");
(&sender.egress, sender_used)
};
let receiver_schedule = if let Some(used) = receiver_used {
let receiver_peer = self.peers.get(receiver).expect("receiver not found");
Some((&receiver_peer.ingress, used))
} else {
None
};
// Now calculate reservations
let (reservations, completion_time) =
bandwidth::calculate_reservations(data_size, now, sender_schedule, receiver_schedule);
// Apply reservations to sender
if !reservations.is_empty() {
let sender_peer = self.peers.get_mut(sender).expect("sender not found");
for reservation in &reservations {
sender_peer.egress.add_reservation(
reservation.start,
reservation.end,
reservation.bandwidth,
);
}
// Apply to receiver if delivering
if receiver_used.is_some() {
let receiver_peer = self.peers.get_mut(receiver).expect("receiver not found");
for reservation in &reservations {
receiver_peer.ingress.add_reservation(
reservation.start,
reservation.end,
reservation.bandwidth,
);
}
}
}
completion_time
}
/// Handle a task.
///
/// This method is called when a task is received from the sender, which can come from
/// any peer in the network.
///
/// The message is dropped for a recipient when:
/// - the recipient is the origin,
/// - either side has blocked the other,
/// - no origin->recipient link exists, or
/// - the origin has zero egress bandwidth.
///
/// [Recipients::All] expands to every registered peer. For every other recipient:
/// - bandwidth is reserved;
/// - the message is queued on the link if the link's success-rate draw succeeds and the
///   receiver has non-zero ingress bandwidth;
/// - the recipient is added to the "sent" list. This happens even when the message is
///   dropped by the success-rate draw.
///
/// A spawned notifier waits until every listed transmission has completed and then returns
/// the list through the task's reply channel.
///
/// NOTE(review): `sent_messages` is incremented before the zero-egress-bandwidth check. A
/// recipient skipped for that reason is therefore counted in the metric but absent from the
/// returned list — confirm this is intended.
fn handle_task(&mut self, task: Task) {
// Collect recipients
let (channel, origin, recipients, message, reply) = task;
let recipients = match recipients {
Recipients::All => self.peers.keys().cloned().collect(),
Recipients::Some(keys) => keys,
Recipients::One(key) => vec![key],
};
// Send to all recipients
let mut sent = Vec::new();
// Each per-recipient timing task signals on this channel once its transmission completes
let (acquired_sender, mut acquired_receiver) = mpsc::channel(recipients.len());
for recipient in recipients {
// Skip self
if recipient == origin {
trace!(?recipient, reason = "self", "dropping message",);
continue;
}
// Determine if the sender or recipient has blocked the other
let o_r = (origin.clone(), recipient.clone());
let r_o = (recipient.clone(), origin.clone());
if self.blocks.contains(&o_r) || self.blocks.contains(&r_o) {
trace!(?origin, ?recipient, reason = "blocked", "dropping message");
continue;
}
// Determine if there is a link between the sender and recipient
let link = match self.links.get(&o_r) {
Some(link) => link,
None => {
trace!(?origin, ?recipient, reason = "no link", "dropping message",);
continue;
}
};
// Record sent message as soon as we determine there is a link with recipient (approximates
// having an open connection)
self.sent_messages
.get_or_create(&metrics::Message::new(&origin, &recipient, channel))
.inc();
// Check bandwidth constraints and determine if the message should be delivered
let (sender_has_bandwidth, should_deliver) = {
let sender_peer = self.peers.get(&origin).expect("sender must exist");
let receiver_peer = self.peers.get(&recipient).expect("receiver must exist");
let sender_has_bandwidth = sender_peer.egress.bps > 0;
let receiver_has_bandwidth = receiver_peer.ingress.bps > 0;
// Random draw against the link's success rate (consumes randomness even if the
// sender turns out to have no bandwidth)
let should_deliver = self.context.gen_bool(link.success_rate);
(
sender_has_bandwidth,
// If the receiver has no bandwidth then we treat it as if the message
// is never delivered. Still consume sender-side bandwidth.
should_deliver && receiver_has_bandwidth,
)
};
if !sender_has_bandwidth {
// Sender has no bandwidth, skip this recipient
trace!(
?origin,
?recipient,
"sender has zero bandwidth, skipping recipient"
);
continue;
}
// Sample latency (in milliseconds; negative samples saturate to 0 in the `as u64` cast)
// and get current time
let latency = Duration::from_millis(link.sampler.sample(&mut self.context) as u64);
let now = self.context.current();
// Schedule the transmission
let transmission_complete_at =
self.schedule_transmission(&origin, &recipient, message.len(), now, should_deliver);
// If the message should be delivered, queue it immediately on the
// link to preserve ordering
if should_deliver {
let link = self.links.get_mut(&o_r).unwrap();
// The final arrival time includes the per-message latency
let receive_complete_at = transmission_complete_at + latency;
if let Err(err) = link.send(channel, message.clone(), receive_complete_at) {
// This can only happen if the receiver exited.
error!(?origin, ?recipient, ?err, "failed to send");
continue;
}
}
let transmission_duration = transmission_complete_at
.duration_since(now)
.unwrap_or(Duration::ZERO);
trace!(
?origin,
?recipient,
transmission_duration_ms = transmission_duration.as_millis(),
latency_ms = latency.as_millis(),
delivered = should_deliver,
"sending message",
);
// Spawn task to handle sender timing
self.context.with_label("sender-timing").spawn({
let recipient = recipient.clone();
let mut acquired_sender = acquired_sender.clone();
move |context| async move {
// Wait for transmission to complete
context.sleep_until(transmission_complete_at).await;
// Mark as sent once transmission completes
acquired_sender.send(()).await.unwrap();
if !should_deliver {
trace!(
?recipient,
reason = "random link failure",
"dropping message",
);
}
}
});
sent.push(recipient);
}
// Notify sender of successful sends
self.context
.clone()
.with_label("notifier")
.spawn(|_| async move {
// Wait for semaphore to be acquired on all sends (one signal per entry in `sent`)
for _ in 0..sent.len() {
acquired_receiver.next().await.unwrap();
}
// Notify sender of successful sends
if let Err(err) = reply.send(sent) {
// This can only happen if the sender exited.
error!(?err, "failed to send ack");
}
});
}
/// Run the simulated network.
///
/// It is not necessary to invoke this method before modifying the network topology, however,
/// no messages will be sent until this method is called.
///
/// Returns the handle of the spawned task that drives [Network::run].
pub fn start(mut self) -> Handle<()> {
self.context.spawn_ref()(self.run())
}
/// Main loop: alternates between oracle requests and peer tasks, and exits as soon as
/// either input channel closes.
async fn run(mut self) {
loop {
select! {
message = self.ingress.next() => {
// If ingress is closed, exit
let message = match message {
Some(message) => message,
None => break,
};
self.handle_ingress(message).await;
},
task = self.receiver.next() => {
// If receiver is closed, exit
let task = match task {
Some(task) => task,
None => break,
};
self.handle_task(task);
}
}
}
}
}
/// Implementation of a [crate::Sender] for the simulated network.
///
/// Clones share the same underlying queues, and therefore the same background task created
/// in `Sender::new`.
#[derive(Clone, Debug)]
pub struct Sender {
// Public key of the peer that sends through this handle
me: P,
// Channel every message from this handle is sent on
channel: Channel,
// Largest payload (in bytes) accepted by `send`
max_size: usize,
// Queue for messages sent with `priority = true`
high: mpsc::UnboundedSender>,
// Queue for messages sent with `priority = false`
low: mpsc::UnboundedSender>,
}
impl Sender {
/// Create a sender for `channel` on behalf of peer `me`.
///
/// Spawns a "sender" task that drains two unbounded queues (`high` for priority messages,
/// `low` for the rest) and forwards each [Task] to the network through `sender`. The task
/// exits as soon as either queue is closed. A failure to forward a task is logged, and the
/// loop continues.
///
/// NOTE(review): whether `high` is actually favoured over `low` depends on the polling order
/// of `commonware_macros::select!` — confirm.
fn new(
context: impl Spawner + Metrics,
me: P,
channel: Channel,
max_size: usize,
mut sender: mpsc::UnboundedSender>,
) -> Self {
// Listen for messages
let (high, mut high_receiver) = mpsc::unbounded();
let (low, mut low_receiver) = mpsc::unbounded();
context.with_label("sender").spawn(move |_| async move {
loop {
// Wait for task
let task;
select! {
high_task = high_receiver.next() => {
task = match high_task {
Some(task) => task,
None => break,
};
},
low_task = low_receiver.next() => {
task = match low_task {
Some(task) => task,
None => break,
};
}
}
// Send task
if let Err(err) = sender.send(task).await {
error!(?err, channel, "failed to send task");
}
}
});
// Return sender
Self {
me,
channel,
max_size,
high,
low,
}
}
}
impl crate::Sender for Sender {
    type Error = Error;
    type PublicKey = P;

    /// Queues `message` for delivery to `recipients` on this sender's channel.
    ///
    /// Messages with `priority` set go through the high-priority queue; all others go through
    /// the low-priority queue. Resolves to the recipients the network reports as sent, once
    /// their transmissions have completed.
    ///
    /// # Errors
    ///
    /// - [Error::MessageTooLarge] if `message` is longer than the configured `max_size`.
    /// - [Error::NetworkClosed] if the task cannot be queued or the network drops the reply.
    async fn send(
        &mut self,
        recipients: Recipients
        ,
        message: Bytes,
        priority: bool,
    ) -> Result, Error> {
        let size = message.len();
        if size > self.max_size {
            return Err(Error::MessageTooLarge(size));
        }

        // The network answers on this one-shot once every transmission has completed
        let (ack_tx, ack_rx) = oneshot::channel();
        let task = (self.channel, self.me.clone(), recipients, message, ack_tx);

        let mut queue = if priority { &self.high } else { &self.low };
        queue.send(task).await.map_err(|_| Error::NetworkClosed)?;

        ack_rx.await.map_err(|_| Error::NetworkClosed)
    }
}
/// Receiving half of a per-channel mailbox. Each item is the sending peer's public key paired
/// with the payload, as forwarded by a [Peer]'s router task.
type MessageReceiver = mpsc::UnboundedReceiver>;
/// Outcome of registering a channel with a [Peer]: the channel's mailbox, or an error such as
/// [Error::ChannelAlreadyRegistered] or [Error::NetworkClosed].
type MessageReceiverResult = Result, Error>;
/// Implementation of a [crate::Receiver] for the simulated network.
///
/// Wraps the mailbox returned when a channel is registered with a [Peer].
#[derive(Debug)]
pub struct Receiver {
// Mailbox fed by the peer's router task for a single channel
receiver: MessageReceiver,
}
impl crate::Receiver for Receiver {
    type Error = Error;
    type PublicKey = P;

    /// Waits for the next message delivered on this channel.
    ///
    /// Returns [Error::NetworkClosed] once the underlying mailbox stream has ended.
    async fn recv(&mut self) -> Result, Error> {
        match self.receiver.next().await {
            Some(delivered) => Ok(delivered),
            None => Err(Error::NetworkClosed),
        }
    }
}
/// A peer in the simulated network.
///
/// The peer can register channels, which allows it to receive messages sent to the channel from other peers.
struct Peer {
// Socket address that the peer is listening on
socket: SocketAddr,
// Control to register new channels. Each request carries the channel and a one-shot sender
// for the registration outcome; requests are served by the router task spawned in `Peer::new`
control: mpsc::UnboundedSender<(Channel, oneshot::Sender>)>,
// Bandwidth schedules for egress (upload) and ingress (download), in bytes per second
egress: bandwidth::Schedule,
ingress: bandwidth::Schedule,
}
impl Peer {
/// Create and return a new peer.
///
/// The peer will listen for incoming connections on the given `socket` address.
/// `max_size` is the maximum size of a message that can be sent to the peer.
fn new(
context: &mut E,
public_key: P,
socket: SocketAddr,
egress_bps: usize,
ingress_bps: usize,
max_size: usize,
) -> Self {
// The control is used to register channels.
// There is exactly one mailbox created for each channel that the peer is registered for.
let (control_sender, mut control_receiver) = mpsc::unbounded();
// Whenever a message is received from a peer, it is placed in the inbox.
// The router polls the inbox and forwards the message to the appropriate mailbox.
let (inbox_sender, mut inbox_receiver) = mpsc::unbounded();
// Spawn router
context.with_label("router").spawn(|_| async move {
// Map of channels to mailboxes (senders to particular channels)
let mut mailboxes = HashMap::new();
// Continually listen for control messages and outbound messages
loop {
select! {
// Listen for control messages, which are used to register channels
control = control_receiver.next() => {
// If control is closed, exit
let (channel, result): (Channel, oneshot::Sender>) = match control {
Some(control) => control,
None => break,
};
// Check if channel is registered
if mailboxes.contains_key(&channel) {
result.send(Err(Error::ChannelAlreadyRegistered(channel))).unwrap();
continue;
}
// Register channel
let (sender, receiver) = mpsc::unbounded();
mailboxes.insert(channel, sender);
result.send(Ok(receiver)).unwrap();
},
// Listen for messages from the inbox, which are forwarded to the appropriate mailbox
inbox = inbox_receiver.next() => {
// If inbox is closed, exit
let (channel, message) = match inbox {
Some(message) => message,
None => break,
};
// Send message to mailbox
match mailboxes.get_mut(&channel) {
Some(mailbox) => {
if let Err(err) = mailbox.send(message).await {
error!(?err, "failed to send message to mailbox");
}
}
None => {
trace!(
recipient = ?public_key,
channel,
reason = "missing channel",
"dropping message",
);
}
}
},
}
}
});
// Spawn a task that accepts new connections and spawns a task for each connection
context.with_label("listener").spawn({
let inbox_sender = inbox_sender.clone();
move |context| async move {
// Initialize listener
let mut listener = context.bind(socket).await.unwrap();
// Continually accept new connections
while let Ok((_, _, mut stream)) = listener.accept().await {
// New connection accepted. Spawn a task for this connection
context.with_label("receiver").spawn({
let mut inbox_sender = inbox_sender.clone();
move |_| async move {
// Receive dialer's public key as a handshake
let dialer = match recv_frame(&mut stream, max_size).await {
Ok(data) => data,
Err(_) => {
error!("failed to receive public key from dialer");
return;
}
};
let Ok(dialer) = P::decode(dialer.as_ref()) else {
error!("received public key is invalid");
return;
};
// Continually receive messages from the dialer and send them to the inbox
while let Ok(data) = recv_frame(&mut stream, max_size).await {
let channel = Channel::from_be_bytes(
data[..Channel::SIZE].try_into().unwrap(),
);
let message = data.slice(Channel::SIZE..);
if let Err(err) = inbox_sender
.send((channel, (dialer.clone(), message)))
.await
{
error!(?err, "failed to send message to mailbox");
break;
}
}
}
});
}
}
});
// Return peer
Self {
socket,
control: control_sender,
egress: bandwidth::Schedule::new(egress_bps),
ingress: bandwidth::Schedule::new(ingress_bps),
}
}
/// Register a channel with the peer.
///
/// This allows the peer to receive messages sent to the channel.
/// Returns a receiver that can be used to receive messages sent to the channel.
async fn register(&mut self, channel: Channel) -> MessageReceiverResult {
let (sender, receiver) = oneshot::channel();
self.control
.send((channel, sender))
.await
.map_err(|_| Error::NetworkClosed)?;
receiver.await.map_err(|_| Error::NetworkClosed)?
}
/// Set bandwidth limits for the peer.
///
/// Bandwidth is specified for the peer's egress (upload) and ingress
/// (download) rates in bytes per second. Use `usize::MAX` for effectively
/// unlimited bandwidth.
fn set_bandwidth(&mut self, egress_bps: usize, ingress_bps: usize) {
self.egress.bps = egress_bps;
self.ingress.bps = ingress_bps;
}
}
// A unidirectional link between two peers.
// Messages can be sent over the link with a given latency, jitter, and success rate.
#[derive(Clone)]
struct Link {
// Latency distribution, sampled once per message and interpreted as milliseconds
sampler: Normal,
// Probability that a message sent over the link is delivered (drawn with `gen_bool` per message)
success_rate: f64,
// Messages with their receive time for ordered delivery
inbox: mpsc::UnboundedSender<(Channel, Bytes, SystemTime)>,
}
impl Link {
/// Create a link from `dialer` to `receiver`, whose peer listens on `socket`.
///
/// Spawns a "link" task that:
/// 1. dials `socket`;
/// 2. sends `dialer`'s public key as a handshake frame;
/// 3. forwards queued messages in FIFO order. For each one it sleeps until the message's
///    receive time, writes it as a `channel (big-endian) || payload` frame limited to
///    `max_size`, and bumps `received_messages` for (dialer, receiver, channel).
///
/// NOTE(review): a failed dial or a failed `send_frame` while delivering messages panics
/// (via `unwrap`). This presumably includes frames larger than `max_size`, which must
/// therefore cover the `Channel::SIZE` prefix as well as the payload — confirm against
/// `commonware_stream::utils::codec::send_frame`.
#[allow(clippy::too_many_arguments)]
fn new(
context: &mut E,
dialer: P,
receiver: P,
socket: SocketAddr,
sampler: Normal,
success_rate: f64,
max_size: usize,
received_messages: Family,
) -> Self {
let (inbox, mut outbox) = mpsc::unbounded();
let result = Self {
sampler,
success_rate,
inbox,
};
// Spawn a task that will wait for messages to be sent to the link and then send them
// over the network.
context
.clone()
.with_label("link")
.spawn(move |context| async move {
// Dial the peer and handshake by sending it the dialer's public key
let (mut sink, _) = context.dial(socket).await.unwrap();
if let Err(err) = send_frame(&mut sink, &dialer, max_size).await {
error!(?err, "failed to send public key to listener");
return;
}
// Process messages in order, waiting for their receive time
while let Some((channel, message, receive_complete_at)) = outbox.next().await {
// Wait until the message should arrive at receiver
context.sleep_until(receive_complete_at).await;
// Send the message
let mut data = bytes::BytesMut::with_capacity(Channel::SIZE + message.len());
data.extend_from_slice(&channel.to_be_bytes());
data.extend_from_slice(&message);
let data = data.freeze();
send_frame(&mut sink, &data, max_size).await.unwrap();
// Bump received messages metric
received_messages
.get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
.inc();
}
});
result
}
// Send a message over the link with receive timing.
// Only enqueues the message for the link task; fails with `Error::NetworkClosed` if that
// task has exited (its queue receiver was dropped).
fn send(
&mut self,
channel: Channel,
message: Bytes,
receive_complete_at: SystemTime,
) -> Result<(), Error> {
self.inbox
.unbounded_send((channel, message, receive_complete_at))
.map_err(|_| Error::NetworkClosed)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
use commonware_runtime::{deterministic, Runner};
// Payload limit shared by the tests in this module (1 MiB)
const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
// Registering channels and adding links succeed once; duplicates are rejected with
// `ChannelAlreadyRegistered` and `LinkExists` respectively.
#[test]
fn test_register_and_link() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
};
let network_context = context.with_label("network");
let (network, mut oracle) = Network::new(network_context.clone(), cfg);
network_context.spawn(|_| network.run());
// Create two public keys
let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
// Register
oracle.register(pk1.clone(), 0).await.unwrap();
oracle.register(pk1.clone(), 1).await.unwrap();
oracle.register(pk2.clone(), 0).await.unwrap();
oracle.register(pk2.clone(), 1).await.unwrap();
// Expect error when registering again
assert!(matches!(
oracle.register(pk1.clone(), 1).await,
Err(Error::ChannelAlreadyRegistered(_))
));
// Add link
let link = ingress::Link {
latency: Duration::from_millis(2),
jitter: Duration::from_millis(1),
success_rate: 0.9,
};
oracle
.add_link(pk1.clone(), pk2.clone(), link.clone())
.await
.unwrap();
// Expect error when adding link again
assert!(matches!(
oracle.add_link(pk1, pk2, link).await,
Err(Error::LinkExists)
));
});
}
// `get_next_socket` hands out the current address, bumps the port, and rolls over to the
// next IPv4 address (port 0) once port 65535 has been handed out.
#[test]
fn test_get_next_socket() {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
};
let runner = deterministic::Runner::default();
runner.start(|context| async move {
type PublicKey = ed25519::PublicKey;
let (mut network, _) =
Network::::new(context.clone(), cfg);
// Test that the next socket address is incremented correctly
let mut original = network.next_addr;
let next = network.get_next_socket();
assert_eq!(next, original);
let next = network.get_next_socket();
original.set_port(1);
assert_eq!(next, original);
// Test that the port number overflows correctly
let max_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
network.next_addr = max_addr;
let next = network.get_next_socket();
assert_eq!(next, max_addr);
// After port 65535 the IP advances by one (255.0.255.255 -> 255.1.0.0) and the port resets
let next = network.get_next_socket();
assert_eq!(
next,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0)
);
});
}
}