use super::Reservation; use crate::{ authenticated::{ dialing::Dialable, discovery::{ actors::{peer, tracker::Metadata}, types, }, }, PeerSetSubscription, TrackedPeers, }; use commonware_actor::{ mailbox::{self, Policy}, Feedback, }; use commonware_cryptography::PublicKey; use commonware_utils::channel::{mpsc, oneshot}; use std::collections::VecDeque; /// Messages that can be sent to the tracker actor. #[derive(Debug)] pub enum Message { // ---------- Used by oracle ---------- /// Register a peer set at a given index. Register { index: u64, peers: TrackedPeers }, // ---------- Used by peer set provider ---------- /// Fetch primary and secondary peers for a given ID. PeerSet { /// The index of the peer set to fetch. index: u64, /// One-shot channel to send the tracked peers. responder: oneshot::Sender>>, }, /// Subscribe to notifications when new peer sets are added. Subscribe { /// One-shot channel to send the subscription receiver. responder: oneshot::Sender>, }, // ---------- Used by blocker ---------- /// Block a peer, disconnecting them if currently connected and preventing future connections /// for as long as the peer remains in at least one active peer set. Block { public_key: C }, // ---------- Used by peer ---------- /// Notify the tracker that a peer has been successfully connected. /// /// The tracker responds with the greeting info that must be sent to the peer /// before any other messages. If the peer is not eligible, the channel is dropped /// (signaling termination). Connect { /// The public key of the peer. public_key: C, /// The mailbox of the peer actor. peer: peer::Mailbox, /// `true` if we are the dialer, `false` if we are the listener. dialer: bool, /// One-shot channel to return the greeting info. Dropped if peer is not eligible. responder: oneshot::Sender>, }, /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a /// keep-alive signal to the peer. /// /// This request is formed on a recurring interval. The tracker sends any response to the /// mailbox stored by the peer's most recent [`Message::Connect`]. Construct { /// The public key of the peer. public_key: C, }, /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer. /// /// The tracker will construct a [types::Payload::Peers] message in response and send it to the /// mailbox stored by the peer's most recent [`Message::Connect`]. BitVec { /// The public key of the peer that sent the bit vector. public_key: C, /// The bit vector received. bit_vec: types::BitVec, }, /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer. Peers { /// The list of peers received. peers: Vec>, }, // ---------- Used by dialer ---------- /// Request a list of dialable peers. Dialable { /// One-shot channel to send the dialable peers and next query deadline. responder: oneshot::Sender>, }, /// Request a reservation for a particular peer to dial. /// /// The tracker will respond with an [`Option>`], which will be `None` if the /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already /// has an active reservation). Dial { /// The public key of the peer to reserve. public_key: C, /// sender to respond with the reservation. reservation: oneshot::Sender>>, }, // ---------- Used by listener ---------- /// Check if a peer is acceptable (can accept an incoming connection from them). Acceptable { /// The public key of the peer to check. public_key: C, /// The sender to respond with whether the peer is acceptable. responder: oneshot::Sender, }, /// Request a reservation for a particular peer. /// /// The tracker will respond with an [`Option>`], which will be `None` if the /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already /// has an active reservation). Listen { /// The public key of the peer to reserve. public_key: C, /// The sender to respond with the reservation. reservation: oneshot::Sender>>, }, // ---------- Used by reservation ---------- /// Release a reservation. Release { /// The metadata of the reservation to release. metadata: Metadata, }, } impl Policy for Message { type Overflow = VecDeque; fn handle(overflow: &mut Self::Overflow, message: Self) { overflow.push_back(message); } } /// Mailbox for sending messages to the tracker actor. #[derive(Clone, Debug)] pub struct Mailbox(mailbox::Sender>); impl Mailbox { pub(crate) const fn new(sender: mailbox::Sender>) -> Self { Self(sender) } /// Send a `Connect` message to the tracker and receive the greeting info. /// /// Returns `Some(info)` if the peer is eligible, `None` if the channel was /// dropped (peer not eligible or tracker shut down). pub(crate) async fn connect( &self, public_key: C, peer: peer::Mailbox, dialer: bool, ) -> Option> { let (responder, receiver) = oneshot::channel(); let _ = self.0.enqueue(Message::Connect { public_key, peer, dialer, responder, }); receiver.await.ok() } /// Send a `Construct` message to the tracker. pub(crate) fn construct(&self, public_key: C) -> Feedback { self.0.enqueue(Message::Construct { public_key }) } /// Send a `BitVec` message to the tracker. pub(crate) fn bit_vec(&self, public_key: C, bit_vec: types::BitVec) -> Feedback { self.0.enqueue(Message::BitVec { public_key, bit_vec, }) } /// Send a `Peers` message to the tracker. pub(crate) fn peers(&self, peers: Vec>) -> Feedback { self.0.enqueue(Message::Peers { peers }) } /// Request dialable peers from the tracker. /// /// Returns an empty response if the tracker is shut down. pub(crate) async fn dialable(&self) -> Dialable { let (responder, receiver) = oneshot::channel(); let _ = self.0.enqueue(Message::Dialable { responder }); receiver.await.unwrap_or_default() } /// Send a `Dial` message to the tracker. /// /// Returns `None` if the tracker is shut down. pub(crate) async fn dial(&self, public_key: C) -> Option> { let (reservation, receiver) = oneshot::channel(); let _ = self.0.enqueue(Message::Dial { public_key, reservation, }); receiver.await.ok().flatten() } /// Send an `Acceptable` message to the tracker. /// /// Returns `false` if the tracker is shut down. pub(crate) async fn acceptable(&self, public_key: C) -> bool { let (responder, receiver) = oneshot::channel(); let _ = self.0.enqueue(Message::Acceptable { public_key, responder, }); receiver.await.unwrap_or(false) } /// Send a `Listen` message to the tracker. /// /// Returns `None` if the tracker is shut down. pub(crate) async fn listen(&self, public_key: C) -> Option> { let (reservation, receiver) = oneshot::channel(); let _ = self.0.enqueue(Message::Listen { public_key, reservation, }); receiver.await.ok().flatten() } } /// Allows releasing reservations #[derive(Clone, Debug)] pub struct Releaser { sender: mailbox::Sender>, } impl Releaser { /// Create a new releaser. pub(crate) const fn new(sender: mailbox::Sender>) -> Self { Self { sender } } /// Release a reservation. pub fn release(&mut self, metadata: Metadata) -> Feedback { self.sender.enqueue(Message::Release { metadata }) } } /// Mechanism to register authorized peers. /// /// Peers that are not explicitly authorized /// will be blocked by commonware-p2p. #[derive(Debug, Clone)] pub struct Oracle { sender: mailbox::Sender>, } impl Oracle { pub(super) const fn new(sender: mailbox::Sender>) -> Self { Self { sender } } } impl crate::Provider for Oracle { type PublicKey = C; async fn peer_set(&mut self, id: u64) -> Option> { let (responder, receiver) = oneshot::channel(); let _ = self.sender.enqueue(Message::PeerSet { index: id, responder, }); receiver.await.ok().flatten() } async fn subscribe(&mut self) -> PeerSetSubscription { let (responder, receiver) = oneshot::channel(); let _ = self.sender.enqueue(Message::Subscribe { responder }); receiver.await.unwrap_or_else(|_| { let (_, rx) = mpsc::unbounded_channel(); rx }) } } impl crate::Manager for Oracle { fn track(&mut self, index: u64, peers: R) -> Feedback where R: Into> + Send, { self.sender.enqueue(Message::Register { index, peers: peers.into(), }) } } impl crate::Blocker for Oracle { type PublicKey = C; fn block(&mut self, public_key: Self::PublicKey) -> Feedback { self.sender.enqueue(Message::Block { public_key }) } }