//! Mailbox for the [`super::Stateful`] actor. use crate::stateful::Application; use commonware_actor::{ mailbox::{Overflow, Policy, Sender}, Feedback, }; use commonware_consensus::{marshal::Update, Application as ConsensusApplication, Reporter}; use commonware_runtime::{Clock, Metrics, Spawner}; use commonware_utils::{acknowledgement::Exact, channel::oneshot}; use futures::Stream; use rand::Rng; use std::{collections::VecDeque, pin::Pin}; /// Type alias for an ancestor stream sent through the actor mailbox. pub(crate) type ErasedAncestorStream = Pin + Send>>; /// Messages processed by the actor loop. pub(crate) enum Message where E: Rng + Spawner + Metrics + Clock, A: Application, { /// A request to propose a block. Propose { context: (E, A::Context), ancestry: ErasedAncestorStream, response: oneshot::Sender>, }, /// A request to verify a block. Verify { context: (E, A::Context), ancestry: ErasedAncestorStream, response: oneshot::Sender, }, /// A reporting of a new finalized block. Finalized { block: A::Block, acknowledgement: Exact, }, /// Requests the attached database set. /// /// The actor replies once the database set has been attached to the /// serving stateful actor, or immediately if that has already happened. SubscribeDatabases { response: oneshot::Sender, }, } impl Message where E: Rng + Spawner + Metrics + Clock, A: Application, { fn response_closed(&self) -> bool { match self { Self::Propose { response, .. } => response.is_closed(), Self::Verify { response, .. } => response.is_closed(), Self::SubscribeDatabases { response } => response.is_closed(), Self::Finalized { .. } => false, } } } pub(crate) struct Pending(VecDeque>) where E: Rng + Spawner + Metrics + Clock, A: Application; impl Default for Pending where E: Rng + Spawner + Metrics + Clock, A: Application, { fn default() -> Self { Self(VecDeque::new()) } } impl Overflow> for Pending where E: Rng + Spawner + Metrics + Clock, A: Application, { fn is_empty(&self) -> bool { self.0.is_empty() } fn drain(&mut self, mut push: F) where F: FnMut(Message) -> Option>, { while let Some(message) = self.0.pop_front() { if message.response_closed() { continue; } if let Some(message) = push(message) { self.0.push_front(message); break; } } } } impl Policy for Message where E: Rng + Spawner + Metrics + Clock, A: Application, { type Overflow = Pending; fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { return; } overflow.0.push_back(message); } } /// Channel-based proxy to the [`Stateful`](super::Stateful) actor. /// /// Implements the consensus application and verifying traits by forwarding /// each call to the actor via a message and awaiting the response. pub struct Mailbox where E: Rng + Spawner + Metrics + Clock, A: Application, { sender: Sender>, } impl Clone for Mailbox where E: Rng + Spawner + Metrics + Clock, A: Application, { fn clone(&self) -> Self { Self { sender: self.sender.clone(), } } } impl Mailbox where E: Rng + Spawner + Metrics + Clock, A: Application, { /// Create a mailbox from the send half of the actor's message channel. pub(crate) const fn new(sender: Sender>) -> Self { Self { sender } } } impl Mailbox where E: Rng + Spawner + Metrics + Clock, A: Application, { /// Wait for the attached database set. /// /// This resolves once startup handoff has attached the database set to the /// serving actor. Late callers receive the current database set /// immediately. pub async fn subscribe_databases(&self) -> A::Databases { let (response, receiver) = oneshot::channel(); let _ = self .sender .enqueue(Message::SubscribeDatabases { response }); receiver .await .expect("stateful actor dropped during subscribe_databases") } } impl ConsensusApplication for Mailbox where E: Rng + Spawner + Metrics + Clock, A: Application, { type SigningScheme = A::SigningScheme; type Context = A::Context; type Block = A::Block; async fn propose( &mut self, context: (E, Self::Context), ancestry: impl Stream + Send + 'static, ) -> Option { let (response, receiver) = oneshot::channel(); let _ = self.sender.enqueue(Message::Propose { context, ancestry: Box::pin(ancestry), response, }); receiver.await.ok().flatten() } async fn verify( &mut self, context: (E, Self::Context), ancestry: impl Stream + Send + 'static, ) -> bool { // We must panic if we don't get a response; We cannot override the decision // of the application based on the availabilitiy of the actor. let (response, receiver) = oneshot::channel(); let _ = self.sender.enqueue(Message::Verify { context, ancestry: Box::pin(ancestry), response, }); receiver .await .expect("stateful actor dropped during verify") } } impl Reporter for Mailbox where E: Rng + Spawner + Metrics + Clock, A: Application, { type Activity = Update; fn report(&mut self, activity: Self::Activity) -> Feedback { let message = match activity { Update::Tip(_, _, _) => return Feedback::Ok, Update::Block(block, acknowledgement) => Message::Finalized { block, acknowledgement, }, }; self.sender.enqueue(message) } }