//! Speculative execution engine for the [`Stateful`](super::Stateful) actor.
//!
//! The [`Processor`] owns the in-memory pending-tip DAG and the committed
//! database set. It is the workhorse behind the actor's `Processing` mode,
//! handling three operations:
//!
//! - Propose/Verify: fork unmerkleized batches from a parent's pending
//!   state (or from committed state), delegate to the [`Application`], and
//!   cache the resulting merkleized batches keyed by block digest.
//!
//! - Lazy recovery: when a parent's pending state is missing (e.g. after
//!   restart), [`Processor::rebuild_pending`] walks the block DAG backward
//!   via marshal to the nearest known anchor (the last processed block or a
//!   block already in the pending map), then replays forward via
//!   [`Application::apply`], inserting each intermediate result into the
//!   pending map.
//!
//! - Finalization: apply the finalized block's merkleized batches to the
//!   committed databases (replaying the block via [`Application::apply`] when
//!   they are not cached), then prune every pending entry that is at or below
//!   the finalized round or does not descend from the finalized block.
//!
//! All propose/verify paths are cancellation-aware: if the caller drops the
//! response channel, in-progress work stops at the next await point via
//! [`await_or_cancel`].
use crate::stateful::{
    db::{Anchor, DatabaseSet},
    Application, Proposed,
};
use commonware_consensus::{
    marshal::{
        ancestry::BlockProvider,
        core::{Mailbox as MarshalMailbox, Variant as MarshalVariant},
        Identifier,
    },
    types::{Height, Round},
    Block, CertifiableBlock, Heightable, Roundable,
};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_macros::select;
use commonware_runtime::{telemetry::metrics::GaugeExt, Clock, Metrics, Spawner};
use commonware_utils::channel::{fallible::OneshotExt, oneshot};
use futures::{stream, Stream, StreamExt};
use rand::Rng;
use std::{
    collections::{BTreeMap, HashSet, VecDeque},
    future::Future,
};
use tracing::{debug, warn};

mod metrics;

pub(crate) use metrics::Metrics as ProcessorMetrics;

/// Digest type of the application's block; used as the key of the pending map.
type PendingDigest = <>::Block as Digestible>::Digest;

/// Merkleized batch type produced by the application's database set.
type PendingBatches = <>::Databases as DatabaseSet>::Merkleized;

/// Speculative (not yet finalized) state, keyed by block digest.
type PendingMap = BTreeMap, PendingEntry>;

/// Cached speculative state for a block digest.
struct PendingEntry
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application,
{
    /// Consensus round of the block. Entries whose round is not after the
    /// finalized round are pruned on finalization.
    round: Round,
    /// Digest of the block's parent. Used to find descendants of a finalized
    /// block when pruning.
    parent: PendingDigest,
    /// State after executing the block. Children fork their batches from it,
    /// and it is persisted directly if this block is finalized.
    merkleized: PendingBatches,
}

/// Errors while preparing parent-relative batches for propose/verify.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum PrepareBatchesError {
    /// Parent ancestry is provably invalid.
    ///
    /// `propose` replies `None`; `verify` replies `false`.
    Invalid,
    /// Parent ancestry ended before validity could be proven.
    ///
    /// `propose`/`verify` send no reply and instead wait until the caller
    /// drops the response channel.
    Incomplete,
    /// Caller dropped the response while waiting.
    Cancelled,
}

/// Finalization result for a finalized block report.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum FinalizeStatus {
    /// The finalized digest was already processed (it is the block at the
    /// current processed height).
    Duplicate,
    /// The finalized state was persisted and in-memory forks were pruned.
    Persisted {
        /// Height of the newly finalized block.
        height: Height,
    },
}

/// Owns speculative execution and state persistence for a running stateful actor.
pub(super) struct Processor
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application,
{
    /// Application used to propose, verify, replay (`apply`), and observe
    /// finalized blocks.
    app: A,
    /// Committed database set. Within this type, only `finalize` persists
    /// into it.
    databases: A::Databases,
    /// Speculative merkleized state for not-yet-finalized blocks.
    pending: PendingMap,
    /// Anchor (height, round, digest) of the last block whose state was committed.
    last_processed: Anchor>,
    /// Timing, depth, and size metrics for processor operations.
    metrics: ProcessorMetrics,
}

