use crate::Broadcaster; use commonware_codec::Codec; use commonware_cryptography::{Committable, Digestible, PublicKey}; use commonware_p2p::Recipients; use commonware_utils::channels::fallible::AsyncFallibleExt; use futures::channel::{mpsc, oneshot}; /// Message types that can be sent to the `Mailbox` pub enum Message { /// Broadcast a [crate::Broadcaster::Message] to the network. /// /// The responder will be sent a list of peers that received the message. Broadcast { recipients: Recipients

, message: M, responder: oneshot::Sender>, }, /// Subscribe to receive a message by digest. /// /// The responder will be sent the first message for an commitment when it is available; either /// instantly (if cached) or when it is received from the network. The request can be canceled /// by dropping the responder. Subscribe { peer: Option

, commitment: M::Commitment, digest: Option, responder: oneshot::Sender, }, /// Get all messages for an commitment. Get { peer: Option

, commitment: M::Commitment, digest: Option, responder: oneshot::Sender>, }, } /// Ingress mailbox for [super::Engine]. #[derive(Clone)] pub struct Mailbox { sender: mpsc::Sender>, } impl Mailbox { pub(super) const fn new(sender: mpsc::Sender>) -> Self { Self { sender } } /// Subscribe to a message by peer (optionally), commitment, and digest (optionally). /// /// The responder will be sent the first message for an commitment when it is available; either /// instantly (if cached) or when it is received from the network. The request can be canceled /// by dropping the responder. /// /// If the engine has shut down, the returned receiver will resolve to `Canceled`. pub async fn subscribe( &mut self, peer: Option

, commitment: M::Commitment, digest: Option, ) -> oneshot::Receiver { let (responder, receiver) = oneshot::channel(); self.sender .send_lossy(Message::Subscribe { peer, commitment, digest, responder, }) .await; receiver } /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an /// externally prepared responder. /// /// The responder will be sent the first message for an commitment when it is available; either /// instantly (if cached) or when it is received from the network. The request can be canceled /// by dropping the responder. /// /// If the engine has shut down, this is a no-op. pub async fn subscribe_prepared( &mut self, peer: Option

, commitment: M::Commitment, digest: Option, responder: oneshot::Sender, ) { self.sender .send_lossy(Message::Subscribe { peer, commitment, digest, responder, }) .await; } /// Get all messages for an commitment. /// /// If the engine has shut down, returns an empty vector. pub async fn get( &mut self, peer: Option

, commitment: M::Commitment, digest: Option, ) -> Vec { self.sender .request(|responder| Message::Get { peer, commitment, digest, responder, }) .await .unwrap_or_default() } } impl Broadcaster for Mailbox { type Recipients = Recipients

; type Message = M; type Response = Vec

; /// Broadcast a message to recipients. /// /// If the engine has shut down, the returned receiver will resolve to `Canceled`. async fn broadcast( &mut self, recipients: Self::Recipients, message: Self::Message, ) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); self.sender .send_lossy(Message::Broadcast { recipients, message, responder: sender, }) .await; receiver } }