>::fork_batches(
&entry.merkleized,
));
}
if &self.last_processed.digest == parent {
return Ok(self.databases.new_batches().await);
}
Err(PrepareBatchesError::Invalid)
}
/// Rebuild missing pending ancestry up to `target` lazily from a block provider.
pub(super) async fn rebuild_pending(
&mut self,
context: &E,
provider: P,
target: A::Block,
response: &mut oneshot::Sender,
) -> Result<(), PrepareBatchesError>
where
P: BlockProvider + Clone,
{
let timer = self.metrics.rebuild_pending_duration.timer(context);
let target_digest = target.digest();
// Walk backward until we hit a known safe anchor.
let mut replay_path = Vec::new();
let mut cursor = target;
while cursor.digest() != self.last_processed.digest
&& !self.pending.contains_key(&cursor.digest())
{
let Some(parent) =
await_or_cancel(response, provider.clone().subscribe_parent(&cursor)).await
else {
return Err(PrepareBatchesError::Cancelled);
};
let Some(parent) = parent else {
debug!(
?target_digest,
cursor = ?cursor.digest(),
"ancestor subscription ended before delivery"
);
return Err(PrepareBatchesError::Incomplete);
};
let cursor_height = cursor.height();
if parent.digest() != cursor.parent() || parent.height().next() != cursor_height {
warn!(
?target_digest,
cursor = ?cursor.digest(),
parent = ?parent.digest(),
cursor_height = cursor_height.get(),
parent_height = parent.height().get(),
expected_parent = ?cursor.parent(),
"rebuild_pending received non-contiguous ancestry"
);
return Err(PrepareBatchesError::Invalid);
}
if cursor_height <= self.last_processed.height {
warn!(
?target_digest,
cursor = ?cursor.digest(),
current_height = cursor_height.get(),
last_processed_height = self.last_processed.height.get(),
last_processed = ?self.last_processed.digest,
"rebuild_pending reached stale ancestry below processed height"
);
return Err(PrepareBatchesError::Invalid);
}
// By definition, there are no blocks below height 0.
if cursor_height.previous().is_none() {
warn!(
?target_digest,
cursor = ?cursor.digest(),
reached_height = %cursor_height,
last_processed = ?self.last_processed.digest,
pending_keys = self.pending.len(),
"rebuild reached ancestry boundary without known anchor"
);
return Err(PrepareBatchesError::Invalid);
}
replay_path.push(cursor);
cursor = parent;
}
let depth = replay_path.len();
// Replay from oldest to newest and cache intermediate tips.
for block in replay_path.into_iter().rev() {
let (digest, parent_digest) = (block.digest(), block.parent());
let consensus_context = block.context();
let round = consensus_context.round();
let Some(batches) = await_or_cancel(response, self.fork_batches(&parent_digest)).await
else {
return Err(PrepareBatchesError::Cancelled);
};
let batches = batches.expect("rebuild replay parent must be available");
let Some(merkleized) = await_or_cancel(
response,
self.app.apply(
(context.child("rebuild_pending_apply"), consensus_context),
&block,
batches,
),
)
.await
else {
return Err(PrepareBatchesError::Cancelled);
};
if !A::Databases::matches_sync_targets(&merkleized, &A::sync_targets(&block)) {
warn!(
?target_digest,
block = ?digest,
"rebuild replay state root must match block commitments"
);
return Err(PrepareBatchesError::Invalid);
}
self.cache_pending(digest, parent_digest, round, merkleized);
}
let _ = self.metrics.pending_blocks.try_set(self.pending.len());
let _ = self.metrics.rebuild_pending_depth.try_set(depth);
timer.observe(context);
Ok(())
}
/// Persist finalized state and prune dead in-memory forks.
pub(super) async fn finalize(&mut self, context: &E, block: A::Block) -> FinalizeStatus {
let (height, digest) = (block.height(), block.digest());
if height < self.last_processed.height {
panic!(
"received finalized block below processed height: finalized={} processed={}",
height.get(),
self.last_processed.height.get(),
);
}
if height == self.last_processed.height {
assert_eq!(
digest, self.last_processed.digest,
"received conflicting finalized block at processed height",
);
return FinalizeStatus::Duplicate;
}
let timer = self.metrics.finalize_duration.timer(context);
let block_context = block.context();
let round = block_context.round();
// Marshal finalization is ordered. A pending miss means we can replay
// this block on top of finalized state.
//
// Safety contract: replayed `Application::apply` output must match the
// block commitments previously enforced by `Application::verify`.
let batch = match self.pending.remove(&digest) {
Some(entry) => entry.merkleized,
None => {
let batches = self.databases.new_batches().await;
let batch = self
.app
.apply(
(context.child("finalize_replay"), block_context),
&block,
batches,
)
.await;
assert!(
A::Databases::matches_sync_targets(&batch, &A::sync_targets(&block)),
"finalize replay state root must match block commitments",
);
batch
}
};
self.databases.finalize(batch).await;
self.app
.finalized(
(context.child("finalized"), block.context()),
&block,
&self.databases,
)
.await;
self.prune_pending_after_finalize(&digest, round);
self.last_processed = Anchor {
height,
round,
digest,
};
timer.observe(context);
FinalizeStatus::Persisted { height }
}
/// Remove pending state that is not compatible with the finalized winner.
///
/// A pending block is kept only when:
/// - it is a descendant of `finalized_digest`, and
/// - it was created after `finalized_round`.
fn prune_pending_after_finalize(
&mut self,
finalized_digest: &::Digest,
finalized_round: Round,
) {
let mut children_by_parent = BTreeMap::new();
for (candidate_digest, entry) in &self.pending {
children_by_parent
.entry(entry.parent)
.or_insert_with(Vec::new)
.push(*candidate_digest);
}
let mut compatible = HashSet::new();
compatible.insert(*finalized_digest);
let mut to_visit = VecDeque::new();
to_visit.push_back(*finalized_digest);
while let Some(parent) = to_visit.pop_front() {
let Some(children) = children_by_parent.get(&parent) else {
continue;
};
for &child in children {
if compatible.insert(child) {
to_visit.push_back(child);
}
}
}
let before = self.pending.len();
self.pending.retain(|candidate_digest, entry| {
entry.round > finalized_round && compatible.contains(candidate_digest)
});
let pruned = before - self.pending.len();
self.metrics.pruned_forks.inc_by(pruned as u64);
let _ = self.metrics.pending_blocks.try_set(self.pending.len());
}
/// Cache merkleized pending state for a block digest.
fn cache_pending(
&mut self,
digest: PendingDigest,
parent: PendingDigest,
round: Round,
merkleized: PendingBatches,
) {
if let Some(existing) = self.pending.get(&digest) {
debug_assert_eq!(existing.parent, parent, "pending parent changed for digest");
debug_assert_eq!(existing.round, round, "pending round changed for digest");
return;
}
self.pending.insert(
digest,
PendingEntry {
round,
parent,
merkleized,
},
);
}
}
/// Returns true when `block` is already covered by committed state.
async fn is_already_processed(
last_processed: Anchor<::Digest>,
marshal: MarshalMailbox,
block: &V::ApplicationBlock,
response: &mut oneshot::Sender,
) -> Result
where
S: Scheme,
V: MarshalVariant,
V::ApplicationBlock: Block + Clone,
{
let target_height = block.height();
if target_height > last_processed.height {
return Ok(false);
}
if target_height == last_processed.height {
return Ok(block.digest() == last_processed.digest);
}
let Some(canonical) = await_or_cancel(
response,
marshal.get_block(Identifier::Height(target_height)),
)
.await
else {
return Err(PrepareBatchesError::Cancelled);
};
let Some(canonical) = canonical else {
warn!(
target_height = target_height.get(),
processed_height = last_processed.height.get(),
"failed to fetch canonical processed block for stale-block check"
);
return Err(PrepareBatchesError::Incomplete);
};
Ok(canonical.digest() == block.digest())
}
/// Read the next ancestry item unless the response receiver is dropped.
pub(super) async fn next_or_cancel(
response: &mut oneshot::Sender,
stream: &mut S,
) -> Option