use crate::{ingress, Fetch, Resolver, TargetedResolver}; use commonware_actor::{mailbox::Sender, Feedback}; use commonware_cryptography::PublicKey; use commonware_utils::{vec::NonEmptyVec, Span}; /// A key to fetch data for, optionally with target peers. pub type FetchKey = ingress::FetchKey>>; /// Messages that can be sent to the peer actor. pub type Message = ingress::Message>>; fn fetch_key(fetch: Fetch, targets: Option>) -> FetchKey { FetchKey { key: fetch.key, subscribers: NonEmptyVec::new(fetch.subscriber), metadata: targets, } } /// A way to send messages to the peer actor. #[derive(Clone)] pub struct Mailbox { /// The channel that delivers messages to the peer actor. sender: Sender>, } impl Mailbox { /// Create a new mailbox. pub(super) const fn new(sender: Sender>) -> Self { Self { sender } } } impl Resolver for Mailbox where K: Span, P: PublicKey, S: Clone + Eq + Send + 'static, { type Key = K; type Subscriber = S; /// Send a fetch to the peer actor. /// /// If a fetch is already in progress for this key, this clears any existing /// targets for that key (the fetch will try any available peer). /// /// If the engine has shut down, this is a no-op. fn fetch(&mut self, key: D) -> Feedback where D: Into> + Send, { let Fetch { key, subscriber } = key.into(); self.sender.enqueue(Message::Fetch(vec![FetchKey { key, subscribers: NonEmptyVec::new(subscriber), metadata: None, }])) } /// Send fetches to the peer actor for a batch of keys. /// /// If a fetch is already in progress for any key, this clears any existing /// targets for that key (the fetch will try any available peer). /// /// If the engine has shut down, this is a no-op. fn fetch_all(&mut self, keys: Vec) -> Feedback where D: Into> + Send, { self.sender.enqueue(Message::Fetch( keys.into_iter() .map(|key| fetch_key(key.into(), None)) .collect(), )) } /// Send a retain request to the peer actor. /// /// If the engine has shut down, this is a no-op. fn retain( &mut self, predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static, ) -> Feedback { self.sender.enqueue(Message::Retain { predicate: Box::new(predicate), }) } } impl TargetedResolver for Mailbox where K: Span, P: PublicKey, S: Clone + Eq + Send + 'static, { type PublicKey = P; /// Send a targeted fetch to the peer actor. /// /// If a fetch is already in progress for this key: /// - If the existing fetch has targets, the new targets are added to the set. /// - If the existing fetch has no targets, it remains unrestricted. /// /// To clear targeting and fall back to any peer, call [`fetch`](Self::fetch). /// /// Targets are automatically cleared when the fetch succeeds or is canceled. /// When a peer is blocked for invalid data, only that peer is removed from /// the target set. /// /// If the engine has shut down, this is a no-op. fn fetch_targeted( &mut self, key: impl Into> + Send, targets: NonEmptyVec, ) -> Feedback { let Fetch { key, subscriber } = key.into(); self.sender.enqueue(Message::Fetch(vec![FetchKey { key, subscribers: NonEmptyVec::new(subscriber), metadata: Some(targets), }])) } /// Send targeted fetches to the peer actor for a batch of keys. /// /// If the engine has shut down, this is a no-op. fn fetch_all_targeted(&mut self, keys: Vec<(D, NonEmptyVec)>) -> Feedback where D: Into> + Send, { self.sender.enqueue(Message::Fetch( keys.into_iter() .map(|(key, targets)| fetch_key(key.into(), Some(targets))) .collect(), )) } } #[cfg(test)] mod tests { use super::*; use commonware_actor::mailbox::{Overflow, Policy}; type TestMessage = Message; type TestPending = ingress::Pending>>; fn fetch(key: u8, subscriber: u16, targets: Option>) -> TestMessage { Message::Fetch(vec![FetchKey { key, subscribers: NonEmptyVec::new(subscriber), metadata: targets, }]) } fn fetch_with_subscribers( key: u8, subscribers: Vec, targets: Option>, ) -> TestMessage { Message::Fetch(vec![FetchKey { key, subscribers: NonEmptyVec::from_unchecked(subscribers), metadata: targets, }]) } fn subscriber_is(value: u16) -> impl Fn(&u8, &u16) -> bool + Send { move |_, subscriber| *subscriber == value } fn targets(values: &[u8]) -> NonEmptyVec { NonEmptyVec::from_unchecked(values.to_vec()) } fn drain(pending: &mut TestPending) -> Vec { let mut messages = Vec::new(); Overflow::drain(pending, |message| { messages.push(message); None }); messages } fn assert_fetch(message: &TestMessage, expected_key: u8, expected_targets: Option<&[u8]>) { let Message::Fetch(keys) = message else { panic!("expected fetch"); }; assert_eq!(keys.len(), 1); assert_eq!(keys[0].key, expected_key); match (&keys[0].metadata, expected_targets) { (None, None) => {} (Some(actual), Some(expected)) => assert_eq!(&actual[..], expected), _ => panic!("unexpected targets"), } } fn assert_fetch_keys(message: &TestMessage, expected: &[u8]) { let Message::Fetch(keys) = message else { panic!("expected fetch"); }; let actual: Vec<_> = keys.iter().map(|key| key.key).collect(); assert_eq!(actual, expected); } fn assert_fetch_subscribers( message: &TestMessage, expected_key: u8, expected_subscribers: &[u16], ) { let Message::Fetch(keys) = message else { panic!("expected fetch"); }; assert_eq!(keys.len(), 1); assert_eq!(keys[0].key, expected_key); assert_eq!(&keys[0].subscribers[..], expected_subscribers); } #[test] fn targeted_fetches_for_same_key_are_merged() { let mut pending = TestPending::default(); Policy::handle(&mut pending, fetch(1, 10, Some(targets(&[2, 3])))); Policy::handle(&mut pending, fetch(1, 11, Some(targets(&[3, 4])))); let messages = drain(&mut pending); assert_eq!(messages.len(), 1); assert_fetch(&messages[0], 1, Some(&[2, 3, 4])); assert_fetch_subscribers(&messages[0], 1, &[10, 11]); } #[test] fn duplicate_fetches_for_same_key_merge_subscribers() { let mut pending = TestPending::default(); Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None)); Policy::handle(&mut pending, fetch_with_subscribers(1, vec![11, 12], None)); let messages = drain(&mut pending); assert_eq!(messages.len(), 1); assert_fetch_subscribers(&messages[0], 1, &[10, 11, 12]); } #[test] fn unrestricted_fetch_dominates_targeted_fetches() { let mut pending = TestPending::default(); Policy::handle(&mut pending, fetch(1, 10, Some(targets(&[2])))); Policy::handle(&mut pending, fetch(1, 11, None)); Policy::handle(&mut pending, fetch(1, 12, Some(targets(&[3])))); let messages = drain(&mut pending); assert_eq!(messages.len(), 1); assert_fetch(&messages[0], 1, None); } #[test] fn retain_removes_fetches_for_dropped_subscribers() { let mut pending = TestPending::default(); Policy::handle(&mut pending, fetch(1, 10, None)); Policy::handle(&mut pending, fetch(2, 11, None)); Policy::handle( &mut pending, Message::Retain { predicate: Box::new(subscriber_is(11)), }, ); let messages = drain(&mut pending); assert_eq!(messages.len(), 2); assert!(matches!(messages[0], Message::Retain { .. })); assert_fetch(&messages[1], 2, None); } #[test] fn retain_prunes_pending_fetch_subscribers() { let mut pending = TestPending::default(); Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None)); Policy::handle( &mut pending, Message::Retain { predicate: Box::new(subscriber_is(11)), }, ); let messages = drain(&mut pending); assert_eq!(messages.len(), 2); assert!(matches!(messages[0], Message::Retain { .. })); assert_fetch_subscribers(&messages[1], 1, &[11]); } #[test] fn retain_drops_pending_fetch_when_all_subscribers_are_dropped() { let mut pending = TestPending::default(); Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None)); Policy::handle( &mut pending, Message::Retain { predicate: Box::new(subscriber_is(12)), }, ); let messages = drain(&mut pending); assert_eq!(messages.len(), 1); assert!(matches!(messages[0], Message::Retain { .. })); } #[test] fn fetch_after_retain_is_retained_when_subscriber_is_dropped() { let mut pending = TestPending::default(); Policy::handle( &mut pending, Message::Retain { predicate: Box::new(|_, subscriber| *subscriber != 10), }, ); Policy::handle(&mut pending, fetch(1, 10, None)); Policy::handle(&mut pending, fetch(2, 11, None)); let messages = drain(&mut pending); assert_eq!(messages.len(), 2); assert!(matches!(messages[0], Message::Retain { .. })); assert_fetch_keys(&messages[1], &[1, 2]); } }