use super::{ingress::Message, Config}; use crate::authenticated::{ discovery::{ actors::{ peer, router, tracker::{self, Metadata}, }, metrics, types::InfoVerifier, }, mailbox::UnboundedMailbox, Mailbox, }; use commonware_cryptography::PublicKey; use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Sink, Spawner, Stream}; use futures::{channel::mpsc, StreamExt}; use governor::{clock::ReasonablyRealtime, Quota}; use prometheus_client::metrics::{counter::Counter, family::Family, gauge::Gauge}; use rand::{CryptoRng, Rng}; use std::time::Duration; use tracing::debug; pub struct Actor< E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + Metrics, O: Sink, I: Stream, C: PublicKey, > { context: ContextCell, mailbox_size: usize, gossip_bit_vec_frequency: Duration, allowed_bit_vec_rate: Quota, max_peer_set_size: u64, allowed_peers_rate: Quota, peer_gossip_max_count: usize, info_verifier: InfoVerifier, receiver: mpsc::Receiver>, connections: Gauge, sent_messages: Family, received_messages: Family, rate_limited: Family, } impl< E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + Metrics, O: Sink, I: Stream, C: PublicKey, > Actor { #[allow(clippy::type_complexity)] pub fn new(context: E, cfg: Config) -> (Self, Mailbox>) { let connections = Gauge::default(); let sent_messages = Family::::default(); let received_messages = Family::::default(); let rate_limited = Family::::default(); context.register( "connections", "number of connected peers", connections.clone(), ); context.register("messages_sent", "messages sent", sent_messages.clone()); context.register( "messages_received", "messages received", received_messages.clone(), ); context.register( "messages_rate_limited", "messages rate limited", rate_limited.clone(), ); let (sender, receiver) = Mailbox::new(cfg.mailbox_size); ( Self { context: ContextCell::new(context), mailbox_size: cfg.mailbox_size, gossip_bit_vec_frequency: cfg.gossip_bit_vec_frequency, allowed_bit_vec_rate: cfg.allowed_bit_vec_rate, max_peer_set_size: cfg.max_peer_set_size, allowed_peers_rate: cfg.allowed_peers_rate, peer_gossip_max_count: cfg.peer_gossip_max_count, info_verifier: cfg.info_verifier, receiver, connections, sent_messages, received_messages, rate_limited, }, sender, ) } pub fn start( mut self, tracker: UnboundedMailbox>, router: Mailbox>, ) -> Handle<()> { spawn_cell!(self.context, self.run(tracker, router).await) } async fn run( mut self, tracker: UnboundedMailbox>, router: Mailbox>, ) { while let Some(msg) = self.receiver.next().await { match msg { Message::Spawn { peer, connection, reservation, } => { // Mark peer as connected self.connections.inc(); // Spawn peer self.context.with_label("peer").spawn({ let connections = self.connections.clone(); let sent_messages = self.sent_messages.clone(); let received_messages = self.received_messages.clone(); let rate_limited = self.rate_limited.clone(); let mut tracker = tracker.clone(); let mut router = router.clone(); let is_dialer = matches!(reservation.metadata(), Metadata::Dialer(..)); let info_verifier = self.info_verifier.clone(); move |context| async move { // Create peer debug!(?peer, "peer started"); let (peer_actor, peer_mailbox, messenger) = peer::Actor::new( context, peer::Config { sent_messages, received_messages, rate_limited, mailbox_size: self.mailbox_size, gossip_bit_vec_frequency: self.gossip_bit_vec_frequency, allowed_bit_vec_rate: self.allowed_bit_vec_rate, max_peer_set_size: self.max_peer_set_size, allowed_peers_rate: self.allowed_peers_rate, peer_gossip_max_count: self.peer_gossip_max_count, info_verifier, }, ); // Register peer with the router let channels = router.ready(peer.clone(), messenger).await; // Register peer with tracker tracker.connect(peer.clone(), is_dialer, peer_mailbox).await; // Run peer let e = peer_actor .run(peer.clone(), connection, tracker, channels) .await; connections.dec(); // Let the router know the peer has exited debug!(error = ?e, ?peer, "peer shutdown"); router.release(peer).await; // Release the reservation drop(reservation); } }); } } } debug!("supervisor shutdown"); } }