use super::{ingress::Message, Config}; use crate::authenticated::{ lookup::{ actors::{peer, router, tracker}, metrics, }, mailbox::UnboundedMailbox, Mailbox, }; use commonware_cryptography::PublicKey; use commonware_macros::select_loop; use commonware_runtime::{ spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Sink, Spawner, Stream, }; use commonware_utils::channel::mpsc; use prometheus_client::metrics::{counter::Counter, family::Family}; use rand_core::CryptoRngCore; use tracing::debug; pub struct Actor< E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics, Si: Sink, St: Stream, C: PublicKey, > { context: ContextCell, mailbox_size: usize, ping_frequency: std::time::Duration, receiver: mpsc::Receiver>, sent_messages: Family, received_messages: Family, dropped_messages: Family, rate_limited: Family, } impl< E: Spawner + BufferPooler + Clock + CryptoRngCore + Metrics, Si: Sink, St: Stream, C: PublicKey, > Actor { pub fn new(context: E, cfg: Config) -> (Self, Mailbox>) { let sent_messages = Family::::default(); let received_messages = Family::::default(); let dropped_messages = Family::::default(); let rate_limited = Family::::default(); context.register("messages_sent", "messages sent", sent_messages.clone()); context.register( "messages_received", "messages received", received_messages.clone(), ); context.register( "messages_dropped", "messages dropped due to full application buffer", dropped_messages.clone(), ); context.register( "messages_rate_limited", "messages rate limited", rate_limited.clone(), ); let (sender, receiver) = Mailbox::new(cfg.mailbox_size); ( Self { context: ContextCell::new(context), mailbox_size: cfg.mailbox_size, ping_frequency: cfg.ping_frequency, receiver, sent_messages, received_messages, dropped_messages, rate_limited, }, sender, ) } pub fn start( mut self, tracker: UnboundedMailbox>, router: Mailbox>, ) -> Handle<()> { spawn_cell!(self.context, self.run(tracker, router).await) } async fn run( mut self, tracker: UnboundedMailbox>, router: Mailbox>, ) { select_loop! { self.context, on_stopped => { debug!("context shutdown, stopping spawner"); }, Some(msg) = self.receiver.recv() else { debug!("mailbox closed, stopping spawner"); break; } => { match msg { Message::Spawn { peer, connection, reservation, } => { // Clone required variables let sent_messages = self.sent_messages.clone(); let received_messages = self.received_messages.clone(); let dropped_messages = self.dropped_messages.clone(); let rate_limited = self.rate_limited.clone(); let mut tracker = tracker.clone(); let mut router = router.clone(); // Spawn peer self.context .with_label("peer") .spawn(move |context| async move { // Create peer debug!(?peer, "peer started"); let (peer_actor, peer_mailbox, messenger) = peer::Actor::new( context, peer::Config { ping_frequency: self.ping_frequency, sent_messages, received_messages, dropped_messages, rate_limited, mailbox_size: self.mailbox_size, }, ); // Register peer with the router (may fail during shutdown) let Some(channels) = router.ready(peer.clone(), messenger).await else { debug!(?peer, "router shut down during peer setup"); return; }; // Register peer with tracker tracker.connect(peer.clone(), peer_mailbox); // Run peer let result = peer_actor.run(peer.clone(), connection, channels).await; // Let the router know the peer has exited match result { Ok(()) => debug!(?peer, "peer shutdown gracefully"), Err(e) => debug!(error = ?e, ?peer, "peer shutdown"), } router.release(peer).await; // Release the reservation drop(reservation) }); } } }, } } }