// NOTE(review): this file appears to have been flattened onto a handful of
// physical lines, and generic parameter lists look to have been dropped
// (e.g. a bare `VecDeque`, `impl Into + Send`, `Result, SystemTime>`, and
// stray `>` tokens following blank lines). Because of the flattening, each
// inline `///` doc comment runs to the end of its physical line and swallows
// the code written after it. The code text below is deliberately left
// untouched; the formatting and generics should be restored from version
// control rather than guessed.
//
// Items that appear in this file, in order:
//   * `Inbound`          - crate-visible tuple struct wrapping a `NetworkMessage`.
//   * `UnlimitedSender`  - size-checked sender bound to one `Channel`.
//   * `Sender`           - wraps a `LimitedSender` built around an `UnlimitedSender`.
//   * `Receiver`         - wraps a `mailbox::UnreliableReceiver`.
//   * `Channels`         - registry that hands out `Sender`/`Receiver` pairs.
//
// Imports, followed by the start of `Inbound`. `NetworkMessage` is the
// crate's `Message` type imported under an alias.
use super::Error; use crate::{ authenticated::lookup::actors::router::{self, Messenger}, utils::limited::{CheckedSender, LimitedSender}, Channel, Message as NetworkMessage, Recipients, }; use commonware_actor::{ mailbox::{self, UnreliablePolicy}, Feedback, Unreliable, }; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, IoBufs, Metrics, Quota}; use std::{ collections::{BTreeMap, VecDeque}, fmt::Debug, num::NonZeroUsize, time::SystemTime, }; pub(crate) struct Inbound(pub(crate) NetworkMessage

// End of `Inbound`; start of its `UnreliablePolicy` impl.
); impl UnreliablePolicy for Inbound

// `handle` ignores both of its arguments and always returns `false`.
// NOTE(review): what `false` signals to the mailbox is defined by
// `UnreliablePolicy`, which is not visible here - confirm against that trait.
// The `UnlimitedSender` struct (fields `channel`, `max_size`, `messenger`)
// follows on the same physical line.
{ type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { false } } /// An interior sender that enforces message size limits and /// supports sending arbitrary bytes to a set of recipients over /// a pre-defined [`Channel`]. #[derive(Debug, Clone)] pub struct UnlimitedSender { channel: Channel, max_size: u32, messenger: Messenger

// End of `UnlimitedSender`; start of its `crate::UnlimitedSender` impl.
, } impl crate::UnlimitedSender for UnlimitedSender

// `send` converts `message` with `into()`, panics through `assert!` when
// `message.len()` exceeds `max_size` (the limit is a precondition, not a
// returned error), and otherwise returns the result of
// `messenger.content(recipients, channel, message, priority)`.
// The `Sender` struct, which holds a single `limited_sender` field, follows.
{ type PublicKey = P; fn send( &mut self, recipients: Recipients, message: impl Into + Send, priority: bool, ) -> Unreliable { let message = message.into(); assert!( message.len() <= self.max_size as usize, "message too large: {} > {}", message.len(), self.max_size ); self.messenger .content(recipients, self.channel, message, priority) } } /// Sender is the mechanism used to send arbitrary bytes to a set of recipients over a pre-defined channel. pub struct Sender { limited_sender: LimitedSender, Messenger

// End of `Sender`. `Clone` is written by hand and clones only
// `limited_sender`. The signature of `Sender::new` (visible to the parent
// module only) starts here.
>, } impl Clone for Sender { fn clone(&self) -> Self { Self { limited_sender: self.limited_sender.clone(), } } } impl Sender { pub(super) fn new( channel: Channel, max_size: u32, messenger: Messenger

// `Sender::new` builds an `UnlimitedSender` from a clone of `messenger`, then
// passes it, together with `quota`, `clock` and the original `messenger`, to
// `LimitedSender::new`. The `crate::LimitedSender` impl for `Sender` starts
// afterwards; its `Checked` associated type is a `CheckedSender`.
, clock: C, quota: Quota, ) -> Self { let master_sender = UnlimitedSender { channel, max_size, messenger: messenger.clone(), }; let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger); Self { limited_sender } } } impl crate::LimitedSender for Sender where P: PublicKey, C: Clock + Send + 'static, { type PublicKey = P; type Checked<'a> = CheckedSender<'a, UnlimitedSender

// `check` is a pure delegation to `self.limited_sender.check(recipients)`.
// Then the `Receiver` struct (one field, a `mailbox::UnreliableReceiver`) and
// the start of its `Debug` impl.
> where Self: 'a; fn check( &mut self, recipients: Recipients, ) -> Result, SystemTime> { self.limited_sender.check(recipients) } } /// Channel to asynchronously receive messages from a channel. pub struct Receiver { receiver: mailbox::UnreliableReceiver>, } impl Debug for Receiver

// The `Debug` output names the struct and lists no fields
// (`finish_non_exhaustive`). The inherent impl of `Receiver` starts next.
{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Receiver").finish_non_exhaustive() } } impl Receiver

// `Receiver::new` (a `const fn`, visible to the parent module only) stores the
// given mailbox receiver. The `crate::Receiver` impl starts next.
{ pub(super) const fn new(receiver: mailbox::UnreliableReceiver>) -> Self { Self { receiver } } } impl crate::Receiver for Receiver

// `recv` awaits `self.receiver.recv()`; `None` is mapped to
// `Error::NetworkClosed`, otherwise the `Inbound` wrapper is destructured and
// `(sender, message)` is returned unchanged. The `Channels` struct follows.
{ type Error = Error; type PublicKey = P; /// Receives a message from the channel. /// /// This method will block until a message is received or the underlying /// network shuts down. async fn recv(&mut self) -> Result, Error> { let Inbound((sender, message)) = self.receiver.recv().await.ok_or(Error::NetworkClosed)?; // We don't check that the message is too large here because we already enforce // that on the network layer. Ok((sender, message)) } } #[derive(Clone, Debug)] pub struct Channels { messenger: router::Messenger

// Remaining `Channels` fields: `max_size` and `receivers`, a `BTreeMap` that
// `register` fills with `channel -> (rate, sender)` entries. The inherent
// impl of `Channels` starts next.
, max_size: u32, receivers: BTreeMap>)>, } impl Channels

// `Channels::new` signature: takes the router messenger first.
{ pub const fn new(messenger: router::Messenger

// `new` stores `messenger` and `max_size` and starts with an empty
// `receivers` map. The signature of `register` follows.
, max_size: u32) -> Self { Self { messenger, max_size, receivers: BTreeMap::new(), } } pub fn register( &mut self, channel: Channel, rate: Quota, backlog: usize, context: C, ) -> (Sender, Receiver

// `register`:
//   * panics with "message backlog must be non-zero" when `backlog` is 0;
//   * creates a mailbox pair via `mailbox::new_unreliable`, using
//     `context.child("mailbox")`;
//   * inserts `(rate, sender)` under `channel` and panics if an entry for that
//     channel already existed (the insert happens before the panic);
//   * returns a `Sender` built from `channel`, `max_size`, a clone of the
//     messenger, `context` and `rate`, paired with a `Receiver` over the
//     mailbox receiver.
// `collect` consumes `self` and returns the `receivers` map.
) { let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero"); let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog); if self.receivers.insert(channel, (rate, sender)).is_some() { panic!("duplicate channel registration: {channel}"); } ( Sender::new( channel, self.max_size, self.messenger.clone(), context, rate, ), Receiver::new(receiver), ) } pub fn collect(self) -> BTreeMap>)> { self.receivers } }