impl Processor
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application,
{
    /// Create a new processor with the given application, databases, and
    /// the last finalized block's anchor.
    ///
    /// The pending map starts empty. Missing speculative state is rebuilt
    /// lazily by [`Processor::rebuild_pending`] when a parent is requested.
    pub(super) const fn new(
        app: A,
        databases: A::Databases,
        last_processed: Anchor>,
        metrics: ProcessorMetrics,
    ) -> Self {
        Self {
            app,
            databases,
            pending: BTreeMap::new(),
            last_processed,
            metrics,
        }
    }

    /// Returns a reference to the database set.
    pub(super) const fn databases(&self) -> &A::Databases {
        &self.databases
    }

    /// Prepare parent-relative batches and delegate to the application to
    /// build a new block proposal. The resulting block and its merkleized
    /// state are cached in `pending`. Sends `None` on `response` if the
    /// ancestry is invalid or the application declines to propose.
    ///
    /// The first item of `ancestry` is the parent to build on. If ancestry is
    /// incomplete, no reply is sent and the call waits until the caller drops
    /// `response`.
    ///
    /// # Panics
    ///
    /// If the proposed state does not match the proposed block's commitments.
    pub(super) async fn propose(
        &mut self,
        context: &E,
        marshal: MarshalMailbox,
        (runtime_context, consensus_context): (E, A::Context),
        ancestry: impl Stream + Send + 'static,
        input_provider: &mut A::InputProvider,
        mut response: oneshot::Sender>,
    ) where
        S: Scheme,
        V: MarshalVariant,
        MarshalMailbox: BlockProvider,
    {
        let timer = self.metrics.propose_duration.timer(context);
        let mut ancestry = Box::pin(ancestry);
        // The first ancestry item is the parent we build on.
        let parent = match next_or_cancel(&mut response, &mut ancestry).await {
            Some(Some(parent)) => parent,
            // Empty ancestry: nothing to build on, so decline the proposal.
            Some(None) => {
                response.send_lossy(None);
                return;
            }
            None => {
                debug!("proposal request cancelled before initial ancestry arrived");
                return;
            }
        };
        let parent_digest = parent.digest();
        // Re-prepend the consumed parent so the application sees the full
        // ancestry, starting with the parent.
        let ancestry = stream::once(std::future::ready(parent.clone())).chain(ancestry);
        let round = consensus_context.round();
        let batches = match self
            .prepare_batches(context, marshal, parent, &mut response)
            .await
        {
            Ok(batches) => batches,
            Err(PrepareBatchesError::Invalid) => {
                response.send_lossy(None);
                return;
            }
            Err(PrepareBatchesError::Incomplete) => {
                debug!(
                    ?parent_digest,
                    "proposal request waiting on incomplete ancestry during prepare_batches"
                );
                // Validity was not proven either way, so hold the request
                // open (no reply) until the caller drops it.
                response.closed().await;
                return;
            }
            Err(PrepareBatchesError::Cancelled) => {
                debug!(
                    ?parent_digest,
                    "proposal request cancelled during prepare_batches"
                );
                return;
            }
        };
        let proposed = match await_or_cancel(
            &mut response,
            self.app.propose(
                (runtime_context, consensus_context),
                ancestry,
                batches,
                input_provider,
            ),
        )
        .await
        {
            Some(result) => result,
            None => {
                debug!(?parent_digest, "proposal request cancelled during propose");
                return;
            }
        };
        // The application declined to propose.
        let Some(Proposed { block, merkleized }) = proposed else {
            response.send_lossy(None);
            return;
        };
        // Locally built state is asserted (panics) against its own commitments,
        // whereas `verify` rejects a received block on the same mismatch.
        assert!(
            A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)),
            "proposed state must match block commitments",
        );
        self.cache_pending(block.digest(), parent_digest, round, merkleized);
        let _ = self.metrics.pending_blocks.try_set(self.pending.len());
        timer.observe(context);
        response.send_lossy(Some(block));
    }

    /// Prepare parent-relative batches and delegate to the application to
    /// verify a received block. On success the block's merkleized state is
    /// cached in `pending` and `true` is sent on `response`.
    ///
    /// `ancestry` yields the block under verification first, then its parent.
    /// These blocks are accepted without re-execution:
    /// - blocks already cached in `pending`;
    /// - blocks at or below the processed height that match the canonical chain.
    ///
    /// Non-canonical blocks at or below the processed height are rejected. When
    /// required ancestry is unavailable, no reply is sent and the call waits
    /// until the caller drops `response`.
    pub(super) async fn verify(
        &mut self,
        context: &E,
        marshal: MarshalMailbox,
        (runtime_context, consensus_context): (E, A::Context),
        ancestry: impl Stream + Send + 'static,
        mut response: oneshot::Sender,
    ) where
        S: Scheme,
        V: MarshalVariant,
        MarshalMailbox: BlockProvider,
    {
        let timer = self.metrics.verify_duration.timer(context);
        let mut ancestry = Box::pin(ancestry);
        // The first ancestry item is the block under verification.
        let block = match next_or_cancel(&mut response, &mut ancestry).await {
            Some(Some(block)) => block,
            Some(None) => {
                debug!("verification request waiting on incomplete block ancestry");
                response.closed().await;
                return;
            }
            None => {
                debug!("verification request cancelled before initial block arrived");
                return;
            }
        };
        let block_digest = block.digest();

        // If the block has already been executed, don't execute again.
        if self.pending.contains_key(&block_digest) {
            timer.observe(context);
            response.send_lossy(true);
            return;
        }

        // The voter may ask us to verify blocks that are at or below the
        // already-processed height. This happens because marshal/state sync and
        // simplex advance on different message streams.
        //
        // Re-execution is impossible because databases already contain state at
        // or beyond that height, but we still need to prove the block matches
        // the canonical finalized chain before short-circuiting.
        //
        // `last_processed.height` is only advanced from finalized state
        // (genesis, startup reconciliation, or finalize/ack path).
        match is_already_processed(self.last_processed, marshal.clone(), &block, &mut response)
            .await
        {
            Ok(true) => {
                timer.observe(context);
                response.send_lossy(true);
                return;
            }
            Ok(false) => {
                // At or below the processed height but not canonical: reject.
                // Blocks above the processed height fall through to execution.
                if block.height() <= self.last_processed.height {
                    response.send_lossy(false);
                    return;
                }
            }
            Err(PrepareBatchesError::Cancelled) => {
                debug!(
                    ?block_digest,
                    "verification request cancelled during processed-block check"
                );
                return;
            }
            Err(PrepareBatchesError::Incomplete) => {
                debug!(
                    ?block_digest,
                    "verification request waiting on incomplete processed-block ancestry"
                );
                response.closed().await;
                return;
            }
            // `is_already_processed` only ever returns `Cancelled` or `Incomplete`.
            Err(PrepareBatchesError::Invalid) => {
                unreachable!("processed-block check cannot return Invalid")
            }
        }
        let round = consensus_context.round();
        // The second ancestry item is the parent of the block under verification.
        let parent = match next_or_cancel(&mut response, &mut ancestry).await {
            Some(Some(parent)) => parent,
            Some(None) => {
                debug!(
                    ?block_digest,
                    "verification request waiting on incomplete parent ancestry"
                );
                response.closed().await;
                return;
            }
            None => {
                debug!(
                    ?block_digest,
                    "verification request cancelled before parent ancestry arrived"
                );
                return;
            }
        };
        let parent_digest = parent.digest();
        let batches = match self
            .prepare_batches(context, marshal, parent.clone(), &mut response)
            .await
        {
            Ok(batches) => batches,
            Err(PrepareBatchesError::Invalid) => {
                warn!(
                    ?parent_digest,
                    ?block_digest,
                    pending_keys = self.pending.len(),
                    last_processed = ?self.last_processed.digest,
                    "verification rejected: prepare_batches returned Invalid"
                );
                response.send_lossy(false);
                return;
            }
            Err(PrepareBatchesError::Incomplete) => {
                debug!(
                    ?parent_digest,
                    ?block_digest,
                    "verification request waiting on incomplete ancestry during prepare_batches"
                );
                response.closed().await;
                return;
            }
            Err(PrepareBatchesError::Cancelled) => {
                debug!(
                    ?parent_digest,
                    "verification request cancelled during prepare_batches"
                );
                return;
            }
        };
        // Re-prepend the two consumed items so the application sees the full
        // ancestry: block, parent, then the remaining ancestors.
        let ancestry = stream::iter([block.clone(), parent]).chain(ancestry);
        let verified = match await_or_cancel(
            &mut response,
            self.app
                .verify((runtime_context, consensus_context), ancestry, batches),
        )
        .await
        {
            Some(result) => result,
            None => {
                debug!(
                    ?parent_digest,
                    "verification request cancelled during verify"
                );
                return;
            }
        };
        let Some(merkleized) = verified else {
            warn!(
                ?parent_digest,
                ?block_digest,
                "verification rejected: app.verify returned None"
            );
            response.send_lossy(false);
            return;
        };
        // A received block whose executed state disagrees with its own
        // commitments is rejected (contrast with the assert in `propose`).
        if !A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)) {
            warn!(
                ?parent_digest,
                ?block_digest,
                "verification rejected: verified state must match block commitments"
            );
            response.send_lossy(false);
            return;
        }
        self.cache_pending(block_digest, parent_digest, round, merkleized);
        let _ = self.metrics.pending_blocks.try_set(self.pending.len());
        timer.observe(context);
        response.send_lossy(true);
    }

    /// Ensure parent state exists, then prepare unmerkleized batches for execution.
    ///
    /// If `parent` is neither the processed tip nor cached in `pending`, its
    /// missing ancestry is first rebuilt via [`Processor::rebuild_pending`]
    /// using `marshal` as the block provider.
    ///
    /// # Errors
    ///
    /// - [`PrepareBatchesError::Invalid`]: the parent's ancestry is provably
    ///   invalid, or the parent state is still unknown after rebuilding.
    /// - [`PrepareBatchesError::Incomplete`]: ancestry delivery ended early.
    /// - [`PrepareBatchesError::Cancelled`]: `response`'s receiver was dropped.
    pub(super) async fn prepare_batches(
        &mut self,
        context: &E,
        marshal: MarshalMailbox,
        parent: A::Block,
        response: &mut oneshot::Sender,
    ) -> Result<>::Unmerkleized, PrepareBatchesError>
    where
        S: Scheme,
        V: MarshalVariant,
        MarshalMailbox: BlockProvider,
    {
        let parent_digest = parent.digest();
        // Rebuild pending state if no pending state exists for the parent and the
        // parent is not the processed tip.
        if self.last_processed.digest != parent_digest && !self.pending.contains_key(&parent_digest)
        {
            self.rebuild_pending(context, marshal, parent, response)
                .await?;
        }
        // A dropped receiver while forking maps to `Cancelled`.
        await_or_cancel(response, self.fork_batches(&parent_digest))
            .await
            .unwrap_or(Err(PrepareBatchesError::Cancelled))
    }

    /// Fork unmerkleized batches from known parent state.
    ///
    /// A `parent` cached in `pending` is forked from its merkleized state.
    /// Otherwise, if `parent` is the last processed block, fresh batches are
    /// opened over the committed databases. Any other digest yields
    /// [`PrepareBatchesError::Invalid`].
    pub(super) async fn fork_batches(
        &mut self,
        parent: &::Digest,
    ) -> Result<>::Unmerkleized, PrepareBatchesError> {
        if let Some(entry) = self.pending.get(parent) {
            return Ok(>::fork_batches(
                &entry.merkleized,
            ));
        }
        if &self.last_processed.digest == parent {
            return Ok(self.databases.new_batches().await);
        }
        Err(PrepareBatchesError::Invalid)
    }

    /// Rebuild missing pending ancestry up to `target` lazily from a block provider.
    ///
    /// Walks backward from `target` with `provider.subscribe_parent` until it
    /// reaches the last processed block or a block already in `pending`. It
    /// then replays the collected blocks oldest-first with
    /// [`Application::apply`] and caches each result in `pending`. Blocks
    /// replayed before an error stay cached.
    ///
    /// # Errors
    ///
    /// - `Invalid`: one of the following:
    ///   - the provider returned a block that is not the cursor's parent one
    ///     height below;
    ///   - the walk reached ancestry at or below the processed height;
    ///   - replayed state did not match a block's commitments.
    /// - `Incomplete`: a parent subscription resolved to `None`.
    /// - `Cancelled`: `response`'s receiver was dropped while waiting.
    pub(super) async fn rebuild_pending(
        &mut self,
        context: &E,
        provider: P,
        target: A::Block,
        response: &mut oneshot::Sender,
    ) -> Result<(), PrepareBatchesError>
    where
        P: BlockProvider + Clone,
    {
        let timer = self.metrics.rebuild_pending_duration.timer(context);
        let target_digest = target.digest();

        // Walk backward until we hit a known safe anchor.
        let mut replay_path = Vec::new();
        let mut cursor = target;
        while cursor.digest() != self.last_processed.digest
            && !self.pending.contains_key(&cursor.digest())
        {
            let Some(parent) =
                await_or_cancel(response, provider.clone().subscribe_parent(&cursor)).await
            else {
                return Err(PrepareBatchesError::Cancelled);
            };
            let Some(parent) = parent else {
                debug!(
                    ?target_digest,
                    cursor = ?cursor.digest(),
                    "ancestor subscription ended before delivery"
                );
                return Err(PrepareBatchesError::Incomplete);
            };
            let cursor_height = cursor.height();
            // The provider's answer must be exactly the parent the cursor
            // commits to, exactly one height below it.
            if parent.digest() != cursor.parent() || parent.height().next() != cursor_height {
                warn!(
                    ?target_digest,
                    cursor = ?cursor.digest(),
                    parent = ?parent.digest(),
                    cursor_height = cursor_height.get(),
                    parent_height = parent.height().get(),
                    expected_parent = ?cursor.parent(),
                    "rebuild_pending received non-contiguous ancestry"
                );
                return Err(PrepareBatchesError::Invalid);
            }
            // Ancestry at or below the processed height is stale. The processed
            // tip itself was already excluded by the loop condition.
            // NOTE(review): this check does not use `parent`. Running it before
            // `subscribe_parent` would skip a provider round trip whose result is
            // discarded, but it would change observable fetch counts. Confirm no
            // caller or test depends on them before reordering.
            if cursor_height <= self.last_processed.height {
                warn!(
                    ?target_digest,
                    cursor = ?cursor.digest(),
                    current_height = cursor_height.get(),
                    last_processed_height = self.last_processed.height.get(),
                    last_processed = ?self.last_processed.digest,
                    "rebuild_pending reached stale ancestry below processed height"
                );
                return Err(PrepareBatchesError::Invalid);
            }
            // By definition, there are no blocks below height 0.
            // NOTE(review): this branch appears unreachable. A height-0 cursor
            // already satisfies `cursor_height <= self.last_processed.height`
            // above (assuming heights order numerically from zero). Confirm
            // before removing.
            if cursor_height.previous().is_none() {
                warn!(
                    ?target_digest,
                    cursor = ?cursor.digest(),
                    reached_height = %cursor_height,
                    last_processed = ?self.last_processed.digest,
                    pending_keys = self.pending.len(),
                    "rebuild reached ancestry boundary without known anchor"
                );
                return Err(PrepareBatchesError::Invalid);
            }
            replay_path.push(cursor);
            cursor = parent;
        }
        let depth = replay_path.len();

        // Replay from oldest to newest and cache intermediate tips.
        for block in replay_path.into_iter().rev() {
            let (digest, parent_digest) = (block.digest(), block.parent());
            let consensus_context = block.context();
            let round = consensus_context.round();
            let Some(batches) =
                await_or_cancel(response, self.fork_batches(&parent_digest)).await
            else {
                return Err(PrepareBatchesError::Cancelled);
            };
            // The walk stopped at known state, and each iteration below caches
            // the next block's parent, so the parent is always forkable here.
            let batches = batches.expect("rebuild replay parent must be available");
            let Some(merkleized) = await_or_cancel(
                response,
                self.app.apply(
                    (context.child("rebuild_pending_apply"), consensus_context),
                    &block,
                    batches,
                ),
            )
            .await
            else {
                return Err(PrepareBatchesError::Cancelled);
            };
            if !A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)) {
                warn!(
                    ?target_digest,
                    block = ?digest,
                    "rebuild replay state root must match block commitments"
                );
                return Err(PrepareBatchesError::Invalid);
            }
            self.cache_pending(digest, parent_digest, round, merkleized);
        }
        let _ = self.metrics.pending_blocks.try_set(self.pending.len());
        let _ = self.metrics.rebuild_pending_depth.try_set(depth);
        timer.observe(context);
        Ok(())
    }

    /// Persist finalized state and prune dead in-memory forks.
    ///
    /// Uses the block's cached pending state when present. Otherwise it
    /// replays the block with [`Application::apply`] on fresh batches over the
    /// committed databases. After persisting, it:
    /// - notifies [`Application::finalized`];
    /// - prunes `pending`;
    /// - advances `last_processed`.
    ///
    /// Returns [`FinalizeStatus::Duplicate`] for the block already at the
    /// processed height.
    ///
    /// # Panics
    ///
    /// - If `block` is below the processed height.
    /// - If `block` is at the processed height with a different digest.
    /// - If replayed state does not match the block's commitments.
    pub(super) async fn finalize(&mut self, context: &E, block: A::Block) -> FinalizeStatus {
        let (height, digest) = (block.height(), block.digest());
        if height < self.last_processed.height {
            panic!(
                "received finalized block below processed height: finalized={} processed={}",
                height.get(),
                self.last_processed.height.get(),
            );
        }
        if height == self.last_processed.height {
            assert_eq!(
                digest, self.last_processed.digest,
                "received conflicting finalized block at processed height",
            );
            return FinalizeStatus::Duplicate;
        }
        let timer = self.metrics.finalize_duration.timer(context);
        let block_context = block.context();
        let round = block_context.round();
        // Marshal finalization is ordered. A pending miss means we can replay
        // this block on top of finalized state.
        //
        // Safety contract: replayed `Application::apply` output must match the
        // block commitments previously enforced by `Application::verify`.
        let batch = match self.pending.remove(&digest) {
            Some(entry) => entry.merkleized,
            None => {
                let batches = self.databases.new_batches().await;
                let batch = self
                    .app
                    .apply(
                        (context.child("finalize_replay"), block_context),
                        &block,
                        batches,
                    )
                    .await;
                assert!(
                    A::Databases::matches_sync_targets(&batch, &A::sync_targets(&block)),
                    "finalize replay state root must match block commitments",
                );
                batch
            }
        };
        self.databases.finalize(batch).await;
        // The application is notified only after the batch has been persisted.
        self.app
            .finalized(
                (context.child("finalized"), block.context()),
                &block,
                &self.databases,
            )
            .await;
        self.prune_pending_after_finalize(&digest, round);
        self.last_processed = Anchor {
            height,
            round,
            digest,
        };
        timer.observe(context);
        FinalizeStatus::Persisted { height }
    }

    /// Remove pending state that is not compatible with the finalized winner.
    ///
    /// A pending block is kept only when:
    /// - it is a descendant of `finalized_digest`, and
    /// - it was created after `finalized_round`.
    ///
    /// Descendants are found by a breadth-first walk over a parent -> children
    /// index built from the current pending entries.
    fn prune_pending_after_finalize(
        &mut self,
        finalized_digest: &::Digest,
        finalized_round: Round,
    ) {
        // Index pending entries by parent so descendants can be walked forward.
        let mut children_by_parent = BTreeMap::new();
        for (candidate_digest, entry) in &self.pending {
            children_by_parent
                .entry(entry.parent)
                .or_insert_with(Vec::new)
                .push(*candidate_digest);
        }
        // Breadth-first walk from the finalized block. `compatible.insert`
        // returning false skips already-visited digests.
        let mut compatible = HashSet::new();
        compatible.insert(*finalized_digest);
        let mut to_visit = VecDeque::new();
        to_visit.push_back(*finalized_digest);
        while let Some(parent) = to_visit.pop_front() {
            let Some(children) = children_by_parent.get(&parent) else {
                continue;
            };
            for &child in children {
                if compatible.insert(child) {
                    to_visit.push_back(child);
                }
            }
        }
        let before = self.pending.len();
        self.pending.retain(|candidate_digest, entry| {
            entry.round > finalized_round && compatible.contains(candidate_digest)
        });
        let pruned = before - self.pending.len();
        self.metrics.pruned_forks.inc_by(pruned as u64);
        let _ = self.metrics.pending_blocks.try_set(self.pending.len());
    }

    /// Cache merkleized pending state for a block digest.
    ///
    /// The first insert wins. If `digest` is already cached, the new state is
    /// dropped, and debug builds assert that parent and round are unchanged.
    fn cache_pending(
        &mut self,
        digest: PendingDigest,
        parent: PendingDigest,
        round: Round,
        merkleized: PendingBatches,
    ) {
        if let Some(existing) = self.pending.get(&digest) {
            debug_assert_eq!(existing.parent, parent, "pending parent changed for digest");
            debug_assert_eq!(existing.round, round, "pending round changed for digest");
            return;
        }
        self.pending.insert(
            digest,
            PendingEntry {
                round,
                parent,
                merkleized,
            },
        );
    }
}

