use crate::Resolver; use commonware_cryptography::PublicKey; use commonware_utils::{vec::NonEmptyVec, Span}; use futures::{channel::mpsc, SinkExt}; type Predicate = Box bool + Send>; /// A request to fetch data for a key, optionally with target peers. pub struct FetchRequest { /// The key to fetch. pub key: K, /// Target peers to restrict the fetch to. /// /// - `None`: No targeting (or clear existing targeting), try any available peer /// - `Some(peers)`: Only try the specified peers pub targets: Option>, } /// Messages that can be sent to the peer actor. pub enum Message { /// Initiate fetch requests. Fetch(Vec>), /// Cancel a fetch request by key. Cancel { key: K }, /// Cancel all fetch requests. Clear, /// Cancel all fetch requests that do not satisfy the predicate. Retain { predicate: Predicate }, } /// A way to send messages to the peer actor. #[derive(Clone)] pub struct Mailbox { /// The channel that delivers messages to the peer actor. sender: mpsc::Sender>, } impl Mailbox { /// Create a new mailbox. pub(super) const fn new(sender: mpsc::Sender>) -> Self { Self { sender } } } impl Resolver for Mailbox { type Key = K; type PublicKey = P; /// Send a fetch request to the peer actor. /// /// If a fetch is already in progress for this key, this clears any existing /// targets for that key (the fetch will try any available peer). /// /// Panics if the send fails. async fn fetch(&mut self, key: Self::Key) { self.sender .send(Message::Fetch(vec![FetchRequest { key, targets: None }])) .await .expect("Failed to send fetch"); } /// Send a fetch request to the peer actor for a batch of keys. /// /// If a fetch is already in progress for any key, this clears any existing /// targets for that key (the fetch will try any available peer). /// /// Panics if the send fails. async fn fetch_all(&mut self, keys: Vec) { self.sender .send(Message::Fetch( keys.into_iter() .map(|key| FetchRequest { key, targets: None }) .collect(), )) .await .expect("Failed to send fetch_all"); } async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec) { self.sender .send(Message::Fetch(vec![FetchRequest { key, targets: Some(targets), }])) .await .expect("Failed to send fetch_targeted"); } async fn fetch_all_targeted( &mut self, requests: Vec<(Self::Key, NonEmptyVec)>, ) { self.sender .send(Message::Fetch( requests .into_iter() .map(|(key, targets)| FetchRequest { key, targets: Some(targets), }) .collect(), )) .await .expect("Failed to send fetch_all_targeted"); } /// Send a cancel request to the peer actor. /// /// Panics if the send fails. async fn cancel(&mut self, key: Self::Key) { self.sender .send(Message::Cancel { key }) .await .expect("Failed to send cancel_fetch"); } /// Send a cancel all request to the peer actor. /// /// Panics if the send fails. async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) { self.sender .send(Message::Retain { predicate: Box::new(predicate), }) .await .expect("Failed to send retain"); } /// Send a clear request to the peer actor. /// /// Panics if the send fails. async fn clear(&mut self) { self.sender .send(Message::Clear) .await .expect("Failed to send cancel_all"); } }