use crate::{ authenticated::{ data::EncodedData, lookup::{channels::Channels, types}, relay::Relay, }, utils::limited::Connected, Channel, Recipients, }; use commonware_actor::{ mailbox::{self, UnreliablePolicy}, Feedback, Unreliable, }; use commonware_cryptography::PublicKey; use commonware_runtime::{BufferPool, IoBufs}; use commonware_utils::{ channel::{oneshot, ring}, NZUsize, }; use std::{collections::VecDeque, fmt}; /// Messages that can be processed by the router. pub enum Message { /// Notify the router that a peer is ready to communicate. Ready { peer: P, relay: Relay, channels: oneshot::Sender>, }, /// Notify the router that a peer is no longer available. Release { peer: P }, /// Send pre-encoded data to one or more recipients. Content { recipients: Recipients

, encoded: EncodedData, priority: bool, }, /// Register a subscription to peers known by the router. SubscribePeers { sender: ring::Sender> }, } impl UnreliablePolicy for Message

{ type Overflow = VecDeque; fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { match message { Self::Content { .. } => false, message => { overflow.push_back(message); true } } } } /// Mailbox for the router actor. pub struct Mailbox(mailbox::UnreliableSender>); impl Clone for Mailbox

{ fn clone(&self) -> Self { Self(self.0.clone()) } } impl fmt::Debug for Mailbox

{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("Mailbox").field(&self.0).finish() } } impl Mailbox

{ /// Returns a router mailbox around the provided sender. pub const fn new(sender: mailbox::UnreliableSender>) -> Self { Self(sender) } /// Notify the router that a peer is ready to communicate. /// /// Returns `None` if the router has shut down. pub async fn ready(&self, peer: P, relay: Relay) -> Option> { let (channels, receiver) = oneshot::channel(); let _ = self.0.enqueue(Message::Ready { peer, relay, channels, }); receiver.await.ok() } /// Notify the router that a peer is no longer available. /// /// This may fail during shutdown if the router has already exited, /// which is harmless since the router no longer tracks any peers. pub fn release(&self, peer: P) -> Feedback { match self.0.enqueue(Message::Release { peer }) { Unreliable::Outcome(feedback) => feedback, Unreliable::Rejected => unreachable!("router release cannot be rejected"), } } } /// Sends messages containing content to the router to send to peers. #[derive(Clone, Debug)] pub struct Messenger { pool: BufferPool, sender: Mailbox

, } impl Messenger

{ /// Returns a new [Messenger] with the given sender. /// (The router has the corresponding receiver.) pub const fn new(pool: BufferPool, sender: Mailbox

) -> Self { Self { pool, sender } } /// Sends a message to the given `recipients`. /// /// Encodes the message once and shares the encoded bytes across all recipients. /// Returns feedback from enqueueing the router message. pub fn content( &self, recipients: Recipients

, channel: Channel, message: IoBufs, priority: bool, ) -> Unreliable { // Build Data and encode Message::Data once for all recipients let encoded = types::Message::encode_data(&self.pool, channel, message); self.sender.0.enqueue(Message::Content { recipients, encoded, priority, }) } } impl Connected for Messenger

{ type PublicKey = P; fn subscribe(&self) -> ring::Receiver> { let (sender, receiver) = ring::channel(NZUsize!(1)); let _ = self.sender.0.enqueue(Message::SubscribePeers { sender }); receiver } }