//! Stateful application that manages the pending-tip DAG of merkleized batches on behalf of an [`Application`].
//!
//! The [`Stateful`] actor is split into two control loops:
//! - [`Syncing`] manages the state sync process.
//! - [`Processing`] manages the pending-tip DAG and drives the inner application.
use crate::stateful::{
actor::{
core::{mailbox::Message, processing::Processing, syncing::Syncing},
processor::{Processor, ProcessorMetrics},
syncer::{self, SyncPlan, SyncResult},
},
db::{
assert_rewind_window_safety, AttachableResolverSet, DatabaseSet, StateSyncSet,
SyncEngineConfig,
},
Application,
};
use commonware_actor::mailbox::{self as actor_mailbox};
use commonware_consensus::{
marshal::{
ancestry::BlockProvider,
core::{Mailbox as MarshalMailbox, Variant},
},
simplex::types::Finalization,
};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
use commonware_utils::{channel::oneshot, sync::AsyncMutex};
use futures::join;
use rand::Rng;
use std::{num::NonZeroUsize, sync::Arc};
mod mailbox;
pub use mailbox::Mailbox;
mod processing;
mod syncing;
type BlockDigest = <>::Block as Digestible>::Digest;
/// Configuration for constructing a [`Stateful`] application.
pub struct Config
where
E: Rng + Spawner + Metrics + Clock + Storage,
A: Application,
S: Scheme,
V: Variant,
{
/// The inner application that drives state transitions.
pub application: A,
/// Configuration used to construct the database set.
pub db_config: >::Config,
/// Source of input (e.g. transactions) passed to the application on propose.
pub input_provider: A::InputProvider,
/// Marshal mailbox used for startup anchoring and lazy recovery.
pub marshal: MarshalMailbox,
/// Marshal ack window used by the provided marshal mailbox.
///
/// This must match the marshal config used to construct [`Self::marshal`].
pub max_pending_acks: NonZeroUsize,
/// Capacity of the stateful actor mailbox channel.
pub mailbox_size: NonZeroUsize,
/// Startup plan loaded via [`SyncPlan::init`], optionally augmented with
/// a finalized floor via [`SyncPlan::with_floor`]. Carries the durable
/// metadata handle and the startup decision shared with marshal.
pub plan: SyncPlan,
/// Resolver(s) for state sync fetches and post-bootstrap serving.
pub resolvers: R,
/// Sync engine tuning knobs.
pub sync_config: SyncEngineConfig,
}
/// Stateful application that manages the pending-tip DAG of merkleized
/// batches on behalf of an [`Application`], implementing the consensus
/// application and verifying traits.
pub struct Stateful
where
E: Rng + Spawner + Metrics + Clock + Storage,
A: Application,
S: Scheme,
V: Variant,
{
/// Runtime context providing RNG, task spawning, metrics, and clock.
context: ContextCell,
/// The receiver for messages.
mailbox: actor_mailbox::Receiver>,
/// The inner application that drives state transitions.
application: A,
/// Source of input (e.g. transactions) passed to the application on propose.
input_provider: A::InputProvider,
/// Marshal mailbox used for startup anchoring and lazy recovery.
marshal: MarshalMailbox,
/// Configuration used to initialize the database set at startup.
db_config: >::Config,
/// Startup plan carrying the metadata handle and floor decision.
plan: SyncPlan,
/// Resolver(s) for state sync fetches and post-bootstrap serving.
resolvers: R,
/// Sync engine tuning knobs.
sync_config: SyncEngineConfig,
}
impl Stateful
where
E: Rng + Spawner + Metrics + Clock + Storage,
A: Application,
A::Databases: StateSyncSet>,
S: Scheme,
V: Variant,
R: AttachableResolverSet,
MarshalMailbox: BlockProvider,
{
/// Construct a [`Stateful`] actor and its [`Mailbox`].
///
/// This only wires dependencies and allocates the mailbox. The actor does
/// not process messages until [`Stateful::start`] is called.
pub fn init(context: E, config: Config) -> (Self, Mailbox) {
assert_rewind_window_safety::(config.max_pending_acks);
let (sender, mailbox) = actor_mailbox::new(context.child("mailbox"), config.mailbox_size);
(
Self {
context: ContextCell::new(context),
mailbox,
application: config.application,
input_provider: config.input_provider,
marshal: config.marshal,
db_config: config.db_config,
plan: config.plan,
resolvers: config.resolvers,
sync_config: config.sync_config,
},
Mailbox::new(sender),
)
}
pub fn start(mut self) -> Handle<()> {
spawn_cell!(self.context, self.run())
}
async fn run(self) {
if let Some(floor) = self.plan.floor().cloned() {
self.start_state_sync(floor).await;
} else if self.plan.requires_state_sync_floor() {
panic!("interrupted state sync must resume from a newly selected floor");
} else {
self.start_from_marshal().await;
}
}
/// Starts the application in [`Syncing`] mode, kicking off a state sync process
/// towards the finalized floor specified in the [`SyncPlan`].
async fn start_state_sync(self, floor: Finalization) {
let sync_metadata = Arc::new(AsyncMutex::new(self.plan.into_sync_metadata()));
let (sync_complete, sync_completed) = oneshot::channel();
let (syncer, syncer_mailbox) = syncer::Syncer::new(syncer::Config {
context: self.context.child("syncer"),
db_config: self.db_config,
sync_config: self.sync_config,
resolvers: self.resolvers.clone(),
sync_metadata: sync_metadata.clone(),
finalization: floor,
marshal: self.marshal.clone(),
sync_complete,
});
let syncing = Syncing {
context: self.context,
mailbox: self.mailbox,
application: self.application,
input_provider: self.input_provider,
marshal: self.marshal,
sync_metadata,
syncer: syncer_mailbox,
held_verify_requests: Vec::new(),
database_subscribers: Vec::new(),
artifact: None,
resolvers: self.resolvers,
sync_completed,
};
let _ = join!(syncer.start(), syncing.start());
}
/// Starts the application by initializing the database set at marshal's current floor.
async fn start_from_marshal(self) {
let syncer::StartupResult {
sync: SyncResult { databases, anchor },
skip_finalized_until,
} = syncer::init_databases_from_marshal::(
self.context.as_present(),
&self.marshal,
self.db_config,
self.plan.into_sync_metadata(),
)
.await;
// Attach the resolvers to the initialized databases before starting the processor,
// so that this instance can serve peers database operations and proofs.
self.resolvers.attach_databases(databases.clone()).await;
let processor_metrics = ProcessorMetrics::new(self.context.child("processor"));
let processor = Processor::new(self.application, databases, anchor, processor_metrics);
Processing {
context: self.context,
mailbox: self.mailbox,
input_provider: self.input_provider,
marshal: self.marshal,
resolvers: self.resolvers,
processor,
skip_finalized_until,
}
.start()
.await
}
}
#[cfg(test)]
mod tests {
use super::{Config, Stateful};
use crate::stateful::{
actor::syncer::SyncPlan,
db::{AttachableResolver, StateSyncDb, SyncEngineConfig},
tests::mocks::{TestApp, TestBlock, TestDb, TestScheme, TestVariant},
};
use commonware_consensus::{
marshal::{self, ancestry, core::Actor as MarshalActor},
simplex::{
mocks::scheme as scheme_mocks,
types::{Finalization, Finalize, Proposal},
},
types::{Epoch, FixedEpocher, Round, View, ViewDelta},
Application as _, CertifiableBlock as _,
};
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider},
sha256::Digest as Sha256Digest,
};
use commonware_macros::select;
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, Clock as _, Runner as _, Supervisor as _,
};
use commonware_storage::archive::immutable;
use commonware_utils::{channel::mpsc, sync::AsyncRwLock, NZUsize, NZU16, NZU64};
use std::{convert::Infallible, sync::Arc, time::Duration};
#[derive(Clone)]
struct NoopResolver;
impl AttachableResolver for NoopResolver {
async fn attach_database(&self, _db: Arc>) {}
}
impl StateSyncDb for TestDb {
type SyncError = Infallible;
async fn sync_db(
_context: deterministic::Context,
_config: Self::Config,
_resolver: NoopResolver,
_target: Self::SyncTarget,
_tip_updates: mpsc::Receiver,
_finish: Option>,
_reached_target: Option>,
_sync_config: SyncEngineConfig,
) -> Result {
Ok(Self)
}
}
fn archive_config(page_cache: CacheRef, partition: &str) -> immutable::Config<()> {
immutable::Config {
metadata_partition: format!("{partition}-metadata"),
freezer_table_partition: format!("{partition}-table"),
freezer_table_initial_size: 4,
freezer_table_resize_frequency: 2,
freezer_table_resize_chunk_size: 2,
freezer_key_partition: format!("{partition}-key"),
freezer_key_page_cache: page_cache,
freezer_value_partition: format!("{partition}-value"),
freezer_value_target_size: 128,
freezer_value_compression: None,
ordinal_partition: format!("{partition}-ordinal"),
items_per_section: NZU64!(4),
codec_config: (),
replay_buffer: NZUsize!(64),
freezer_key_write_buffer: NZUsize!(64),
freezer_value_write_buffer: NZUsize!(64),
ordinal_write_buffer: NZUsize!(64),
}
}
fn build_finalization(
fixture: &Fixture,
payload: Sha256Digest,
) -> Finalization {
let proposal = Proposal::new(
Round::new(Epoch::zero(), View::new(1)),
View::zero(),
payload,
);
let votes: Vec<_> = fixture
.schemes
.iter()
.map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
.collect();
Finalization::from_finalizes(&fixture.verifier, &votes, &Sequential)
.expect("finalization quorum")
}
#[test]
fn mailbox_rejects_propose_while_floor_resolution_waits() {
deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
let mut signing_context = context.child("signing");
let fixture = scheme_mocks::fixture(&mut signing_context, b"pending-floor", 1);
let provider = ConstantProvider::new(fixture.schemes[0].clone());
let finalization = build_finalization(&fixture, Sha256Digest::from([7; 32]));
let page_cache = CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(8));
let finalizations_by_height = immutable::Archive::init(
context.child("finalizations_by_height"),
archive_config(page_cache.clone(), "pending-floor-finalizations"),
)
.await
.expect("failed to initialize finalizations archive");
let finalized_blocks = immutable::Archive::init(
context.child("finalized_blocks"),
archive_config(page_cache.clone(), "pending-floor-blocks"),
)
.await
.expect("failed to initialize blocks archive");
let (_marshal_actor, marshal, _height) =
MarshalActor::<_, TestVariant, _, _, _, _, _>::init(
context.child("marshal"),
finalizations_by_height,
finalized_blocks,
marshal::Config {
provider,
epocher: FixedEpocher::new(NZU64!(u64::MAX)),
start: marshal::Start::Genesis(TestBlock::new(0, 0)),
partition_prefix: "pending-floor-marshal".to_string(),
mailbox_size: NZUsize!(8),
view_retention_timeout: ViewDelta::new(1),
prunable_items_per_section: NZU64!(4),
page_cache,
replay_buffer: NZUsize!(64),
key_write_buffer: NZUsize!(64),
value_write_buffer: NZUsize!(64),
block_codec_config: (),
max_repair: NZUsize!(1),
max_pending_acks: NZUsize!(1),
strategy: Sequential,
},
)
.await;
let plan = SyncPlan::init(&context, "pending-floor-stateful".to_string()).await;
let (stateful, mut mailbox) = Stateful::init(
context.child("stateful"),
Config {
application: TestApp,
db_config: (),
input_provider: (),
marshal,
max_pending_acks: NZUsize!(1),
mailbox_size: NZUsize!(8),
plan: plan.with_floor(finalization),
resolvers: NoopResolver,
sync_config: SyncEngineConfig {
fetch_batch_size: NZU64!(1),
apply_batch_size: 1,
max_outstanding_requests: 1,
update_channel_size: NZUsize!(1),
max_retained_roots: 1,
},
},
);
let handle = stateful.start();
select! {
result = mailbox.propose(
(context.child("proposal"), TestBlock::new(1, 1).context()),
ancestry::from_iter([]),
) => {
assert!(result.is_none());
},
_ = context.sleep(Duration::from_millis(100)) => {
panic!("stateful mailbox stalled while resolving state sync floor");
},
}
handle.abort();
});
}
}