//! Actor responsible for dialing peers and establishing connections. use crate::authenticated::{ discovery::{ actors::{ spawner, tracker::{self, Metadata, Reservation}, }, metrics, }, mailbox::UnboundedMailbox, Mailbox, }; use commonware_cryptography::Signer; use commonware_macros::select_loop; use commonware_runtime::{ spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Resolver, SinkOf, Spawner, StreamOf, }; use commonware_stream::encrypted::{dial, Config as StreamConfig}; use commonware_utils::SystemTimeExt; use prometheus_client::metrics::{counter::Counter, family::Family}; use rand::seq::SliceRandom; use rand_core::CryptoRngCore; use std::time::Duration; use tracing::debug; /// Configuration for the dialer actor. pub struct Config { /// Configuration for the stream. pub stream_cfg: StreamConfig, /// The frequency at which to dial a single peer from the queue. This also limits the rate at /// which we attempt to dial peers in general. pub dial_frequency: Duration, /// The frequency at which to refresh the list of dialable peers if there are no more peers in /// the queue. This also limits the rate at which any single peer is dialed multiple times. /// /// This approach attempts to help ensure that the connection rate-limiter is not maxed out for /// a single peer by preventing dialing it as fast as possible. This should make it easier for /// other peers to dial us. pub query_frequency: Duration, /// Whether to allow dialing private IP addresses after DNS resolution. pub allow_private_ips: bool, } /// Actor responsible for dialing peers and establishing outgoing connections. pub struct Actor { context: ContextCell, // ---------- State ---------- /// The list of peers to dial. queue: Vec, // ---------- Configuration ---------- stream_cfg: StreamConfig, dial_frequency: Duration, query_frequency: Duration, allow_private_ips: bool, // ---------- Metrics ---------- /// The number of dial attempts made to each peer. attempts: Family, } impl< E: Spawner + BufferPooler + Clock + Network + Resolver + CryptoRngCore + Metrics, C: Signer, > Actor { pub fn new(context: E, cfg: Config) -> Self { let attempts = Family::::default(); context.register( "attempts", "The number of dial attempts made to each peer", attempts.clone(), ); Self { context: ContextCell::new(context), queue: Vec::new(), stream_cfg: cfg.stream_cfg, dial_frequency: cfg.dial_frequency, query_frequency: cfg.query_frequency, allow_private_ips: cfg.allow_private_ips, attempts, } } /// Dial a peer for which we have a reservation. #[allow(clippy::type_complexity)] async fn dial_peer( &mut self, reservation: Reservation, supervisor: &mut Mailbox, StreamOf, C::PublicKey>>, ) { // Extract metadata from the reservation let Metadata::Dialer(peer, ingress) = reservation.metadata().clone() else { unreachable!("unexpected reservation metadata"); }; // Increment metrics. self.attempts .get_or_create(&metrics::Peer::new(&peer)) .inc(); // Spawn dialer to connect to peer self.context.with_label("dialer").spawn({ let config = self.stream_cfg.clone(); let mut supervisor = supervisor.clone(); let allow_private_ips = self.allow_private_ips; move |mut context| async move { // Resolve ingress to socket addresses (filtered by private IP policy) let addresses: Vec<_> = ingress .resolve_filtered(&context, allow_private_ips) .await .map(Iterator::collect) .unwrap_or_default(); let Some(&address) = addresses.choose(&mut context) else { debug!(?ingress, "failed to resolve or no valid addresses"); return; }; // Attempt to dial peer let (sink, stream) = match context.dial(address).await { Ok(stream) => stream, Err(err) => { debug!(?err, "failed to dial peer"); return; } }; debug!(?peer, ?ingress, "dialed peer"); // Upgrade connection let instance = match dial(context, config, peer.clone(), stream, sink).await { Ok(instance) => instance, Err(err) => { debug!(?err, "failed to upgrade connection"); return; } }; debug!(?peer, ?ingress, "upgraded connection"); // Start peer to handle messages supervisor.spawn(instance, reservation).await; } }); } /// Start the dialer actor. #[allow(clippy::type_complexity)] pub fn start( mut self, tracker: UnboundedMailbox>, supervisor: Mailbox, StreamOf, C::PublicKey>>, ) -> Handle<()> { spawn_cell!(self.context, self.run(tracker, supervisor).await) } #[allow(clippy::type_complexity)] async fn run( mut self, mut tracker: UnboundedMailbox>, mut supervisor: Mailbox, StreamOf, C::PublicKey>>, ) { let mut dial_deadline = self.context.current(); let mut query_deadline = self.context.current(); select_loop! { self.context, on_stopped => { debug!("context shutdown, stopping dialer"); }, _ = self.context.sleep_until(dial_deadline) => { // Update the deadline. dial_deadline = dial_deadline.add_jittered(&mut self.context, self.dial_frequency); // Pop the queue until we can reserve a peer. // If a peer is reserved, attempt to dial it. while let Some(peer) = self.queue.pop() { // Attempt to reserve peer. let Some(reservation) = tracker.dial(peer).await else { continue; }; self.dial_peer(reservation, &mut supervisor).await; break; } }, _ = self.context.sleep_until(query_deadline) => { // Update the deadline. query_deadline = query_deadline.add_jittered(&mut self.context, self.query_frequency); // Only update the queue if it is empty. if self.queue.is_empty() { // Query the tracker for dialable peers and shuffle the list to prevent // starvation. self.queue = tracker.dialable().await; self.queue.shuffle(&mut self.context); } }, } } } #[cfg(test)] mod tests { use super::*; use crate::{ authenticated::discovery::actors::tracker::{ingress::Releaser, Metadata}, Ingress, }; use commonware_cryptography::ed25519::{PrivateKey, PublicKey}; use commonware_macros::select; use commonware_runtime::{deterministic, Clock, Runner}; use commonware_stream::encrypted::Config as StreamConfig; use std::{ net::{Ipv4Addr, SocketAddr}, time::Duration, }; fn test_stream_config(signing_key: PrivateKey) -> StreamConfig { StreamConfig { signing_key, namespace: b"test".to_vec(), max_message_size: 1024, handshake_timeout: Duration::from_secs(5), synchrony_bound: Duration::from_secs(5), max_handshake_age: Duration::from_secs(10), } } #[test] fn test_dialer_dials_one_peer_per_tick() { let executor = deterministic::Runner::timed(Duration::from_secs(10)); executor.start(|context| async move { let signer = PrivateKey::from_seed(0); let dial_frequency = Duration::from_millis(100); let dialer_cfg = Config { stream_cfg: test_stream_config(signer), dial_frequency, query_frequency: Duration::from_secs(60), allow_private_ips: true, }; let dialer = Actor::new(context.with_label("dialer"), dialer_cfg); let (tracker_mailbox, mut tracker_rx) = UnboundedMailbox::>::new(); // Create a releaser for reservations let (releaser_mailbox, _releaser_rx) = UnboundedMailbox::>::new(); let releaser = Releaser::new(releaser_mailbox); // Generate 10 peers let peers: Vec = (0..10) .map(|i| PrivateKey::from_seed(i).public_key()) .collect(); // Create a supervisor that just drops spawn messages let (supervisor, mut supervisor_rx) = Mailbox::>::new(100); context .with_label("supervisor") .spawn(|_| async move { while supervisor_rx.recv().await.is_some() {} }); // Start the dialer let _handle = dialer.start(tracker_mailbox, supervisor); // Handle messages until deadline, counting dial attempts let mut dial_count = 0; let deadline = context.current() + dial_frequency * 3; loop { select! { msg = tracker_rx.recv() => match msg { Some(tracker::Message::Dialable { responder }) => { let _ = responder.send(peers.clone()); } Some(tracker::Message::Dial { public_key, reservation, }) => { dial_count += 1; let ingress: Ingress = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000).into(); let metadata = Metadata::Dialer(public_key, ingress); let res = tracker::Reservation::new(metadata, releaser.clone()); let _ = reservation.send(Some(res)); } _ => {} }, _ = context.sleep_until(deadline) => break, } } // Should have dialed ~3 peers (one per tick), not all 10 at once assert!( (2..=4).contains(&dial_count), "expected 2-4 dial attempts (one per tick), got {}", dial_count ); }); } }