use super::Error; use crate::{authenticated::lookup::actors::router, Channel, Message, Recipients}; use bytes::Bytes; use commonware_cryptography::PublicKey; use futures::{channel::mpsc, StreamExt}; use governor::Quota; use std::collections::BTreeMap; /// Sender is the mechanism used to send arbitrary bytes to /// a set of recipients over a pre-defined channel. #[derive(Clone, Debug)] pub struct Sender { channel: Channel, max_size: usize, messenger: router::Messenger

, } impl Sender

{ pub(super) fn new(channel: Channel, max_size: usize, messenger: router::Messenger

) -> Self { Self { channel, max_size, messenger, } } } impl crate::Sender for Sender

{ type Error = Error; type PublicKey = P; /// Sends a message to a set of recipients. /// /// # Offline Recipients /// /// If a recipient is offline at the time a message is sent, the message will be dropped. /// It is up to the application to handle retries (if necessary). /// /// # Parameters /// /// * `recipients` - The set of recipients to send the message to. /// * `message` - The message to send. /// * `priority` - Whether the message should be sent with priority (across /// all channels). /// /// # Returns /// /// A vector of recipients that the message was sent to, or an error if the message is too large. /// /// Note: a successful send does not guarantee that the recipient will receive the message. async fn send( &mut self, recipients: Recipients, message: Bytes, priority: bool, ) -> Result, Error> { // Ensure message isn't too large let message_len = message.len(); if message_len > self.max_size { return Err(Error::MessageTooLarge(message_len)); } // Wait for messenger to let us know who we sent to Ok(self .messenger .content(recipients, self.channel, message, priority) .await) } } /// Channel to asynchronously receive messages from a channel. #[derive(Debug)] pub struct Receiver { receiver: mpsc::Receiver>, } impl Receiver

{ pub(super) fn new(receiver: mpsc::Receiver>) -> Self { Self { receiver } } } impl crate::Receiver for Receiver

{ type Error = Error; type PublicKey = P; /// Receives a message from the channel. /// /// This method will block until a message is received or the underlying /// network shuts down. async fn recv(&mut self) -> Result, Error> { let (sender, message) = self.receiver.next().await.ok_or(Error::NetworkClosed)?; // We don't check that the message is too large here because we already enforce // that on the network layer. Ok((sender, message)) } } #[derive(Clone)] pub struct Channels { messenger: router::Messenger

, max_size: usize, receivers: BTreeMap>)>, } impl Channels

{ pub fn new(messenger: router::Messenger

, max_size: usize) -> Self { Self { messenger, max_size, receivers: BTreeMap::new(), } } pub fn register( &mut self, channel: Channel, rate: governor::Quota, backlog: usize, ) -> (Sender

, Receiver

) { let (sender, receiver) = mpsc::channel(backlog); if self.receivers.insert(channel, (rate, sender)).is_some() { panic!("duplicate channel registration: {channel}"); } ( Sender::new(channel, self.max_size, self.messenger.clone()), Receiver::new(receiver), ) } pub fn collect(self) -> BTreeMap>)> { self.receivers } }