use super::{ ingress::{Mailbox, Message, Messenger}, Config, }; use crate::{ authenticated::{data::EncodedData, discovery::channels::Channels, relay::Relay}, Recipients, }; use commonware_actor::mailbox; use commonware_cryptography::PublicKey; use commonware_macros::select_loop; use commonware_runtime::{spawn_cell, BufferPooler, ContextCell, Handle, Metrics, Spawner}; use commonware_utils::channel::ring; use futures::Sink; use std::{collections::BTreeMap, pin::Pin}; use tracing::debug; /// Router actor that manages peer connections and routing messages. pub struct Actor { context: ContextCell, control: mailbox::UnreliableReceiver>, connections: BTreeMap>, open_subscriptions: Vec>>, } impl Actor { /// Returns a new [Actor] along with a [Mailbox] and [Messenger] /// that can be used to send messages to the router. pub fn new(context: E, cfg: Config) -> (Self, Mailbox

, Messenger

) { // Create mailbox let (control_sender, control_receiver) = mailbox::new_unreliable::>(context.child("mailbox"), cfg.mailbox_size); let pool = context.network_buffer_pool().clone(); // Create actor ( Self { context: ContextCell::new(context), control: control_receiver, connections: BTreeMap::new(), open_subscriptions: Vec::new(), }, Mailbox::new(control_sender.clone()), Messenger::new(pool, Mailbox::new(control_sender)), ) } /// Sends pre-encoded data to the given `recipient`. fn send(&mut self, recipient: P, encoded: EncodedData, priority: bool) { if let Some(relay) = self.connections.get_mut(&recipient) { let _ = relay.send(encoded, priority); } } /// Routes content to the configured recipients. fn route(&mut self, recipients: Recipients

, encoded: EncodedData, priority: bool) { match recipients { Recipients::One(recipient) => { self.send(recipient, encoded, priority); } Recipients::Some(recipients) => { for recipient in recipients { self.send(recipient, encoded.clone(), priority); } } Recipients::All => { // Send to all connected peers for relay in self.connections.values_mut() { let _ = relay.send(encoded.clone(), priority); } } } } /// Starts a new task that runs the router [Actor]. /// Returns a [Handle] that can be used to await the completion of the task, /// which will run until its `control` receiver is closed. pub fn start(mut self, routing: Channels

) -> Handle<()> { spawn_cell!(self.context, self.run(routing)) } /// Runs the [Actor] event loop, processing incoming control and content messages. /// Returns when the `control` channel is closed. async fn run(mut self, routing: Channels

) { select_loop! { self.context, on_stopped => { debug!("context shutdown, stopping router"); }, Some(msg) = self.control.recv() else { debug!("mailbox closed, stopping router"); break; } => match msg { Message::Ready { peer, relay, channels, } => { debug!(?peer, "peer ready"); self.connections.insert(peer, relay); let _ = channels.send(routing.clone()); self.notify_subscribers(); } Message::Release { peer } => { debug!(?peer, "peer released"); self.connections.remove(&peer); self.notify_subscribers(); } Message::Content { recipients, encoded, priority, } => { self.route(recipients, encoded, priority); } Message::SubscribePeers { sender } => { self.subscribe_peers(sender); } }, } } fn subscribe_peers(&mut self, mut sender: ring::Sender>) { let peers = self.connections.keys().cloned().collect(); if Pin::new(&mut sender).start_send(peers).is_ok() { self.open_subscriptions.push(sender); } } /// Notifies all open peer subscriptions with the current list of connected peers. fn notify_subscribers(&mut self) { let peers: Vec

= self.connections.keys().cloned().collect(); let mut keep = Vec::with_capacity(self.open_subscriptions.len()); for mut subscriber in self.open_subscriptions.drain(..) { if Pin::new(&mut subscriber).start_send(peers.clone()).is_ok() { keep.push(subscriber); } } self.open_subscriptions = keep; } } #[cfg(test)] mod tests { use super::*; use commonware_cryptography::ed25519::PublicKey; use commonware_runtime::{deterministic, Runner as _}; use commonware_utils::NZUsize; #[test] fn subscribe_retains_only_open_initial_sender() { deterministic::Runner::default().start(|context| async move { let (mut actor, _, _) = Actor::::new( context, Config { mailbox_size: NZUsize!(1), }, ); let (sender, receiver) = ring::channel(NZUsize!(1)); drop(receiver); actor.subscribe_peers(sender); assert!(actor.open_subscriptions.is_empty()); let (sender, _receiver) = ring::channel(NZUsize!(1)); actor.subscribe_peers(sender); assert_eq!(actor.open_subscriptions.len(), 1); }); } }