use super::{ ingress::{Message, Messenger}, Config, }; use crate::{ authenticated::{ data::EncodedData, discovery::{channels::Channels, metrics}, relay::Relay, Mailbox, }, Recipients, }; use commonware_cryptography::PublicKey; use commonware_macros::select_loop; use commonware_runtime::{spawn_cell, ContextCell, Handle, Metrics, Spawner}; use commonware_utils::{ channel::{mpsc, ring}, NZUsize, }; use futures::SinkExt; use prometheus_client::metrics::{counter::Counter, family::Family}; use std::collections::BTreeMap; use tracing::debug; /// Router actor that manages peer connections and routing messages. pub struct Actor { context: ContextCell, control: mpsc::Receiver>, connections: BTreeMap>, open_subscriptions: Vec>>, messages_dropped: Family, } impl Actor { /// Returns a new [Actor] along with a [Mailbox] and [Messenger] /// that can be used to send messages to the router. pub fn new(context: E, cfg: Config) -> (Self, Mailbox>, Messenger

) { // Create mailbox let (control_sender, control_receiver) = Mailbox::new(cfg.mailbox_size); // Create metrics let messages_dropped = Family::::default(); context.register( "messages_dropped", "messages dropped", messages_dropped.clone(), ); // Create actor ( Self { context: ContextCell::new(context), control: control_receiver, connections: BTreeMap::new(), open_subscriptions: Vec::new(), messages_dropped, }, control_sender.clone(), Messenger::new(control_sender), ) } /// Sends pre-encoded data to the given `recipient`. fn send(&mut self, recipient: P, encoded: EncodedData, priority: bool, sent: &mut Vec

) { let channel = encoded.channel; if let Some(relay) = self.connections.get_mut(&recipient) { if relay.send(encoded, priority).is_ok() { sent.push(recipient); } else { self.messages_dropped .get_or_create(&metrics::Message::new_data(&recipient, channel)) .inc(); } } else { self.messages_dropped .get_or_create(&metrics::Message::new_data(&recipient, channel)) .inc(); } } /// Starts a new task that runs the router [Actor]. /// Returns a [Handle] that can be used to await the completion of the task, /// which will run until its `control` receiver is closed. pub fn start(mut self, routing: Channels

) -> Handle<()> { spawn_cell!(self.context, self.run(routing).await) } /// Runs the [Actor] event loop, processing incoming messages control messages /// ([Message::Ready], [Message::Release]) and content messages ([Message::Content]). /// Returns when the `control` channel is closed. async fn run(mut self, routing: Channels

) { select_loop! { self.context, on_stopped => { debug!("context shutdown, stopping router"); }, Some(msg) = self.control.recv() else { debug!("mailbox closed, stopping router"); break; } => { match msg { Message::Ready { peer, relay, channels, } => { debug!(?peer, "peer ready"); self.connections.insert(peer, relay); let _ = channels.send(routing.clone()); self.notify_subscribers().await; } Message::Release { peer } => { debug!(?peer, "peer released"); self.connections.remove(&peer); self.notify_subscribers().await; } Message::Content { recipients, encoded, priority, success, } => { let mut sent = Vec::new(); let channel = encoded.channel; match recipients { Recipients::One(recipient) => { self.send(recipient, encoded, priority, &mut sent); } Recipients::Some(recipients) => { for recipient in recipients { self.send(recipient, encoded.clone(), priority, &mut sent); } } Recipients::All => { // Send to all connected peers for (recipient, relay) in self.connections.iter_mut() { if relay.send(encoded.clone(), priority).is_ok() { sent.push(recipient.clone()); } else { self.messages_dropped .get_or_create(&metrics::Message::new_data( recipient, channel, )) .inc(); } } } } // Communicate success back to sender (if still alive) let _ = success.send(sent); } Message::SubscribePeers { response } => { let (mut sender, receiver) = ring::channel::>(NZUsize!(1)); // Send existing peers immediately let peers = self.connections.keys().cloned().collect(); let _ = sender.send(peers).await; self.open_subscriptions.push(sender); let _ = response.send(receiver); } } }, } } /// Notifies all open peer subscriptions with the current list of connected peers. async fn notify_subscribers(&mut self) { let peers: Vec

= self.connections.keys().cloned().collect(); let mut keep = Vec::with_capacity(self.open_subscriptions.len()); for mut subscriber in self.open_subscriptions.drain(..) { if subscriber.send(peers.clone()).await.is_ok() { keep.push(subscriber); } } self.open_subscriptions = keep; } }