use super::Error; use crate::{ authenticated::lookup::actors::router::{self, Messenger}, utils::limited::{CheckedSender, LimitedSender}, Channel, Message, Recipients, }; use bytes::Bytes; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, Quota}; use futures::{channel::mpsc, StreamExt}; use std::{collections::BTreeMap, fmt::Debug, time::SystemTime}; /// An interior sender that enforces message size limits and /// supports sending arbitrary bytes to a set of recipients over /// a pre-defined [`Channel`]. #[derive(Debug, Clone)] pub struct UnlimitedSender { channel: Channel, max_size: u32, messenger: Messenger

, } impl crate::UnlimitedSender for UnlimitedSender

{ type Error = Error; type PublicKey = P; async fn send( &mut self, recipients: Recipients, message: Bytes, priority: bool, ) -> Result, Self::Error> { if message.len() > self.max_size as usize { return Err(Error::MessageTooLarge(message.len())); } Ok(self .messenger .content(recipients, self.channel, message, priority) .await) } } /// Sender is the mechanism used to send arbitrary bytes to a set of recipients over a pre-defined channel. #[derive(Clone)] pub struct Sender { limited_sender: LimitedSender, Messenger

>, } impl Sender { pub(super) fn new( channel: Channel, max_size: u32, messenger: Messenger

, clock: C, quota: Quota, ) -> Self { let master_sender = UnlimitedSender { channel, max_size, messenger: messenger.clone(), }; let limited_sender = LimitedSender::new(master_sender, quota, clock, messenger); Self { limited_sender } } } impl crate::LimitedSender for Sender where P: PublicKey, C: Clock + Clone + Send + 'static, { type PublicKey = P; type Checked<'a> = CheckedSender<'a, UnlimitedSender

> where Self: 'a; async fn check( &mut self, recipients: Recipients, ) -> Result, SystemTime> { self.limited_sender.check(recipients).await } } /// Channel to asynchronously receive messages from a channel. #[derive(Debug)] pub struct Receiver { receiver: mpsc::Receiver>, } impl Receiver

{ pub(super) const fn new(receiver: mpsc::Receiver>) -> Self { Self { receiver } } } impl crate::Receiver for Receiver

{ type Error = Error; type PublicKey = P; /// Receives a message from the channel. /// /// This method will block until a message is received or the underlying /// network shuts down. async fn recv(&mut self) -> Result, Error> { let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?; // We don't check that the message is too large here because we already enforce // that on the network layer. Ok((sender, message)) } } #[derive(Clone)] pub struct Channels { messenger: router::Messenger

, max_size: u32, receivers: BTreeMap>)>, } impl Channels

{ pub const fn new(messenger: router::Messenger

, max_size: u32) -> Self { Self { messenger, max_size, receivers: BTreeMap::new(), } } pub fn register( &mut self, channel: Channel, rate: Quota, backlog: usize, clock: C, ) -> (Sender, Receiver

) { let (sender, receiver) = mpsc::channel(backlog); if self.receivers.insert(channel, (rate, sender)).is_some() { panic!("duplicate channel registration: {channel}"); } ( Sender::new(channel, self.max_size, self.messenger.clone(), clock, rate), Receiver::new(receiver), ) } pub fn collect(self) -> BTreeMap>)> { self.receivers } }