use super::{ directory::{self, Directory}, ingress::{Mailbox, Message, Oracle}, Config, }; use crate::{ authenticated::lookup::actors::{listener, peer, tracker::ingress::Releaser}, PeerSetUpdate, }; use commonware_actor::mailbox; use commonware_cryptography::Signer; use commonware_macros::select_loop; use commonware_runtime::{ spawn_cell, Clock, ContextCell, Handle, Metrics as RuntimeMetrics, Spawner, }; use commonware_utils::channel::{fallible::FallibleExt, mpsc}; use rand::Rng; use std::collections::HashMap; use tracing::debug; /// The tracker actor that manages peer discovery and connection reservations. pub struct Actor { context: ContextCell, // ---------- Message-Passing ---------- /// The mailbox for the actor. /// /// We use this to support sending a [`Message::Release`] message to the actor /// during [`Drop`]. receiver: mailbox::Receiver>, /// The mailbox for the listener. listener: listener::Mailbox, // ---------- State ---------- /// Tracks peer sets and peer connectivity information. directory: Directory, /// Maps a peer's public key to its mailbox. /// Set when a peer connects and cleared when it is blocked or released. mailboxes: HashMap, /// Subscribers to peer set updates. subscribers: Vec>>, } impl Actor { /// Create a new tracker [Actor] from the given `context` and `cfg`. #[allow(clippy::type_complexity)] pub fn new(context: E, cfg: Config) -> (Self, Mailbox, Oracle) { // General initialization let directory_cfg = directory::Config { max_sets: cfg.tracked_peer_sets, peer_connection_cooldown: cfg.peer_connection_cooldown, allow_private_ips: cfg.allow_private_ips, allow_dns: cfg.allow_dns, bypass_ip_check: cfg.bypass_ip_check, block_duration: cfg.block_duration, }; // Create the mailboxes let (sender, receiver) = mailbox::new(context.child("mailbox"), cfg.mailbox_size); let oracle = Oracle::new(sender.clone()); let releaser = Releaser::new(sender.clone()); // Create the directory let directory = Directory::init( context.child("directory"), cfg.crypto.public_key(), directory_cfg, releaser, ); ( Self { context: ContextCell::new(context), receiver, directory, listener: cfg.listener, mailboxes: HashMap::new(), subscribers: Vec::new(), }, Mailbox::new(sender), oracle, ) } /// Start the actor and run it in the background. pub fn start(mut self) -> Handle<()> { spawn_cell!(self.context, self.run()) } async fn run(mut self) { select_loop! { self.context, on_stopped => { debug!("context shutdown, stopping tracker"); }, _ = self.directory.wait_for_unblock() => { if self.directory.unblock_expired() { let _ = self.listener.set(self.directory.listenable()); } }, Some(msg) = self.receiver.recv() else { debug!("mailbox closed, stopping tracker"); break; } => { self.handle_msg(msg); }, } } /// Handle a [`Message`]. fn handle_msg(&mut self, msg: Message) { match msg { Message::Register { index, peers } => { // Identify peers whose connection state should be torn down. let Some(kill_peers) = self.directory.track(index, peers) else { return; }; // Kill active peers no longer in any tracked peer set or whose addresses changed. for peer in kill_peers { self.kill_peer(&peer); } // Send the updated listenable IPs to the listener. let _ = self.listener.set(self.directory.listenable()); // Notify all subscribers about the new peer set let update = self .directory .latest_update() .expect("latest update missing after successful track"); self.subscribers .retain(|subscriber| subscriber.send_lossy(update.clone())); } Message::Overwrite { peers } => { let mut any_changed = false; for (public_key, address) in peers { // Update the peer address. if !self.directory.overwrite(&public_key, address) { continue; } any_changed = true; // Kill the existing connection since it was established to the old address. self.kill_peer(&public_key); } // Send the updated listenable IPs to the listener (if any changes occurred). if any_changed { let _ = self.listener.set(self.directory.listenable()); } } Message::PeerSet { index, responder } => { let _ = responder.send(self.directory.get_peer_set(&index)); } Message::Subscribe { responder } => { // Create a new subscription channel let (sender, receiver) = mpsc::unbounded_channel(); // Send the latest peer set immediately if let Some(update) = self.directory.latest_update() { sender.send_lossy(update); } self.subscribers.push(sender); // Return the receiver to the caller let _ = responder.send(receiver); } Message::Connect { public_key, peer } => { // Kill if peer is not eligible if !self.directory.eligible(&public_key) { peer.kill(); return; } // Promote the reservation unless it was invalidated before Connect arrived. if !self.directory.connect(&public_key) { peer.kill(); return; } self.mailboxes.insert(public_key, peer); } Message::Dialable { responder } => { let _ = responder.send(self.directory.dialable()); } Message::Dial { public_key, reservation, } => { let _ = reservation.send(self.directory.dial(&public_key)); } Message::Acceptable { public_key, source_ip, responder, } => { let _ = responder.send(self.directory.acceptable(&public_key, source_ip)); } Message::Listen { public_key, source_ip, reservation, } => { let _ = reservation.send(self.directory.listen(&public_key, source_ip)); } Message::Block { public_key } => { // Block the peer self.directory.block(&public_key); // Kill the peer if we're connected to it self.kill_peer(&public_key); // Send the updated listenable IPs to the listener. let _ = self.listener.set(self.directory.listenable()); } Message::Release { metadata } => { // Clear the peer handle if it exists self.mailboxes.remove(metadata.public_key()); // Release the peer self.directory.release(metadata); } } } fn kill_peer(&mut self, public_key: &C::PublicKey) { if let Some(peer) = self.mailboxes.remove(public_key) { peer.kill(); } } } #[cfg(test)] mod tests { use super::*; use crate::{ authenticated::lookup::actors::peer, AddressableManager, AddressableTrackedPeers, Ingress, Provider, }; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, }; use commonware_runtime::{ deterministic::{self}, Clock, Runner, Supervisor as _, }; use commonware_utils::{ ordered::{Map, Set}, NZUsize, }; use futures::{FutureExt, StreamExt}; use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, time::Duration, }; // Test Configuration Setup fn test_config(crypto: C, bypass_ip_check: bool) -> (Config, listener::Updates) { let (registered_ips_sender, registered_ips_receiver) = listener::Mailbox::new(); ( Config { crypto, mailbox_size: NZUsize!(1024), tracked_peer_sets: NZUsize!(2), peer_connection_cooldown: Duration::from_millis(200), allow_private_ips: true, allow_dns: true, bypass_ip_check, listener: registered_ips_sender, block_duration: Duration::from_secs(100), }, registered_ips_receiver, ) } // Helper to create Ed25519 signer and public key fn new_signer_and_pk(seed: u64) -> (PrivateKey, PublicKey) { let signer = PrivateKey::from_seed(seed); let pk = signer.public_key(); (signer, pk) } // Test Harness struct TestHarness { mailbox: Mailbox, oracle: Oracle, } fn setup_actor( runner_context: deterministic::Context, cfg_to_clone: Config, // Pass by value to allow cloning ) -> TestHarness { // Actor::new takes ownership, so clone again if cfg_to_clone is needed later let (actor, mailbox, oracle) = Actor::new(runner_context, cfg_to_clone); actor.start(); TestHarness { mailbox, oracle } } #[test] fn test_connect_unauthorized_peer_is_killed() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, .. } = setup_actor(context.child("actor"), cfg); let (_unauth_signer, unauth_pk) = new_signer_and_pk(1); let (peer_mailbox, mut peer_receiver) = peer::Mailbox::new(NZUsize!(1)); // Connect as listener let _ = mailbox.connect(unauth_pk.clone(), peer_mailbox); assert!( matches!(peer_receiver.next().await, Some(peer::Message::Kill)), "Unauthorized peer should be killed on Connect" ); }); } #[test] fn test_block_peer_standard_behavior() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); let (_, pk) = new_signer_and_pk(1); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(), ); context.sleep(Duration::from_millis(10)).await; let dialable = mailbox.dialable().await; assert!(dialable.peers.iter().any(|peer| peer == &pk)); crate::block_peer(&mut oracle, pk.clone()); context.sleep(Duration::from_millis(10)).await; let dialable = mailbox.dialable().await; assert!(!dialable.peers.iter().any(|peer| peer == &pk)); }); } #[test] fn test_block_peer_already_blocked_is_noop() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); let (_, pk1) = new_signer_and_pk(1); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk1.clone(), addr.into())]).unwrap(), ); context.sleep(Duration::from_millis(10)).await; crate::block_peer(&mut oracle, pk1.clone()); context.sleep(Duration::from_millis(10)).await; let dialable = mailbox.dialable().await; assert!(!dialable.peers.iter().any(|peer| peer == &pk1)); crate::block_peer(&mut oracle, pk1.clone()); context.sleep(Duration::from_millis(10)).await; let dialable = mailbox.dialable().await; assert!(!dialable.peers.iter().any(|peer| peer == &pk1)); }); } #[test] fn test_block_peer_non_existent_is_noop() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); let (_s1_signer, pk_non_existent) = new_signer_and_pk(100); crate::block_peer(&mut oracle, pk_non_existent); context.sleep(Duration::from_millis(10)).await; }); } #[test] fn test_listenable() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (peer_signer, peer_pk) = new_signer_and_pk(1); let peer_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1001); let (_peer_signer2, peer_pk2) = new_signer_and_pk(2); let peer_addr2 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 1002); let (_peer_signer3, peer_pk3) = new_signer_and_pk(3); let peer_addr3 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 3).into(), 1003); let (cfg_initial, _) = test_config(peer_signer, false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); // None acceptable because not registered assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); assert!(!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); assert!(!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await); oracle.track( 0, Map::<_, crate::Address>::try_from([ (peer_pk.clone(), peer_addr.into()), (peer_pk2.clone(), peer_addr2.into()), ]) .unwrap(), ); context.sleep(Duration::from_millis(10)).await; // Not acceptable because self assert!(!mailbox.acceptable(peer_pk, peer_addr.ip()).await); // Acceptable because registered with correct IP assert!(mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); // Not acceptable with wrong IP assert!(!mailbox.acceptable(peer_pk2, peer_addr.ip()).await); // Not acceptable because not registered assert!(!mailbox.acceptable(peer_pk3, peer_addr3.ip()).await); }); } #[test] fn test_acceptable_bypass_ip_check() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (peer_signer, peer_pk) = new_signer_and_pk(1); let peer_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 1001); let (_peer_signer2, peer_pk2) = new_signer_and_pk(2); let peer_addr2 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 1002); let (_peer_signer3, peer_pk3) = new_signer_and_pk(3); let peer_addr3 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 3).into(), 1003); // Create a tracker with bypass_ip_check=true (skips IP verification) let (cfg, _) = test_config(peer_signer, true); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); // Unknown peer is NOT acceptable (bypass_ip_check only skips IP check) assert!( !mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, "Unknown peer should not be acceptable" ); oracle.track( 0, Map::<_, crate::Address>::try_from([ (peer_pk.clone(), peer_addr.into()), (peer_pk2.clone(), peer_addr2.into()), ]) .unwrap(), ); context.sleep(Duration::from_millis(10)).await; // With bypass_ip_check=true, tracked peer with wrong IP is acceptable assert!( mailbox.acceptable(peer_pk2.clone(), peer_addr.ip()).await, "Registered peer with wrong IP should be acceptable with bypass_ip_check=true" ); // Self is still not acceptable assert!( !mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, "Self should not be acceptable" ); // Block peer_pk2 and verify it's not acceptable crate::block_peer(&mut oracle, peer_pk2.clone()); context.sleep(Duration::from_millis(10)).await; assert!( !mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, "Blocked peer should not be acceptable" ); }); } #[test] fn test_listen() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); let (_peer_signer, peer_pk) = new_signer_and_pk(1); let peer_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080); let reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await; assert!(reservation.is_none()); oracle.track( 0, Map::<_, crate::Address>::try_from([(peer_pk.clone(), peer_addr.into())]).unwrap(), ); context.sleep(Duration::from_millis(10)).await; // Allow register to process assert!(mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); let reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await; assert!(reservation.is_some()); assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); let failed_reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await; assert!(failed_reservation.is_none()); drop(reservation.unwrap()); context.sleep(Duration::from_millis(1_010)).await; // Allow release and rate limit to pass let reservation_after_release = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await; assert!(reservation_after_release.is_some()); }); } #[test] fn test_dialable_message() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (_boot_signer, boot_pk) = new_signer_and_pk(99); let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000); let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); oracle.track( 0, Map::<_, crate::Address>::try_from([(boot_pk.clone(), boot_addr.into())]).unwrap(), ); let dialable = mailbox.dialable().await; assert_eq!(dialable.peers.len(), 1); assert_eq!(dialable.peers[0], boot_pk); }); } #[test] fn test_dial_message() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (_boot_signer, boot_pk) = new_signer_and_pk(99); let boot_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000); let (cfg_initial, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg_initial); oracle.track( 0, Map::<_, crate::Address>::try_from([(boot_pk.clone(), boot_addr.into())]).unwrap(), ); let result = mailbox.dial(boot_pk.clone()).await; assert!(result.is_some()); if let Some((res, ingress)) = result { match res.metadata() { crate::authenticated::lookup::actors::tracker::Metadata::Dialer(pk) => { assert_eq!(pk, &boot_pk); } _ => panic!("Expected Dialer metadata"), } assert_eq!(ingress, Ingress::Socket(boot_addr)); } let (_unknown_signer, unknown_pk) = new_signer_and_pk(100); let no_reservation = mailbox.dial(unknown_pk).await; assert!(no_reservation.is_none()); }); } #[test] fn test_secondary_peers_are_acceptable_but_not_primary_or_dialable() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); let mut subscription = oracle.subscribe().await; let (_primary_signer, primary_pk) = new_signer_and_pk(1); let primary_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001); let (_secondary_signer, secondary_pk) = new_signer_and_pk(2); let secondary_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9002); oracle.track( 0, AddressableTrackedPeers::new( Map::<_, crate::Address>::try_from([(primary_pk.clone(), primary_addr.into())]) .unwrap(), Map::<_, crate::Address>::try_from([( secondary_pk.clone(), secondary_addr.into(), )]) .unwrap(), ), ); let update = subscription.recv().await.unwrap(); assert_eq!(update.index, 0); assert_eq!(update.latest.primary.len(), 1); assert!(update.latest.primary.position(&primary_pk).is_some()); assert!(update.latest.primary.position(&secondary_pk).is_none()); assert_eq!( update.latest.secondary, Set::try_from([secondary_pk.clone()]).unwrap() ); assert_eq!(update.all.primary, update.latest.primary); assert_eq!( update.all.secondary, Set::try_from([secondary_pk.clone()]).unwrap() ); let dialable = mailbox.dialable().await; assert!(dialable.peers.iter().any(|peer| peer == &primary_pk)); assert!(!dialable.peers.iter().any(|peer| peer == &secondary_pk)); assert!(mailbox.dial(secondary_pk.clone()).await.is_none()); assert!(mailbox.acceptable(secondary_pk, secondary_addr.ip()).await); }); } #[test] fn test_overlapping_primary_secondary_no_duplicate_in_subscription() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Duplicate key across primary/secondary maps; deduplicated as primary only. let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); let mut subscription = oracle.subscribe().await; let (_signer, pk) = new_signer_and_pk(1); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9001); oracle.track( 0, AddressableTrackedPeers::new( Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(), Map::<_, crate::Address>::try_from([(pk.clone(), addr.into())]).unwrap(), ), ); let update = subscription.recv().await.unwrap(); assert_eq!(update.index, 0); assert_eq!(update.latest.primary.len(), 1); assert!(update.latest.primary.position(&pk).is_some()); assert!( update.latest.secondary.is_empty(), "overlap peer is deduplicated as primary only" ); assert_eq!(update.all.primary, update.latest.primary); assert!( update.all.secondary.is_empty(), "aggregate secondary excludes keys that are primary" ); let dialable = mailbox.dialable().await; assert!(dialable.peers.iter().any(|peer| peer == &pk)); assert!(mailbox.acceptable(pk, addr.ip()).await); }); } #[test] fn test_block_clears_peer_mailbox_and_only_kills_once() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // 1) Setup actor let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); // 2) Register & connect an authorized peer let (_peer_signer, peer_pk) = new_signer_and_pk(1); let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345); oracle.track( 0, Map::<_, crate::Address>::try_from([(peer_pk.clone(), peer_addr.into())]).unwrap(), ); // let the register take effect context.sleep(Duration::from_millis(10)).await; let reservation = mailbox.listen(peer_pk.clone(), peer_addr.ip()).await; assert!(reservation.is_some()); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(peer_pk.clone(), peer_mailbox); // 3) Block it → should see exactly one Kill crate::block_peer(&mut oracle, peer_pk.clone()); context.sleep(Duration::from_millis(10)).await; assert!( matches!(peer_rx.next().await, Some(peer::Message::Kill)), "connected peer must be killed on first Block" ); // 4) Block again → mailbox was removed, so no new Kill crate::block_peer(&mut oracle, peer_pk.clone()); context.sleep(Duration::from_millis(10)).await; assert!( !matches!( peer_rx.next().now_or_never(), Some(Some(peer::Message::Kill)) ), "no kill after handle has been cleared" ); }); } #[test] fn test_register_disconnects_removed_peers() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, my_pk) = new_signer_and_pk(0); let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000); let pk_1 = new_signer_and_pk(1).1; let addr_1 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 9001); let pk_2 = new_signer_and_pk(2).1; let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002); let (mut cfg, mut listener_receiver) = test_config(my_sk, false); cfg.tracked_peer_sets = NZUsize!(1); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); // Register set with myself and one other peer oracle.track( 0, Map::<_, crate::Address>::try_from([ (my_pk.clone(), my_addr.into()), (pk_1.clone(), addr_1.into()), ]) .unwrap(), ); // let the register take effect context.sleep(Duration::from_millis(10)).await; // Wait for a listener update let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&my_addr.ip())); assert!(registered_ips.contains(&addr_1.ip())); assert!(!registered_ips.contains(&addr_2.ip())); // Mark peer as connected let reservation = mailbox.listen(pk_1.clone(), addr_1.ip()).await; assert!(reservation.is_some()); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk_1.clone(), peer_mailbox); // Register another set which doesn't include first peer oracle.track( 1, Map::<_, crate::Address>::try_from([(pk_2.clone(), addr_2.into())]).unwrap(), ); // Wait for a listener update let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&my_addr.ip())); assert!(!registered_ips.contains(&addr_1.ip())); assert!(registered_ips.contains(&addr_2.ip())); // The first peer should have received a kill message because its // peer set was removed when `tracked_peer_sets` is 1. assert!(matches!(peer_rx.next().await, Some(peer::Message::Kill)),) }); } #[test] fn test_register_keeps_connected_peer_present_across_rollover() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, _) = new_signer_and_pk(0); let pk_1 = new_signer_and_pk(1).1; let addr_1 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 9001); let pk_2 = new_signer_and_pk(2).1; let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002); let (mut cfg, mut listener_receiver) = test_config(my_sk, false); cfg.tracked_peer_sets = NZUsize!(1); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_1.ip())); assert!(!registered_ips.contains(&addr_2.ip())); let reservation = mailbox .listen(pk_1.clone(), addr_1.ip()) .await .expect("peer should reserve"); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk_1.clone(), peer_mailbox); oracle.track( 1, Map::<_, crate::Address>::try_from([ (pk_1.clone(), addr_1.into()), (pk_2.clone(), addr_2.into()), ]) .unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_1.ip())); assert!(registered_ips.contains(&addr_2.ip())); assert!( !matches!(peer_rx.next().now_or_never(), Some(Some(peer::Message::Kill))), "connected peer present in the new set should not be killed when the old set rolls off" ); assert_eq!(reservation.metadata().public_key(), &pk_1); }); } #[test] fn test_reserved_removed_peer_rejected_on_connect() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, _) = new_signer_and_pk(0); let pk_1 = new_signer_and_pk(1).1; let addr_1 = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 2).into(), 9001); let pk_2 = new_signer_and_pk(2).1; let addr_2 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 9002); let (mut cfg, mut listener_receiver) = test_config(my_sk, false); cfg.tracked_peer_sets = NZUsize!(1); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_1.ip())); assert!(!registered_ips.contains(&addr_2.ip())); let reservation = mailbox .listen(pk_1.clone(), addr_1.ip()) .await .expect("peer should reserve"); assert_eq!(reservation.metadata().public_key(), &pk_1); oracle.track( 1, Map::<_, crate::Address>::try_from([(pk_2.clone(), addr_2.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_1.ip())); assert!(registered_ips.contains(&addr_2.ip())); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk_1.clone(), peer_mailbox); assert!( matches!(peer_rx.next().await, Some(peer::Message::Kill)), "connect rejection is signaled by killing the peer" ); }); } #[test] fn test_reserved_peer_killed_on_connect_after_tracked_address_change() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, _) = new_signer_and_pk(0); let pk = new_signer_and_pk(1).1; let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001); let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002); let (mut cfg, mut listener_receiver) = test_config(my_sk, false); cfg.tracked_peer_sets = NZUsize!(2); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_a.ip())); let (reservation, ingress) = mailbox.dial(pk.clone()).await.expect("peer should reserve"); assert_eq!(reservation.metadata().public_key(), &pk); assert_eq!(ingress, Ingress::Socket(addr_a)); oracle.track( 1, Map::<_, crate::Address>::try_from([(pk.clone(), addr_b.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_a.ip())); assert!(registered_ips.contains(&addr_b.ip())); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk.clone(), peer_mailbox); assert!( matches!(peer_rx.next().await, Some(peer::Message::Kill)), "connect rejection is signaled by killing the peer" ); }); } #[test] fn test_reserved_peer_killed_on_connect_after_overwrite() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, _) = new_signer_and_pk(0); let pk = new_signer_and_pk(1).1; let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001); let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002); let (cfg, mut listener_receiver) = test_config(my_sk, false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_a.ip())); let (reservation, ingress) = mailbox.dial(pk.clone()).await.expect("peer should reserve"); assert_eq!(reservation.metadata().public_key(), &pk); assert_eq!(ingress, Ingress::Socket(addr_a)); oracle.overwrite([(pk.clone(), addr_b.into())].try_into().unwrap()); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_a.ip())); assert!(registered_ips.contains(&addr_b.ip())); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk.clone(), peer_mailbox); assert!( matches!(peer_rx.next().await, Some(peer::Message::Kill)), "connect rejection is signaled by killing the peer" ); }); } #[test] fn test_stale_inbound_source_rejected_after_overwrite() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, _) = new_signer_and_pk(0); let pk = new_signer_and_pk(1).1; let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001); let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002); let (cfg, mut listener_receiver) = test_config(my_sk, false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_a.ip())); assert!(mailbox.acceptable(pk.clone(), addr_a.ip()).await); oracle.overwrite([(pk.clone(), addr_b.into())].try_into().unwrap()); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_a.ip())); assert!(registered_ips.contains(&addr_b.ip())); let reservation = mailbox.listen(pk.clone(), addr_a.ip()).await; assert!( reservation.is_none(), "inbound handshake from the old source IP must be rejected" ); let reservation = mailbox.listen(pk.clone(), addr_b.ip()).await; assert!(reservation.is_some()); }); } #[test] fn test_overwrite_triggers_listener() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, my_pk) = new_signer_and_pk(0); let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000); let pk_1 = new_signer_and_pk(1).1; let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001); let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002); let (cfg, mut listener_receiver) = test_config(my_sk, false); let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([ (my_pk.clone(), my_addr.into()), (pk_1.clone(), addr_1.into()), ]) .unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_1.ip())); assert!(!registered_ips.contains(&addr_2.ip())); oracle.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap()); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_1.ip())); assert!(registered_ips.contains(&addr_2.ip())); }); } #[test] fn test_overwrite_via_oracle() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); let (_, pk) = new_signer_and_pk(1); let addr_1 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001); let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1002); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr_1.into())]).unwrap(), ); context.sleep(Duration::from_millis(10)).await; let result = mailbox.dial(pk.clone()).await; assert!(result.is_some()); let (_, ingress) = result.unwrap(); assert_eq!(ingress, Ingress::Socket(addr_1)); oracle.overwrite([(pk.clone(), addr_2.into())].try_into().unwrap()); context.sleep(Duration::from_millis(1010)).await; let result = mailbox.dial(pk.clone()).await; assert!(result.is_some()); let (_, ingress) = result.unwrap(); assert_eq!(ingress, Ingress::Socket(addr_2)); }); } #[test] fn test_overwrite_blocked_peer_not_in_listenable() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (my_sk, my_pk) = new_signer_and_pk(0); let my_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9000); let pk_1 = new_signer_and_pk(1).1; let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001); let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002); let (cfg, mut listener_receiver) = test_config(my_sk, false); let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([ (my_pk.clone(), my_addr.into()), (pk_1.clone(), addr_1.into()), ]) .unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_1.ip())); crate::block_peer(&mut oracle, pk_1.clone()); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_1.ip())); oracle.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap()); let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_1.ip())); assert!(!registered_ips.contains(&addr_2.ip())); }); } #[test] fn test_overwrite_untracked_peer_silently_ignored() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mut oracle, .. } = setup_actor(context.child("actor"), cfg); let (_, pk) = new_signer_and_pk(1); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1001); // Peer not in the directory is silently skipped (no error, no effect) oracle.overwrite([(pk, addr.into())].try_into().unwrap()); }); } #[test] fn test_overwrite_changes_acceptable_ip() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let pk_1 = new_signer_and_pk(1).1; let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 9001); let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 9002); let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk_1.clone(), addr_1.into())]).unwrap(), ); context.sleep(Duration::from_millis(10)).await; assert!(mailbox.acceptable(pk_1.clone(), addr_1.ip()).await); assert!(!mailbox.acceptable(pk_1.clone(), addr_2.ip()).await); oracle.overwrite([(pk_1.clone(), addr_2.into())].try_into().unwrap()); assert!(!mailbox.acceptable(pk_1.clone(), addr_1.ip()).await); assert!(mailbox.acceptable(pk_1.clone(), addr_2.ip()).await); }); } #[test] fn test_overwrite_severs_existing_connection() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, _) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); let (_, pk) = new_signer_and_pk(1); let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001); let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002); oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr_1.into())]).unwrap(), ); context.sleep(Duration::from_millis(10)).await; // Establish connection let reservation = mailbox.listen(pk.clone(), addr_1.ip()).await; assert!(reservation.is_some()); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk.clone(), peer_mailbox); // Update address - should kill the connection oracle.overwrite([(pk.clone(), addr_2.into())].try_into().unwrap()); // Peer should receive kill message assert!(matches!(peer_rx.next().await, Some(peer::Message::Kill))); }); } #[test] fn test_add_set_severs_connection_on_address_change() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, mut listener_receiver) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); let (_, pk) = new_signer_and_pk(1); let addr_a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001); let addr_b = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002); // Register peer set with peer at address A oracle.track( 0, Map::<_, crate::Address>::try_from([(pk.clone(), addr_a.into())]).unwrap(), ); let registered_ips = listener_receiver.next().await.unwrap(); assert!(registered_ips.contains(&addr_a.ip())); // Establish connection to peer let reservation = mailbox.listen(pk.clone(), addr_a.ip()).await; assert!(reservation.is_some()); let (peer_mailbox, mut peer_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk.clone(), peer_mailbox); // Register new peer set with same peer at address B oracle.track( 1, Map::<_, crate::Address>::try_from([(pk.clone(), addr_b.into())]).unwrap(), ); // Peer should receive Kill message (connection severed due to address change) assert!(matches!(peer_rx.next().await, Some(peer::Message::Kill))); // Verify listenable IPs updated to new address let registered_ips = listener_receiver.next().await.unwrap(); assert!(!registered_ips.contains(&addr_a.ip())); assert!(registered_ips.contains(&addr_b.ip())); }); } #[test] fn test_overwrite_batch_mixed_peers() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let (cfg, mut listener_receiver) = test_config(PrivateKey::from_seed(0), false); let TestHarness { mailbox, mut oracle, .. } = setup_actor(context.child("actor"), cfg); let (_, pk_tracked) = new_signer_and_pk(1); let (_, pk_unchanged) = new_signer_and_pk(2); let (_, pk_untracked) = new_signer_and_pk(3); let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1001); let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9)), 1002); let addr_unchanged = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 10, 10, 10)), 1003); // Register some peers oracle.track( 0, Map::<_, crate::Address>::try_from([ (pk_tracked.clone(), addr_1.into()), (pk_unchanged.clone(), addr_unchanged.into()), ]) .unwrap(), ); let _ = listener_receiver.next().await.unwrap(); // Establish connection to pk_tracked let reservation = mailbox.listen(pk_tracked.clone(), addr_1.ip()).await; assert!(reservation.is_some()); let (tracked_mailbox, mut tracked_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk_tracked.clone(), tracked_mailbox); // Establish connection to pk_unchanged let reservation = mailbox .listen(pk_unchanged.clone(), addr_unchanged.ip()) .await; assert!(reservation.is_some()); let (unchanged_mailbox, mut unchanged_rx) = peer::Mailbox::new(NZUsize!(1)); let _ = mailbox.connect(pk_unchanged.clone(), unchanged_mailbox); // Call overwrite with mix of tracked+changed, tracked+unchanged, and unknown peers oracle.overwrite( [ (pk_tracked.clone(), addr_2.into()), (pk_unchanged.clone(), addr_unchanged.into()), (pk_untracked.clone(), addr_1.into()), ] .try_into() .unwrap(), ); // Only tracked+changed peer (pk_tracked) gets killed assert!(matches!(tracked_rx.next().await, Some(peer::Message::Kill))); // Unchanged peer should NOT receive kill - verify the receiver has no pending messages. assert!( !matches!( unchanged_rx.next().now_or_never(), Some(Some(peer::Message::Kill)) ), "Unchanged peer should not receive kill" ); }); } }