use crate::Broadcaster; use commonware_actor::{ mailbox::{Overflow, Policy, Sender}, Feedback, }; use commonware_codec::Codec; use commonware_cryptography::{Digestible, PublicKey}; use commonware_p2p::Recipients; use commonware_utils::channel::oneshot; use std::collections::VecDeque; /// Message types that can be sent to the `Mailbox` pub(crate) enum Message { /// Broadcast a [crate::Broadcaster::Message] to the network. Broadcast { recipients: Recipients

, message: M, }, /// Subscribe to receive a message by digest. /// /// The responder will be sent the message when it is available; either /// instantly (if cached) or when it is received from the network. The request can be canceled /// by dropping the responder. Subscribe { digest: M::Digest, responder: oneshot::Sender, }, /// Get a message by digest. Get { digest: M::Digest, responder: oneshot::Sender>, }, } impl Message { fn response_closed(&self) -> bool { match self { Self::Subscribe { responder, .. } => responder.is_closed(), Self::Get { responder, .. } => responder.is_closed(), Self::Broadcast { .. } => false, } } } pub(crate) struct Pending(VecDeque>); impl Default for Pending { fn default() -> Self { Self(VecDeque::new()) } } impl Overflow> for Pending { fn is_empty(&self) -> bool { self.0.is_empty() } fn drain(&mut self, mut push: F) where F: FnMut(Message) -> Option>, { while let Some(message) = self.0.pop_front() { if message.response_closed() { continue; } if let Some(message) = push(message) { self.0.push_front(message); break; } } } } impl Policy for Message { type Overflow = Pending; fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { return; } overflow.0.push_back(message); } } /// Ingress mailbox for [super::Engine]. #[derive(Clone)] pub struct Mailbox { sender: Sender>, } impl Mailbox { pub(super) const fn new(sender: Sender>) -> Self { Self { sender } } /// Subscribe to a message by digest. /// /// The responder will be sent the message when it is available; either /// instantly (if cached) or when it is received from the network. The request can be canceled /// by dropping the responder. /// /// If the engine has shut down, the returned receiver will resolve to `Canceled`. pub fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver { let (responder, receiver) = oneshot::channel(); let _ = self .sender .enqueue(Message::Subscribe { digest, responder }); receiver } /// Subscribe to a message by digest with an externally prepared responder. /// /// The responder will be sent the message when it is available; either /// instantly (if cached) or when it is received from the network. The request can be canceled /// by dropping the responder. /// /// If the engine has shut down, this is a no-op. pub fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender) { let _ = self .sender .enqueue(Message::Subscribe { digest, responder }); } /// Get a message by digest. /// /// If the engine has shut down, returns `None`. pub async fn get(&self, digest: M::Digest) -> Option { let (responder, receiver) = oneshot::channel(); let _ = self.sender.enqueue(Message::Get { digest, responder }); receiver.await.unwrap_or_default() } } impl Broadcaster for Mailbox { type Recipients = Recipients

; type Message = M; /// Broadcast a message to recipients. /// /// If the engine has shut down, returns [`Feedback::Closed`]. fn broadcast(&self, recipients: Self::Recipients, message: Self::Message) -> Feedback { self.sender.enqueue(Message::Broadcast { recipients, message, }) } }