/// Returns true when `block` is already covered by committed state.
///
/// How the result is determined depends on the block's height:
/// - above the processed height: `Ok(false)` immediately;
/// - at the processed height: compare the digest with `last_processed`;
/// - below it: fetch the canonical block at that height from marshal and
///   compare digests.
async fn is_already_processed( last_processed: Anchor<::Digest>, marshal: MarshalMailbox, block: &V::ApplicationBlock, response: &mut oneshot::Sender, ) -> Result where S: Scheme, V: MarshalVariant, V::ApplicationBlock: Block + Clone, { let target_height = block.height(); if target_height > last_processed.height { return Ok(false); } if target_height == last_processed.height { return Ok(block.digest() == last_processed.digest); } let Some(canonical) = await_or_cancel( response, marshal.get_block(Identifier::Height(target_height)), ) .await else { return Err(PrepareBatchesError::Cancelled); }; let Some(canonical) = canonical else { warn!( target_height = target_height.get(), processed_height = last_processed.height.get(), "failed to fetch canonical processed block for stale-block check" ); return Err(PrepareBatchesError::Incomplete); }; Ok(canonical.digest() == block.digest()) } /// Read the next ancestry item unless the response receiver is dropped. pub(super) async fn next_or_cancel( response: &mut oneshot::Sender, stream: &mut S, ) -> Option> where S: Stream + Unpin, { await_or_cancel(response, stream.next()).await } /// Wait for `future` unless the response receiver is dropped. pub(super) async fn await_or_cancel( response: &mut oneshot::Sender, future: F, ) -> Option where F: Future, { select! 
{ _ = response.closed() => None, output = future => Some(output), } } #[cfg(test)] mod tests { use super::{await_or_cancel, next_or_cancel, FinalizeStatus, PrepareBatchesError, Processor}; use crate::stateful::{ actor::processor::ProcessorMetrics, db::{Anchor, DatabaseSet, Merkleized as _, Unmerkleized as _}, Application, Proposed, }; use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt as _, Write}; use commonware_consensus::{ marshal::ancestry::BlockProvider, simplex::{mocks::scheme::Scheme as MockScheme, types::Context as ConsensusContext}, types::{Epoch, Height, Round, View}, Block as ConsensusBlock, CertifiableBlock, Heightable, Roundable, }; use commonware_cryptography::{ ed25519, sha256::Digest, Digest as _, Digestible, Hasher, Sha256, Signer as _, }; use commonware_parallel::Sequential; use commonware_runtime::{ buffer::paged::CacheRef, deterministic, ContextCell, Runner as _, Supervisor as _, }; use commonware_storage::{ journal::contiguous::fixed::Config as FixedLogConfig, mmr::{self, full::Config as MmrJournalConfig, Location}, qmdb::{any, sync::Target}, translator::TwoCap, }; use commonware_utils::{ channel::oneshot, non_empty_range, range::NonEmptyRange, sync::{AsyncRwLock, Mutex}, NZUsize, NZU16, NZU64, }; use futures::{Stream, StreamExt}; use std::{ collections::{BTreeMap, VecDeque}, future::Future, num::NonZeroUsize, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; type TestContext = ConsensusContext; const PAGE_SIZE: std::num::NonZeroU16 = NZU16!(1024); const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8); const IO_BUFFER_SIZE: NonZeroUsize = NZUsize!(2048); type Qmdb = any::unordered::fixed::Db; type DbSet = Arc>>; #[derive(Clone, Debug, PartialEq, Eq)] struct Block { context: TestContext, parent: Digest, height: Height, state_root: Digest, range: NonEmptyRange, } impl Write for Block { fn write(&self, buf: &mut impl commonware_runtime::BufMut) { self.context.write(buf); self.parent.write(buf); self.height.write(buf); 
self.state_root.write(buf); self.range.write(buf); } } impl EncodeSize for Block { fn encode_size(&self) -> usize { self.context.encode_size() + self.parent.encode_size() + self.height.encode_size() + self.state_root.encode_size() + self.range.encode_size() } } impl Read for Block { type Cfg = (); fn read_cfg( buf: &mut impl commonware_runtime::Buf, _: &Self::Cfg, ) -> Result { Ok(Self { context: TestContext::read(buf)?, parent: Digest::read(buf)?, height: Height::read(buf)?, state_root: Digest::read(buf)?, range: commonware_utils::range::NonEmptyRange::read(buf)?, }) } } impl Digestible for Block { type Digest = Digest; fn digest(&self) -> Digest { Sha256::hash(&self.encode()) } } impl Heightable for Block { fn height(&self) -> Height { self.height } } impl ConsensusBlock for Block { fn parent(&self) -> Digest { self.parent } } impl CertifiableBlock for Block { type Context = TestContext; fn context(&self) -> Self::Context { self.context.clone() } } impl Block { fn genesis() -> Self { Self { context: consensus_context(Digest::EMPTY, View::zero()), parent: Digest::EMPTY, height: Height::zero(), state_root: Digest::EMPTY, range: non_empty_range!(Location::new(0), Location::new(1)), } } } fn consensus_context(parent: Digest, view: View) -> TestContext { TestContext { round: Round::new(Epoch::zero(), view), leader: ed25519::PrivateKey::from_seed(0).public_key(), parent: ( if view.is_zero() { View::zero() } else { View::new(view.get() - 1) }, parent, ), } } fn u64_to_digest(value: u64) -> Digest { let mut bytes = [0u8; 32]; bytes[..8].copy_from_slice(&value.to_be_bytes()); Digest::from(bytes) } fn digest_to_u64(value: &Digest) -> u64 { let bytes: &[u8] = value.as_ref(); u64::from_be_bytes( bytes[..8] .try_into() .expect("digest prefix should be 8 bytes"), ) } fn height_key(height: Height) -> Digest { Sha256::hash(&height.get().to_be_bytes()) } fn counter_key() -> Digest { Sha256::hash(b"processor_harness_counter") } #[derive(Clone)] struct FinalizedObserver { 
db_config: any::FixedConfig, reopened_counters: Arc>>, } #[derive(Clone)] struct ExecutionApp { genesis: Block, finalized_observer: Option, } impl ExecutionApp { fn new() -> Self { Self { genesis: Block::genesis(), finalized_observer: None, } } fn with_finalized_observer( db_config: any::FixedConfig, ) -> (Self, Arc>>) { let reopened_counters = Arc::new(Mutex::new(Vec::new())); let observer = FinalizedObserver { db_config, reopened_counters: reopened_counters.clone(), }; ( Self { genesis: Block::genesis(), finalized_observer: Some(observer), }, reopened_counters, ) } async fn execute( height: Height, view: View, mut batches: as DatabaseSet>::Unmerkleized, ) -> as DatabaseSet>::Merkleized { let current_counter = batches .get(&counter_key()) .await .expect("counter read should succeed") .map_or(0, |digest| digest_to_u64(&digest)); batches = batches.write(counter_key(), Some(u64_to_digest(current_counter + 1))); batches = batches.write(height_key(height), Some(u64_to_digest(view.get()))); batches.merkleize().await.expect("merkleize should succeed") } } impl Application for ExecutionApp { type SigningScheme = MockScheme; type Context = TestContext; type Block = Block; type Databases = DbSet; type InputProvider = (); async fn genesis(&mut self) -> Self::Block { self.genesis.clone() } async fn propose( &mut self, context: (deterministic::Context, Self::Context), ancestry: impl Stream + Send, batches: >::Unmerkleized, _input: &mut Self::InputProvider, ) -> Option> { let mut ancestry = Box::pin(ancestry); let parent = ancestry.next().await?; let context = context.1.clone(); let view = context.round.view(); let height = parent.height().next(); let merkleized = Self::execute(height, view, batches).await; let block = Block { context, parent: parent.digest(), height, state_root: merkleized.root(), range: non_empty_range!( merkleized.bounds().inactivity_floor, Location::new(merkleized.bounds().total_size) ), }; Some(Proposed { block, merkleized }) } async fn verify( &mut self, 
_context: (deterministic::Context, Self::Context), ancestry: impl Stream + Send, batches: >::Unmerkleized, ) -> Option<>::Merkleized> { let mut ancestry = Box::pin(ancestry); let block = ancestry.next().await?; let merkleized = Self::execute(block.height(), block.context.round.view(), batches).await; if merkleized.root() != block.state_root { return None; } Some(merkleized) } async fn apply( &mut self, _context: (deterministic::Context, Self::Context), block: &Self::Block, batches: >::Unmerkleized, ) -> >::Merkleized { Self::execute(block.height(), block.context.round.view(), batches).await } fn sync_targets( block: &Self::Block, ) -> >::SyncTargets { Target::new(block.state_root, block.range.clone()) } async fn finalized( &mut self, context: (deterministic::Context, Self::Context), _block: &Self::Block, _databases: &Self::Databases, ) { let Some(observer) = &self.finalized_observer else { return; }; let reopened = Qmdb::init( context.0.child("finalized_observer_reopen"), observer.db_config.clone(), ) .await .expect("database reopen inside finalized hook should succeed"); let counter = reopened .get(&counter_key()) .await .expect("reopened counter read should succeed") .map(|value| digest_to_u64(&value)) .unwrap_or(0); observer.reopened_counters.lock().push(counter); } } #[derive(Clone, Default)] struct MapProvider { blocks: Arc>>, fetches: Arc, } impl MapProvider { fn insert(&self, block: Block) { self.blocks.lock().insert(block.digest(), block); } fn fetch_by_digest(&self, digest: Digest) -> Option { self.fetches.fetch_add(1, Ordering::SeqCst); self.blocks.lock().get(&digest).cloned() } fn fetches(&self) -> usize { self.fetches.load(Ordering::SeqCst) } } impl BlockProvider for MapProvider { type Block = Block; fn subscribe_parent( &self, block: &Self::Block, ) -> impl Future> + Send + 'static { let provider = self.clone(); let parent = block.parent(); async move { provider.fetch_by_digest(parent) } } } #[derive(Clone, Default)] struct ScriptedParentProvider { 
responses: Arc>>>>, fetches: Arc, } impl ScriptedParentProvider { fn push(&self, child: &Block, responses: impl IntoIterator>) { self.responses .lock() .insert(child.digest(), responses.into_iter().collect()); } fn fetches(&self) -> usize { self.fetches.load(Ordering::SeqCst) } } impl BlockProvider for ScriptedParentProvider { type Block = Block; fn subscribe_parent( &self, block: &Self::Block, ) -> impl Future> + Send + 'static { let provider = self.clone(); let child = block.digest(); async move { provider.fetches.fetch_add(1, Ordering::SeqCst); provider .responses .lock() .get_mut(&child) .and_then(VecDeque::pop_front) .flatten() } } } struct Harness { context_cell: ContextCell, processor: Processor, provider: MapProvider, db_config: any::FixedConfig, finalized_reopened_counters: Option>>>, } impl Harness { async fn new(context: deterministic::Context) -> Self { let provider = MapProvider::default(); let config = qmdb_config(&next_partition_prefix(), &context); Self::with_app(context, provider, config.clone(), ExecutionApp::new(), None).await } async fn new_with_finalized_observer(context: deterministic::Context) -> Self { let provider = MapProvider::default(); let config = qmdb_config(&next_partition_prefix(), &context); let (app, finalized_reopened_counters) = ExecutionApp::with_finalized_observer(config.clone()); Self::with_app( context, provider, config, app, Some(finalized_reopened_counters), ) .await } async fn with_app( context: deterministic::Context, provider: MapProvider, config: any::FixedConfig, app: ExecutionApp, finalized_reopened_counters: Option>>>, ) -> Self { let databases = as DatabaseSet< deterministic::Context, >>::init(context.child("db_set"), config.clone()) .await; let metrics = ProcessorMetrics::new(context.child("processor_metrics")); Self { context_cell: ContextCell::new(context), processor: Processor::new( app, databases, Anchor { height: Height::zero(), round: Block::genesis().context().round, digest: Block::genesis().digest(), }, 
metrics, ), provider, db_config: config, finalized_reopened_counters, } } async fn stage_pending_child(&mut self, parent: &Block, view: View) -> Block { let context = consensus_context(parent.digest(), view); let height = Height::new(parent.height().get() + 1); let batches = self .processor .fork_batches(&parent.digest()) .await .expect("parent should be available"); let merkleized = ExecutionApp::execute(height, view, batches).await; let block = Block { context, parent: parent.digest(), height, state_root: merkleized.root(), range: non_empty_range!( merkleized.bounds().inactivity_floor, Location::new(merkleized.bounds().total_size) ), }; let round = Round::new(Epoch::zero(), view); self.processor .cache_pending(block.digest(), parent.digest(), round, merkleized); self.provider.insert(block.clone()); block } async fn rebuild_pending( &mut self, target: Digest, response: &mut oneshot::Sender, ) -> Result<(), PrepareBatchesError> { let mut replay_path = Vec::new(); let mut cursor = target; while cursor != self.processor.last_processed.digest && !self.processor.pending.contains_key(&cursor) { let Some(block) = await_or_cancel(response, async { self.provider.fetch_by_digest(cursor) }) .await else { return Err(PrepareBatchesError::Cancelled); }; let Some(block) = block else { continue; }; if block.height() <= self.processor.last_processed.height { return Err(PrepareBatchesError::Invalid); } if block.height().previous().is_none() { return Err(PrepareBatchesError::Invalid); } cursor = block.parent(); replay_path.push(block); } for block in replay_path.into_iter().rev() { let (digest, parent_digest) = (block.digest(), block.parent()); let consensus_context = block.context(); let round = consensus_context.round(); let batches = self .processor .fork_batches(&parent_digest) .await .expect("rebuild replay parent must be available"); let merkleized = self .processor .app .apply( ( self.context_cell .as_present() .child("rebuild_pending_apply"), consensus_context, ), &block, 
batches,
                )
                .await;
                // Reject with `Invalid` (and cache nothing) when the merkleized batches do not
                // match the sync targets derived from `block`; only matching results are cached.
                if !DbSet::::matches_sync_targets(
                    &merkleized,
                    &ExecutionApp::sync_targets(&block),
                ) {
                    return Err(PrepareBatchesError::Invalid);
                }
                self.processor
                    .cache_pending(digest, parent_digest, round, merkleized);
            }
            Ok(())
        }

        /// Reports whether `block` is the block on the processed chain at its height.
        ///
        /// Blocks above the last processed height are never considered processed. At exactly
        /// the last processed height the digests are compared directly. Below it, parents are
        /// walked backward from the last processed digest via `self.provider` until the walk
        /// reaches `block`'s height, and the digest found there is compared with `block`'s.
        /// Returns `false` if the walk skips past the target height or the provider cannot
        /// supply an ancestor.
        fn is_canonical_processed(&self, block: &Block) -> bool {
            let target_height = block.height();
            if target_height > self.processor.last_processed.height {
                return false;
            }
            if target_height == self.processor.last_processed.height {
                return block.digest() == self.processor.last_processed.digest;
            }
            let mut cursor = self.processor.last_processed.digest;
            while let Some(canonical) = self.provider.fetch_by_digest(cursor) {
                let canonical_height = canonical.height();
                if canonical_height == target_height {
                    return canonical.digest() == block.digest();
                }
                // Heights only decrease while walking parents, so overshooting means `block`
                // is not on this chain.
                if canonical_height < target_height {
                    return false;
                }
                cursor = canonical.parent();
            }
            // The provider ran out of ancestors before reaching the target height.
            false
        }

        /// Finalizes `block` through the processor, using the harness's present context.
        async fn finalize(&mut self, block: Block) -> FinalizeStatus {
            self.processor
                .finalize(self.context_cell.as_present(), block)
                .await
        }

        /// Reads the value stored under `height_key(height)` in the processor's databases,
        /// decoded with `digest_to_u64`; `None` if the key is absent.
        ///
        /// # Panics
        /// Panics if the database read fails.
        async fn height_value(&self, height: Height) -> Option {
            let db = self.processor.databases.read().await;
            db.get(&height_key(height))
                .await
                .expect("database read should succeed")
                .map(|value| digest_to_u64(&value))
        }

        /// Reads the value stored under `counter_key()` in the processor's databases,
        /// decoded with `digest_to_u64`; `None` if the key is absent.
        ///
        /// # Panics
        /// Panics if the database read fails.
        async fn counter_value(&self) -> Option {
            let db = self.processor.databases.read().await;
            db.get(&counter_key())
                .await
                .expect("database read should succeed")
                .map(|value| digest_to_u64(&value))
        }

        /// Opens a separate `Qmdb` instance from `self.db_config` (under the child context
        /// `"reopen_db"`) and reads `height_key(height)` from it. This checks what was
        /// written to storage without going through the processor's own database handle.
        ///
        /// # Panics
        /// Panics if the reopen or the read fails.
        async fn reopen_height_value(
            &self,
            context: deterministic::Context,
            height: Height,
        ) -> Option {
            let reopened: Qmdb = Qmdb::init(context.child("reopen_db"), self.db_config.clone())
                .await
                .expect("database reopen should succeed");
            reopened
                .get(&height_key(height))
                .await
                .expect("reopened db read should succeed")
                .map(|value| digest_to_u64(&value))
        }

        /// Returns a snapshot (clone) of the counters recorded by the finalized observer.
        ///
        /// # Panics
        /// Panics if the harness was built without a finalized observer (presumably any
        /// constructor other than `Harness::new_with_finalized_observer`; verify).
        fn finalized_reopened_counters(&self) -> Vec {
            self.finalized_reopened_counters
                .as_ref()
                .expect("finalized observer should be configured")
                .lock()
                .clone()
        }
    }

    /// Returns a distinct `processor_harness_<n>` prefix on every call, drawn from a
    /// process-wide atomic counter (presumably so each harness gets its own storage
    /// partitions; confirm at call sites).
    fn next_partition_prefix() -> String {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
        let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        format!("processor_harness_{id}")
    }

    /// Builds a fixed-size QMDB config whose MMR journal, MMR metadata and log journal
    /// partitions are all namespaced under `prefix`. A single page cache is shared by the
    /// MMR journal and the log journal.
    fn qmdb_config(
        prefix: &str,
        context: &deterministic::Context,
    ) -> any::FixedConfig {
        let page_cache = CacheRef::from_pooler(context, PAGE_SIZE, PAGE_CACHE_SIZE);
        any::FixedConfig {
            merkle_config: MmrJournalConfig {
                journal_partition: format!("{prefix}_mmr_journal"),
                metadata_partition: format!("{prefix}_mmr_metadata"),
                // NOTE(review): small, distinct blob sizes (11 here, 7 below), presumably
                // to exercise multi-blob journals in tests; confirm.
                items_per_blob: NZU64!(11),
                write_buffer: IO_BUFFER_SIZE,
                strategy: Sequential,
                page_cache: page_cache.clone(),
            },
            journal_config: FixedLogConfig {
                partition: format!("{prefix}_log_journal"),
                items_per_blob: NZU64!(7),
                page_cache,
                write_buffer: IO_BUFFER_SIZE,
            },
            translator: TwoCap,
        }
    }

    /// Two sibling forks on top of `block1`: finalizing the higher-view `winner` must
    /// persist its state and drop the lower-view `loser` from the pending map.
    #[test]
    fn execution_finalization_prunes_losing_fork() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let winner = harness.stage_pending_child(&block1, View::new(3)).await;
            let loser = harness.stage_pending_child(&block1, View::new(2)).await;

            assert!(harness.processor.pending.contains_key(&winner.digest()));
            assert!(harness.processor.pending.contains_key(&loser.digest()));

            let status = harness.finalize(winner.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(2)
                },
                "finalization should persist winner state",
            );
            assert!(
                !harness.processor.pending.contains_key(&loser.digest()),
                "losing fork at finalized round should be pruned",
            );
            assert_eq!(harness.processor.last_processed.digest, winner.digest());
            assert_eq!(harness.height_value(Height::new(2)).await, Some(3));
        });
    }

    /// Like `execution_finalization_prunes_losing_fork`, but the losing fork has a child.
    /// The child's view (4) is higher than the finalized `winner`'s (3). It is therefore
    /// presumably not removed by the round cutoff alone, so this asserts that descendants
    /// of a pruned fork are dropped too.
    #[test]
    fn execution_finalization_prunes_losing_fork_descendants() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let loser = harness.stage_pending_child(&block1, View::new(2)).await;
            let winner = harness.stage_pending_child(&block1, View::new(3)).await;
            let loser_child = harness.stage_pending_child(&loser, View::new(4)).await;

            assert!(harness.processor.pending.contains_key(&winner.digest()));
            assert!(harness.processor.pending.contains_key(&loser.digest()));
            assert!(harness
                .processor
                .pending
                .contains_key(&loser_child.digest()));

            // NOTE(review): `winner` is not used after this call, so `.clone()` appears
            // redundant.
            let status = harness.finalize(winner.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(2)
                },
                "finalization should persist winner state",
            );
            assert!(
                !harness.processor.pending.contains_key(&loser.digest()),
                "losing fork at finalized round should be pruned",
            );
            assert!(
                !harness
                    .processor
                    .pending
                    .contains_key(&loser_child.digest()),
                "descendants of the losing fork should also be pruned",
            );
        });
    }

    /// After the pending map is cleared, rebuilding from `block3` must reconstruct both
    /// `block3` and its missing parent `block2` above the processed anchor `block1`.
    #[test]
    fn execution_rebuild_pending_restores_missing_chain() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );

            let block2 = harness.stage_pending_child(&block1, View::new(2)).await;
            let block3 = harness.stage_pending_child(&block2, View::new(3)).await;
            // Simulate lost speculative state: only the provider still knows the blocks.
            harness.processor.pending.clear();
            harness.provider.insert(block2.clone());
            harness.provider.insert(block3.clone());

            let (mut response, _rx) = oneshot::channel::();
            let result = harness
                .rebuild_pending(block3.digest(), &mut response)
                .await;
            assert_eq!(result, Ok(()), "rebuild should succeed");
            assert!(
                harness.processor.pending.contains_key(&block2.digest()),
                "first missing descendant should be reconstructed",
            );
            assert!(
                harness.processor.pending.contains_key(&block3.digest()),
                "target block should be reconstructed",
            );
        });
    }

    /// Finalizes heights 1..=5, then asks to rebuild from the height-2 block. That block is
    /// below the processed height, so the rebuild must fail as `Invalid` after exactly one
    /// provider fetch.
    #[test]
    fn execution_rebuild_pending_rejects_stale_ancestor_quickly() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let mut chain = Vec::new();
            let mut parent = genesis;
            for view in 1..=5 {
                let block = harness.stage_pending_child(&parent, View::new(view)).await;
                let status = harness.finalize(block.clone()).await;
                assert_eq!(
                    status,
                    FinalizeStatus::Persisted {
                        height: Height::new(view),
                    }
                );
                parent = block.clone();
                chain.push(block);
            }

            harness.processor.pending.clear();
            let stale_parent = chain[1].digest(); // height 2, below processed height 5
            let fetches_before = harness.provider.fetches();
            let (mut response, _rx) = oneshot::channel::();
            let result = harness.rebuild_pending(stale_parent, &mut response).await;
            assert_eq!(
                result,
                Err(PrepareBatchesError::Invalid),
                "stale ancestry should be rejected",
            );
            let fetches_after = harness.provider.fetches();
            assert_eq!(
                fetches_after.saturating_sub(fetches_before),
                1,
                "stale ancestry should be rejected after a single fetch",
            );
        });
    }

    /// `block2.range` is overwritten after staging. Replaying it therefore no longer
    /// matches the block's sync target: the rebuild must return `Invalid` and must not
    /// cache the block.
    #[test]
    fn execution_rebuild_pending_rejects_sync_target_mismatch_before_caching() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );

            let mut block2 = harness.stage_pending_child(&block1, View::new(2)).await;
            harness.processor.pending.clear();
            block2.range = non_empty_range!(Location::new(1), Location::new(2));
            harness.provider.insert(block2.clone());

            let (mut response, _rx) = oneshot::channel::();
            let result = harness
                .rebuild_pending(block2.digest(), &mut response)
                .await;
            assert_eq!(
                result,
                Err(PrepareBatchesError::Invalid),
                "rebuild should reject a replayed batch whose sync target does not match the block",
            );
            assert!(
                !harness.processor.pending.contains_key(&block2.digest()),
                "rejected replay must not be inserted into the pending cache",
            );
        });
    }

    /// Builds a height-3 block whose parent is the processed height-1 block, skipping
    /// height 2. Its state root and range are computed from the anchor's own batches. The
    /// rejection should therefore come from the height gap rather than from a state
    /// mismatch, and the block must not be cached.
    #[test]
    fn execution_rebuild_pending_rejects_height_gap_to_processed_anchor() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context.child("harness")).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );

            let gap_height = Height::new(3);
            let gap_view = View::new(3);
            let batches = harness
                .processor
                .fork_batches(&block1.digest())
                .await
                .expect("processed anchor should be available");
            let merkleized = ExecutionApp::execute(gap_height, gap_view, batches).await;
            let gap_block = Block {
                context: consensus_context(block1.digest(), gap_view),
                parent: block1.digest(),
                height: gap_height,
                state_root: merkleized.root(),
                range: non_empty_range!(
                    merkleized.bounds().inactivity_floor,
                    Location::new(merkleized.bounds().total_size)
                ),
            };
            // Script the provider so `gap_block`'s parent resolves directly to `block1`.
            let provider = ScriptedParentProvider::default();
            provider.push(&gap_block, [Some(block1)]);

            let (mut response, _rx) = oneshot::channel::();
            let result = harness
                .processor
                .rebuild_pending(
                    harness.context_cell.as_present(),
                    provider,
                    gap_block.clone(),
                    &mut response,
                )
                .await;
            assert_eq!(
                result,
                Err(PrepareBatchesError::Invalid),
                "rebuild must reject non-contiguous ancestry above the processed anchor",
            );
            assert!(
                !harness.processor.pending.contains_key(&gap_block.digest()),
                "height-gap block must not be cached as pending",
            );
        });
    }

    /// Two height-1 siblings of genesis. After `canonical` is finalized, the other sibling
    /// must not be reported as already processed.
    #[test]
    fn execution_verify_rejects_conflicting_stale_block() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let canonical = harness.stage_pending_child(&genesis, View::new(1)).await;
            let conflicting = harness.stage_pending_child(&genesis, View::new(2)).await;
            let status = harness.finalize(canonical).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1),
                }
            );
            assert!(
                !harness.is_canonical_processed(&conflicting),
                "conflicting stale block must not be accepted as already processed",
            );
        });
    }

    /// Finalizing a different block at an already-processed height must panic.
    #[test]
    #[should_panic(expected = "received conflicting finalized block at processed height")]
    fn execution_finalize_panics_on_conflicting_duplicate_height() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let canonical = harness.stage_pending_child(&genesis, View::new(1)).await;
            let conflicting = harness.stage_pending_child(&genesis, View::new(2)).await;
            let status = harness.finalize(canonical).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1),
                }
            );
            let _ = harness.finalize(conflicting).await;
        });
    }

    /// After finalizing `block1`, the counter reads 1 through the processor. The height-1
    /// value must also be readable from an independently reopened database.
    #[test]
    fn execution_finalization_persists_state_to_db() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context.child("harness")).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );
            assert_eq!(harness.counter_value().await, Some(1));
            assert_eq!(
                harness
                    .reopen_height_value(context.child("reopen"), Height::new(1))
                    .await,
                Some(1),
                "height state should survive reopen after finalization",
            );
        });
    }

    /// Corrupts `block1.state_root` and clears the pending map, so finalization has no
    /// cached batches for the block. Finalize must then panic on the replay's state-root
    /// check.
    #[test]
    #[should_panic(expected = "finalize replay state root must match block commitments")]
    fn execution_finalize_replay_rejects_state_root_mismatch() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context).await;
            let genesis = Block::genesis();
            let mut block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            block1.state_root = u64_to_digest(999);
            harness.processor.pending.clear();
            // NOTE(review): `block1` is not used after this call, so `.clone()` appears
            // redundant.
            let _ = harness.finalize(block1.clone()).await;
        });
    }

    /// The finalized hook must observe counter value 1, i.e. the state after it was
    /// durably committed.
    #[test]
    fn execution_finalized_hook_runs_after_durable_finalize() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new_with_finalized_observer(context).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );
            assert_eq!(
                harness.finalized_reopened_counters(),
                vec![1],
                "finalized hook should observe the durably committed state",
            );
        });
    }

    /// With the response receiver dropped, `next_or_cancel` on a never-yielding stream must
    /// resolve to `None` instead of waiting forever.
    #[test]
    fn initial_ancestry_read_cancels_when_response_dropped() {
        deterministic::Runner::default().start(|_context| async move {
            let (mut response, receiver) = oneshot::channel::();
            let mut ancestry = Box::pin(futures::stream::pending::());
            drop(receiver);
            assert_eq!(next_or_cancel(&mut response, &mut ancestry).await, None);
        });
    }

    /// The scripted provider answers `None` for `block2`'s parent, so the rebuild cannot
    /// prove ancestry and must end as `Incomplete`.
    #[test]
    fn execution_rebuild_pending_returns_incomplete_when_parent_subscription_ends() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context.child("harness")).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );

            let block2 = harness.stage_pending_child(&block1, View::new(2)).await;
            harness.processor.pending.clear();
            let provider = ScriptedParentProvider::default();
            provider.push(&block2, [None]);

            let (mut response, _rx) = oneshot::channel::();
            let result = harness
                .processor
                .rebuild_pending(
                    harness.context_cell.as_present(),
                    provider,
                    block2,
                    &mut response,
                )
                .await;
            assert_eq!(result, Err(PrepareBatchesError::Incomplete));
        });
    }

    /// The provider is scripted with `None` followed by `Some(block1)`. The rebuild must
    /// stop as `Incomplete` after the first (closed) answer, with exactly one fetch,
    /// rather than retrying until `block1` appears.
    #[test]
    fn execution_rebuild_pending_does_not_retry_closed_provider_forever() {
        deterministic::Runner::default().start(|context| async move {
            let mut harness = Harness::new(context.child("harness")).await;
            let genesis = Block::genesis();
            let block1 = harness.stage_pending_child(&genesis, View::new(1)).await;
            let status = harness.finalize(block1.clone()).await;
            assert_eq!(
                status,
                FinalizeStatus::Persisted {
                    height: Height::new(1)
                }
            );

            let block2 = harness.stage_pending_child(&block1, View::new(2)).await;
            harness.processor.pending.clear();
            let provider = ScriptedParentProvider::default();
            provider.push(&block2, [None, Some(block1.clone())]);

            let (mut response, _rx) = oneshot::channel::();
            let result = harness
                .processor
                .rebuild_pending(
                    harness.context_cell.as_present(),
                    provider.clone(),
                    block2,
                    &mut response,
                )
                .await;
            assert_eq!(result, Err(PrepareBatchesError::Incomplete));
            assert_eq!(
                provider.fetches(),
                1,
                "closed ancestry should not be retried"
            );
        });
    }
}