use crate::{ authenticated::{data::Data, discovery::channels::Channels, relay::Relay, Mailbox}, utils::limited::Connected, Channel, Recipients, }; use bytes::{Buf, Bytes}; use commonware_cryptography::PublicKey; use commonware_utils::{ channels::{fallible::AsyncFallibleExt, ring}, NZUsize, }; use futures::channel::oneshot; /// Messages that can be processed by the router. #[derive(Debug)] pub enum Message { /// Notify the router that a peer is ready to communicate. Ready { peer: P, relay: Relay, channels: oneshot::Sender>, }, /// Notify the router that a peer is no longer available. Release { peer: P }, /// Send a message to one or more recipients. Content { recipients: Recipients

, channel: Channel, message: Bytes, priority: bool, success: oneshot::Sender>, }, /// Get a subscription to peers known by the router. SubscribePeers { response: oneshot::Sender>>, }, } impl Mailbox> { /// Notify the router that a peer is ready to communicate. /// /// Returns `None` if the router has shut down. pub async fn ready(&mut self, peer: P, relay: Relay) -> Option> { self.0 .request(|channels| Message::Ready { peer, relay, channels, }) .await } /// Notify the router that a peer is no longer available. /// /// This may fail during shutdown if the router has already exited, /// which is harmless since the router no longer tracks any peers. pub async fn release(&mut self, peer: P) { self.0.send_lossy(Message::Release { peer }).await; } } /// Sends messages containing content to the router to send to peers. #[derive(Clone, Debug)] pub struct Messenger { sender: Mailbox>, } impl Messenger

{ /// Returns a new [Messenger] with the given sender. /// (The router has the corresponding receiver.) pub const fn new(sender: Mailbox>) -> Self { Self { sender } } /// Sends a message to the given `recipients`. /// /// Returns an empty list if the router has shut down. pub async fn content( &mut self, recipients: Recipients

, channel: Channel, mut message: impl Buf + Send, priority: bool, ) -> Vec

{ let message = message.copy_to_bytes(message.remaining()); self.sender .0 .request_or_default(|success| Message::Content { recipients, channel, message, priority, success, }) .await } } impl Connected for Messenger

{ type PublicKey = P; async fn subscribe(&mut self) -> ring::Receiver> { self.sender .0 .request(|response| Message::SubscribePeers { response }) .await .unwrap_or_else(|| { let (_, rx) = ring::channel(NZUsize!(1)); rx }) } }