use crate::stateful::{ actor::{ core::mailbox::Message, processor::{FinalizeStatus, Processor}, }, Application, }; use commonware_actor::mailbox as actor_mailbox; use commonware_consensus::{ marshal::{ ancestry::BlockProvider, core::{Mailbox as MarshalMailbox, Variant}, }, types::Height, Heightable, }; use commonware_cryptography::certificate::Scheme; use commonware_macros::select_loop; use commonware_runtime::{Clock, ContextCell, Metrics, Spawner}; use commonware_utils::{channel::fallible::OneshotExt, Acknowledgement}; use rand::Rng; use tracing::debug; pub(super) struct Processing where E: Rng + Spawner + Metrics + Clock, A: Application, S: Scheme, V: Variant, { /// Runtime context. pub(super) context: ContextCell, /// Actor ingress. pub(super) mailbox: actor_mailbox::Receiver>, /// Source of input (e.g. transactions) passed to the application on propose. pub(super) input_provider: A::InputProvider, /// Marshal mailbox used for lazy block lookup. pub(super) marshal: MarshalMailbox, /// State sync resolvers stay alive here so peers can keep syncing from us. #[expect( dead_code, reason = "processing keeps resolver handles alive for peer state sync" )] pub(super) resolvers: R, /// The processing state of the actor. pub(super) processor: Processor, /// Finalized marshal blocks at or below this height were already reflected /// in the selected database anchor and should be acknowledged only. pub(super) skip_finalized_until: Option, } impl Processing where E: Rng + Spawner + Metrics + Clock, A: Application, S: Scheme, V: Variant, MarshalMailbox: BlockProvider, { pub async fn start(mut self) { select_loop! { self.context, on_stopped => { debug!("processor received shutdown signal"); }, Some(message) = self.mailbox.recv() else { debug!("mailbox closed, shutting down processor"); break; } => match message { Message::Propose { context, ancestry, response, } => { self.processor .propose( self.context.as_present(), self.marshal.clone(), context, ancestry, &mut self.input_provider, response, ) .await; } Message::Verify { context, ancestry, response, } => { self.processor .verify( self.context.as_present(), self.marshal.clone(), context, ancestry, response, ) .await; } Message::Finalized { block, acknowledgement, } => { if skip_finalized_block(&mut self.skip_finalized_until, block.height()) { acknowledgement.acknowledge(); continue; } if let FinalizeStatus::Persisted { height } = self.processor.finalize(&self.context, block).await { debug!(height = height.get(), "persisted finalized database batch"); } acknowledgement.acknowledge(); } Message::SubscribeDatabases { response } => { response.send_lossy(self.processor.databases().clone()); } }, } } } fn skip_finalized_block(skip_until: &mut Option, height: Height) -> bool { let Some(target) = *skip_until else { return false; }; if height > target { *skip_until = None; return false; } if height == target { *skip_until = None; } true } #[cfg(test)] mod tests { use super::skip_finalized_block; use commonware_consensus::types::Height; #[test] fn skip_finalized_block_skips_through_target_height() { let mut skip_until = Some(Height::new(3)); assert!(skip_finalized_block(&mut skip_until, Height::new(1))); assert_eq!(skip_until, Some(Height::new(3))); assert!(skip_finalized_block(&mut skip_until, Height::new(3))); assert_eq!(skip_until, None); assert!(!skip_finalized_block(&mut skip_until, Height::new(4))); } #[test] fn skip_finalized_block_clears_stale_target() { let mut skip_until = Some(Height::new(3)); assert!(!skip_finalized_block(&mut skip_until, Height::new(4))); assert_eq!(skip_until, None); } }