use crate::{ simplex::{ signing_scheme::Scheme, types::{Activity, Finalization, Notarization}, }, types::Round, Block, Reporter, }; use commonware_cryptography::Digest; use commonware_storage::archive; use futures::{ channel::{mpsc, oneshot}, SinkExt, }; use tracing::error; /// An identifier for a block request. pub enum Identifier { /// The height of the block to retrieve. Height(u64), /// The commitment of the block to retrieve. Commitment(D), /// The highest finalized block. It may be the case that marshal does not have some of the /// blocks below this height. Latest, } // Allows using u64 directly for convenience. impl From for Identifier { fn from(src: u64) -> Self { Self::Height(src) } } // Allows using &Digest directly for convenience. impl From<&D> for Identifier { fn from(src: &D) -> Self { Self::Commitment(*src) } } // Allows using archive identifiers directly for convenience. impl From> for Identifier { fn from(src: archive::Identifier<'_, D>) -> Self { match src { archive::Identifier::Index(index) => Self::Height(index), archive::Identifier::Key(key) => Self::Commitment(*key), } } } /// Messages sent to the marshal [Actor](super::super::actor::Actor). /// /// These messages are sent from the consensus engine and other parts of the /// system to drive the state of the marshal. pub(crate) enum Message { // -------------------- Application Messages -------------------- /// A request to retrieve the (height, commitment) of a block by its identifier. /// The block must be finalized; returns `None` if the block is not finalized. GetInfo { /// The identifier of the block to get the information of. identifier: Identifier, /// A channel to send the retrieved (height, commitment). response: oneshot::Sender>, }, /// A request to retrieve a block by its identifier. /// /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks. GetBlock { /// The identifier of the block to retrieve. identifier: Identifier, /// A channel to send the retrieved block. response: oneshot::Sender>, }, /// A request to retrieve a finalization by height. GetFinalization { /// The height of the finalization to retrieve. height: u64, /// A channel to send the retrieved finalization. response: oneshot::Sender>>, }, /// A request to retrieve a block by its commitment. Subscribe { /// The view in which the block was notarized. This is an optimization /// to help locate the block. round: Option, /// The commitment of the block to retrieve. commitment: B::Commitment, /// A channel to send the retrieved block. response: oneshot::Sender, }, /// A request to broadcast a block to all peers. Broadcast { /// The block to broadcast. block: B, }, /// A notification that a block has been verified by the application. Verified { /// The round in which the block was verified. round: Round, /// The verified block. block: B, }, // -------------------- Consensus Engine Messages -------------------- /// A notarization from the consensus engine. Notarization { /// The notarization. notarization: Notarization, }, /// A finalization from the consensus engine. Finalization { /// The finalization. finalization: Finalization, }, } /// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor). #[derive(Clone)] pub struct Mailbox { sender: mpsc::Sender>, } impl Mailbox { /// Creates a new mailbox. pub(crate) fn new(sender: mpsc::Sender>) -> Self { Self { sender } } /// A request to retrieve the information about the highest finalized block. pub async fn get_info( &mut self, identifier: impl Into>, ) -> Option<(u64, B::Commitment)> { let (tx, rx) = oneshot::channel(); if self .sender .send(Message::GetInfo { identifier: identifier.into(), response: tx, }) .await .is_err() { error!("failed to send get info message to actor: receiver dropped"); } match rx.await { Ok(result) => result, Err(_) => { error!("failed to get info: receiver dropped"); None } } } /// A best-effort attempt to retrieve a given block from local /// storage. It is not an indication to go fetch the block from the network. pub async fn get_block( &mut self, identifier: impl Into>, ) -> Option { let (tx, rx) = oneshot::channel(); if self .sender .send(Message::GetBlock { identifier: identifier.into(), response: tx, }) .await .is_err() { error!("failed to send get block message to actor: receiver dropped"); } match rx.await { Ok(result) => result, Err(_) => { error!("failed to get block: receiver dropped"); None } } } /// A best-effort attempt to retrieve a given [Finalization] from local /// storage. It is not an indication to go fetch the [Finalization] from the network. pub async fn get_finalization( &mut self, height: u64, ) -> Option> { let (tx, rx) = oneshot::channel(); if self .sender .send(Message::GetFinalization { height, response: tx, }) .await .is_err() { error!("failed to send get finalization message to actor: receiver dropped"); } match rx.await { Ok(result) => result, Err(_) => { error!("failed to get finalization: receiver dropped"); None } } } /// A request to retrieve a block by its commitment. /// /// If the block is found available locally, the block will be returned immediately. /// /// If the block is not available locally, the request will be registered and the caller will /// be notified when the block is available. If the block is not finalized, it's possible that /// it may never become available. /// /// The oneshot receiver should be dropped to cancel the subscription. pub async fn subscribe( &mut self, round: Option, commitment: B::Commitment, ) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); if self .sender .send(Message::Subscribe { round, commitment, response: tx, }) .await .is_err() { error!("failed to send subscribe message to actor: receiver dropped"); } rx } /// Broadcast indicates that a block should be sent to all peers. pub async fn broadcast(&mut self, block: B) { if self .sender .send(Message::Broadcast { block }) .await .is_err() { error!("failed to send broadcast message to actor: receiver dropped"); } } /// Notifies the actor that a block has been verified. pub async fn verified(&mut self, round: Round, block: B) { if self .sender .send(Message::Verified { round, block }) .await .is_err() { error!("failed to send verified message to actor: receiver dropped"); } } } impl Reporter for Mailbox { type Activity = Activity; async fn report(&mut self, activity: Self::Activity) { let message = match activity { Activity::Notarization(notarization) => Message::Notarization { notarization }, Activity::Finalization(finalization) => Message::Finalization { finalization }, _ => { // Ignore other activity types return; } }; if self.sender.send(message).await.is_err() { error!("failed to report activity to actor: receiver dropped"); } } }