use crate::stateful::{
db::{Anchor, DatabaseSet},
Application,
};
use commonware_codec::{EncodeSize, Error, FixedSize, Read, ReadExt, Write};
use commonware_consensus::{
marshal::{
core::{CommitmentFallback, Mailbox as MarshalMailbox, Variant},
Identifier,
},
simplex::types::Finalization,
types::Height,
CertifiableBlock, Heightable, Roundable,
};
use commonware_cryptography::{certificate::Scheme, Digest, Digestible};
use commonware_runtime::{Buf, BufMut, Clock, Metrics, Spawner, Storage};
use commonware_storage::metadata::{self, Metadata};
use commonware_utils::{fixed_bytes, sequence::FixedBytes};
use rand::Rng;
mod actor;
pub(crate) use actor::{Config, Syncer};
mod mailbox;
pub(crate) use mailbox::Mailbox;
mod plan;
pub use plan::SyncPlan;
/// Suffix appended to the caller-supplied partition prefix to name the
/// state-sync metadata partition.
const SYNC_METADATA_SUFFIX: &str = "state_sync_metadata";
/// Metadata key under which the durable [`SyncState`] is stored.
const SYNC_STATE_KEY: FixedBytes<1> = fixed_bytes!("C0");
type BlockDigest = <>::Block as Digestible>::Digest;
/// Durable identity for an in-progress state sync floor.
///
/// The height enforces monotonic restarts, and the commitment distinguishes
/// conflicting blocks at the same height.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct FloorMarker<C>
where
    C: Digest,
{
    /// Height of the floor block.
    height: Height,
    /// Commitment identifying the floor block at `height`.
    commitment: C,
}
impl<C> FloorMarker<C>
where
    C: Digest,
{
    /// Constructs a durable floor marker from the resolved floor block.
    pub(crate) const fn new(height: Height, commitment: C) -> Self {
        Self { height, commitment }
    }

    /// Ensures a newly selected floor is compatible with this persisted one.
    ///
    /// Restarts may resume from the same floor or advance to a newer one, but
    /// must never move backward or switch to a different block at the same height.
    ///
    /// # Panics
    ///
    /// Panics if `selected` is at a lower height than this marker, or if it is
    /// at the same height but carries a different commitment.
    pub(crate) fn ensure_not_behind(&self, selected: &Self) {
        assert!(
            selected.height >= self.height,
            "selected state sync floor cannot move behind the persisted in-progress floor",
        );
        // A strictly newer floor is always acceptable; only an equal height
        // needs the commitments to agree.
        if selected.height == self.height {
            assert!(
                selected.commitment == self.commitment,
                "selected state sync floor conflicts with the persisted in-progress floor",
            );
        }
    }
}
impl<C> Write for FloorMarker<C>
where
    C: Digest,
{
    /// Encodes the height followed by the commitment.
    fn write(&self, writer: &mut impl BufMut) {
        self.height.write(writer);
        self.commitment.write(writer);
    }
}
impl<C> EncodeSize for FloorMarker<C>
where
    C: Digest,
{
    /// Returns the combined encoded size of the height and the commitment.
    fn encode_size(&self) -> usize {
        self.height.encode_size() + self.commitment.encode_size()
    }
}
impl Read for FloorMarker
where
C: Digest,
{
type Cfg = ();
fn read_cfg(reader: &mut impl Buf, _: &()) -> Result {
Ok(Self {
height: Height::read(reader)?,
commitment: C::read_cfg(reader, &())?,
})
}
}
/// Durable sync progress.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum SyncState<C>
where
    C: Digest,
{
    /// State sync has been started for the recorded floor and has not yet
    /// been marked complete.
    InProgress(FloorMarker<C>),
    /// State sync has been marked complete at the recorded height.
    Complete(Height),
}
impl SyncState
where
C: Digest,
{
/// Returns the completed state sync height, if state sync has finished.
pub(crate) const fn sync_height(&self) -> Option {
match self {
Self::InProgress(_) => None,
Self::Complete(height) => Some(*height),
}
}
}
impl<C> Write for SyncState<C>
where
    C: Digest,
{
    /// Encodes a one-byte variant tag (`0` for in-progress, `1` for complete)
    /// followed by the variant's payload.
    fn write(&self, writer: &mut impl BufMut) {
        match self {
            Self::InProgress(floor) => {
                0u8.write(writer);
                floor.write(writer);
            }
            Self::Complete(height) => {
                1u8.write(writer);
                height.write(writer);
            }
        }
    }
}
impl<C> EncodeSize for SyncState<C>
where
    C: Digest,
{
    /// Returns the size of the one-byte variant tag plus the encoded payload.
    fn encode_size(&self) -> usize {
        let payload = match self {
            Self::InProgress(floor) => floor.encode_size(),
            Self::Complete(height) => height.encode_size(),
        };
        u8::SIZE + payload
    }
}
impl Read for SyncState
where
C: Digest,
{
type Cfg = ();
fn read_cfg(reader: &mut impl Buf, _: &()) -> Result {
match u8::read(reader)? {
0 => Ok(Self::InProgress(FloorMarker::::read(reader)?)),
1 => Ok(Self::Complete(Height::read(reader)?)),
n => Err(Error::InvalidEnum(n)),
}
}
}
/// The result of a state sync operation.
pub struct SyncResult
where
E: Rng + Spawner + Metrics + Clock,
A: Application,
{
/// The database handle set.
pub databases: A::Databases,
/// The anchor at which state sync completed.
pub anchor: Anchor>,
}
// Implemented by hand rather than derived: `#[derive(Clone)]` would add
// `E: Clone` and `A: Clone` bounds, while only the two fields need copying.
impl<E, A> Clone for SyncResult<E, A>
where
    E: Rng + Spawner + Metrics + Clock,
    A: Application<E>,
{
    /// Clones the database handle set and copies the anchor.
    fn clone(&self) -> Self {
        Self {
            databases: self.databases.clone(),
            anchor: self.anchor,
        }
    }
}
/// Resolved state sync floor data derived from the selected finalization.
pub(crate) struct ResolvedFloor
where
E: Rng + Spawner + Metrics + Clock,
A: Application,
C: Digest,
{
pub anchor: Anchor>,
pub targets: >::SyncTargets,
pub marker: FloorMarker,
}
/// Durable state-sync metadata.
pub(crate) struct StateSyncMetadata
where
E: Storage + Clock + Metrics,
C: Digest,
{
partition_prefix: String,
metadata: Metadata, SyncState>,
}
impl<E, C> StateSyncMetadata<E, C>
where
    E: Storage + Clock + Metrics,
    C: Digest,
{
    /// Load the durable state-sync metadata partition, creating it if needed.
    ///
    /// The partition name is `partition_prefix` immediately followed by
    /// `SYNC_METADATA_SUFFIX`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying metadata store fails to initialize.
    pub(crate) async fn init(context: &E, partition_prefix: impl AsRef<str>) -> Self {
        let partition_prefix = partition_prefix.as_ref().to_string();
        let metadata = Metadata::init(
            context.child("metadata"),
            metadata::Config {
                partition: format!("{partition_prefix}{SYNC_METADATA_SUFFIX}"),
                codec_config: (),
            },
        )
        .await
        .expect("failed to load sync metadata");
        Self {
            partition_prefix,
            metadata,
        }
    }

    /// Returns the partition prefix for this state-sync metadata store.
    pub(crate) const fn partition_prefix(&self) -> &str {
        self.partition_prefix.as_str()
    }

    /// Returns the completed state sync height, if state sync has finished.
    ///
    /// Yields `None` when no state has been persisted or when the persisted
    /// state is still in progress.
    pub(crate) fn sync_height(&self) -> Option<Height> {
        self.metadata
            .get(&SYNC_STATE_KEY)
            .and_then(SyncState::sync_height)
    }

    /// Returns whether state sync is in progress.
    pub(crate) fn in_progress(&self) -> bool {
        matches!(
            self.metadata.get(&SYNC_STATE_KEY),
            Some(SyncState::InProgress(_))
        )
    }

    /// Marks state sync as in progress for the resolved floor.
    ///
    /// This must be persisted before any state sync database mutation begins so the database
    /// sync engine can reopen partial sync state and validate the next selected floor after a crash.
    ///
    /// If an interrupted state sync already stored a floor, the newly selected
    /// floor must resume from that same floor or a later one.
    ///
    /// # Panics
    ///
    /// Panics if state sync is already recorded as complete, if `floor` is
    /// behind or conflicts with a persisted in-progress floor, or if the new
    /// state cannot be persisted.
    pub(crate) async fn begin_sync(&mut self, floor: FloorMarker<C>) {
        match self.metadata.get(&SYNC_STATE_KEY) {
            Some(SyncState::InProgress(existing)) => {
                existing.ensure_not_behind(&floor);
            }
            Some(SyncState::Complete(_)) => {
                panic!("completed state sync cannot be marked in-progress");
            }
            None => {}
        }
        self.metadata
            .put_sync(SYNC_STATE_KEY, SyncState::InProgress(floor))
            .await
            .expect("failed to set state sync state to in-progress");
    }

    /// Records that one-time state sync completed at the given height.
    ///
    /// Once this height is set, future startups skip peer state sync and initialize
    /// from the later of this height and marshal's processed height instead. This
    /// action is irreversible.
    ///
    /// # Panics
    ///
    /// Panics if `height` is below the persisted in-progress floor or below a
    /// previously recorded completion height, or if the new state cannot be
    /// persisted.
    pub(crate) async fn set_complete(&mut self, height: Height) {
        match self.metadata.get(&SYNC_STATE_KEY) {
            Some(SyncState::InProgress(floor)) => {
                assert!(
                    height >= floor.height,
                    "completed state sync height cannot be behind the in-progress floor",
                );
            }
            Some(SyncState::Complete(existing)) => {
                assert!(
                    height >= *existing,
                    "completed state sync height cannot move backward",
                );
            }
            None => {}
        }
        self.metadata
            .put_sync(SYNC_STATE_KEY, SyncState::<C>::Complete(height))
            .await
            .expect("failed to set state sync state to complete");
    }
}
/// Resolves the selected state sync floor into the anchor, targets, and
/// durable floor marker used by restart validation.
///
/// The floor block is obtained from `marshal` by subscribing to the
/// commitment carried in `finalization.proposal.payload`. The anchor and sync
/// targets are derived from that block, and the marker pairs the block's
/// height with the same commitment.
///
/// # Panics
///
/// Panics if the marshal subscription does not yield the floor block.
// NOTE(review): the generic parameter lists on this signature (on the function
// itself and on `MarshalMailbox`, `Finalization`, `ResolvedFloor`, and possibly
// the trait bounds) appear to be missing — confirm against the original source.
pub(crate) async fn resolve_state_sync_floor(
marshal: &MarshalMailbox,
finalization: &Finalization,
) -> ResolvedFloor
where
E: Rng + Spawner + Metrics + Clock,
A: Application,
S: Scheme,
V: Variant,
{
// Wait to retrieve the floor block from marshal. We use `Wait` here,
// since marshal triggers a fetch for the floor block if it is not
// already available.
let floor = {
let block = marshal
.subscribe_by_commitment(finalization.proposal.payload, CommitmentFallback::Wait)
.await
.expect("marshal must yield floor block");
// Convert the marshal block into the block type the application consumes.
V::into_inner(block)
};
ResolvedFloor {
anchor: Anchor::from(&floor),
targets: A::sync_targets(&floor),
marker: FloorMarker::new(floor.height(), finalization.proposal.payload),
}
}
/// The result of initializing state from marshal on startup.
pub(crate) struct StartupResult
where
E: Rng + Spawner + Metrics + Clock,
A: Application,
{
/// The initialized database set and anchor.
pub sync: SyncResult,
/// Finalized marshal blocks at or below this height are already reflected
/// in the initialized database set and should be acknowledged without
/// applying them again.
pub skip_finalized_until: Option,
}
/// Initializes databases at marshal's current startup anchor.
///
/// This initialization route is used when startup should recover from marshal
/// instead of running peer state sync. If marshal has not yet recorded a
/// processed height, this falls back to marshal's genesis block so fresh boots
/// and post-sync restarts share the same path.
///
/// If the databases are found to be inconsistent with the marshal floor, this
/// function will attempt to repair by rewinding the databases which are ahead. If the
/// databases are entirely inconsistent, this function will panic.
///
/// On success the floor height is recorded as the completed state sync height
/// in `sync_metadata`.
///
/// # Panics
///
/// Panics if marshal cannot return the floor block, if the databases still
/// disagree with the floor block's sync targets after the rewind, or if
/// recording completion in `sync_metadata` fails its own checks.
// NOTE(review): the generic parameter lists on this signature (on the function
// itself and on `MarshalMailbox`, `StateSyncMetadata`, `StartupResult`, the
// `db_config` type, and possibly the trait bounds) appear to be missing —
// confirm against the original source.
pub(crate) async fn init_databases_from_marshal(
context: &E,
marshal: &MarshalMailbox,
db_config: >::Config,
mut sync_metadata: StateSyncMetadata,
) -> StartupResult
where
E: Rng + Storage + Spawner + Clock + Metrics,
A: Application,
S: Scheme,
V: Variant,
{
let sync_height = sync_metadata.sync_height();
let processed_height = marshal.get_processed_height().await;
// When the recorded sync height is ahead of marshal's processed height (or
// marshal has no processed height at all), report the sync height so the
// caller can skip finalized blocks up to it.
let skip_finalized_until = match (sync_height, processed_height) {
(Some(sync_height), Some(processed_height)) if processed_height < sync_height => {
Some(sync_height)
}
(Some(sync_height), None) => Some(sync_height),
_ => None,
};
// The floor is the higher of the recorded sync height and marshal's
// processed height, or height zero when neither exists.
let marshal_floor = sync_height
.into_iter()
.chain(processed_height)
.max()
.unwrap_or_else(Height::zero);
let floor_block = {
let marshal_block = marshal
.get_block(Identifier::Height(marshal_floor))
.await
.expect("marshal must return floor block");
V::into_inner(marshal_block)
};
let databases = A::Databases::init(context.child("db_set"), db_config).await;
let processed_targets = A::sync_targets(&floor_block);
// In the case that the committed targets do not match the marshal floor, we may
// have suffered a crash that left the set in an inconsistent state. In this case,
// we attempt to repair by rewinding the databases back to the marshal floor. If
// the rewind fails to produce a consistent state, we must crash. This can occur
// if the databases were corrupted or pruned too aggressively.
if databases.committed_targets().await != processed_targets {
databases.rewind_to_targets(processed_targets.clone()).await;
let rewound_targets = databases.committed_targets().await;
assert!(
rewound_targets == processed_targets,
"databases must be consistent with marshal floor after rewind"
);
}
// Once startup has aligned databases with marshal, future boots should skip peer
// state sync and recover from the later of this anchor and marshal's durable
// processed height.
sync_metadata.set_complete(floor_block.height()).await;
let anchor = Anchor {
height: floor_block.height(),
round: floor_block.context().round(),
digest: floor_block.digest(),
};
StartupResult {
sync: SyncResult { databases, anchor },
skip_finalized_until,
}
}