//! Traits for database batch lifecycle and state sync in [`Stateful`](super::Stateful). //! //! This module defines the boundary between stateful application logic and //! storage backends (QMDB variants). //! //! # Batch Lifecycle //! //! Normal execution has three stages: //! 1. [`Unmerkleized`]: mutable, in-progress batch (concrete types expose reads and writes). //! 2. [`Merkleized`]: a sealed batch with a computed root. //! 3. Finalization: persist the sealed batch via [`ManagedDb::finalize`]. //! //! [`DatabaseSet`] groups one or more [`ManagedDb`] instances into one logical //! unit for execution and commit. //! //! # State Sync //! //! State sync orchestration is expressed by two traits: //! - [`StateSyncDb`]: per-database sync entrypoint. //! - [`StateSyncSet`]: set-level orchestration. //! //! ## Anchors //! //! Each set of sync targets is paired with an anchor `(Height, Round, D)` where //! `D` is the block digest. The db layer never interprets the anchor; it //! only tracks which anchor each database converged on. //! //! On completion, [`StateSyncSet::sync`] returns the anchor that all databases //! agreed on. The caller uses this to set the marshal floor and the //! last-processed digest, ensuring they match the actual convergence point //! rather than whatever marshal's head happens to be (which may have advanced //! during sync). //! //! ## Convergence Algorithm (tuple sets) //! //! Tuple [`StateSyncSet`] implementations assign each `(anchor, targets)` //! pair a *generation* number and use this algorithm: //! //! 1. Forward tip updates only to databases that have not yet reported //! "reached target". Reached databases are frozen to prevent them from //! running ahead to a newer anchor. //! 2. When all databases report reached, compare the generation each was //! assigned when it reported. //! 3. If all generations match, every database synced to targets from the //! same anchor. Return that anchor. //! 4. 
If generations differ, *regroup*: re-send the highest-reached //! generation's targets to the behind databases, clear their reached //! state, and repeat from step 1. //! //! ### Chasing a moving tip //! //! ```text //! time --------------------------------------------------------------> //! //! marshal finalized tip: A0 ------ A1 ------ A2 ------ A3 //! generation: g0 g1 g2 g3 //! //! db0 (slow): g0 ------------------> g1 -----------------> g3 reached //! db1 (fast): g0 ----> g1 reached -- frozen -- regroup --> g3 reached //! db2 (fast): g0 ----> g1 reached -- frozen -- regroup --> g3 reached //! //! coordinator queue while db0 is still catching up: //! [A2] [A3] -- drain --> keep only A3 //! //! finish only when: //! - every database has reported the same generation //! - no newer tip update is still queued behind it //! ``` //! //! The coordinator continuously drains tip updates and keeps only the latest //! value before forwarding, which avoids target-channel backpressure buildup. //! The `generation_state` map is pruned after every dispatch to only retain //! generations currently assigned to at least one database, so memory usage //! is bounded by the number of databases regardless of how long sync runs. use commonware_consensus::{ types::{Height, Round}, CertifiableBlock, Epochable, Roundable, Viewable, }; use commonware_cryptography::Digest; use commonware_macros::select; use commonware_runtime::{reschedule, Metrics, Spawner}; use commonware_utils::{ channel::{fallible::AsyncFallibleExt, mpsc, oneshot, ring}, sync::AsyncRwLock, }; use futures::{ future::{pending, Either}, join, }; use std::{ collections::BTreeMap, fmt::Debug, future::Future, num::{NonZeroU64, NonZeroUsize}, sync::Arc, }; const MAX_CHANNEL_DRAIN_PER_TICK: usize = 32; pub mod any; pub mod current; pub mod immutable; pub mod keyless; pub mod p2p; /// Mutable batch state before merkleization. /// /// Concrete types provide key-value operations (`get`, `write`, `set`, /// `append`, etc.) 
as inherent methods; the generic wrapper only needs /// [`merkleize`](Self::merkleize). pub trait Unmerkleized: Sized + Send { /// The merkleized batch produced by [`merkleize`](Self::merkleize). type Merkleized: Merkleized; /// The error type returned by fallible operations. type Error: Send; /// Resolve all mutations, compute the new state root, and produce a /// merkleized batch. fn merkleize(self) -> impl Future> + Send; } /// Sealed batch state with a computed root. /// /// The application uses [`root`](Self::root) in block headers, and the wrapper /// later finalizes this batch. pub trait Merkleized: Sized + Send + Sync { /// The digest type used for the state root. type Digest: Digest; /// The unmerkleized batch type produced by [`new_batch`](Self::new_batch). type Unmerkleized: Unmerkleized; /// The canonical state root committed in block headers. fn root(&self) -> Self::Digest; /// Create a child unmerkleized batch that reads through this batch's /// pending changes before falling back to the committed database state. /// /// In QMDB, this maps to `merkleized_batch.new_batch()`. fn new_batch(&self) -> Self::Unmerkleized; } /// One database managed by the [`Stateful`](super::Stateful) wrapper. /// /// Implementations create new batches from committed state and persist finalized /// batches back to storage. /// /// [`new_batch`](Self::new_batch) receives `Arc>` so batch /// types can keep read-through access to committed state. /// /// `E` is a trait generic (not an associated type), so one database type can /// work across runtimes that satisfy the bounds. pub trait ManagedDb: Send + Sync + Sized { /// An in-progress batch of mutations that has not yet been merkleized. type Unmerkleized: Unmerkleized; /// A batch whose root has been computed but has not yet been applied to /// the underlying database. /// /// Constrained so that [`Merkleized::new_batch`] produces the same /// [`Unmerkleized`] type as [`ManagedDb::new_batch`](Self::new_batch). 
type Merkleized: Merkleized; /// The error type returned by fallible operations. type Error: Debug + Send; /// Configuration needed to construct a new database instance. type Config: Send; /// Sync target type for state sync of this database. /// /// Typically a database-specific state commitment plus the operation range needed to reach it. type SyncTarget: Clone + PartialEq + Send + Sync; /// Construct a new database from its configuration. fn init( context: E, config: Self::Config, ) -> impl Future> + Send; /// Create a new unmerkleized batch rooted at the database's committed /// state. /// /// The `db` parameter is the `Arc>` that wraps this /// database, allowing batch types to capture a shared reference for /// read-through to committed state. fn new_batch(db: &Arc>) -> impl Future + Send; /// Return true if a merkleized batch matches a committed sync target. fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool; /// Apply a merkleized batch's changeset to the underlying database. /// /// In QMDB, this encapsulates calling `merkleized.finalize()` to produce /// a `Changeset`, then `db.apply_batch(changeset)` and `db.commit()`. fn finalize( &mut self, batch: Self::Merkleized, ) -> impl Future> + Send; /// Return the sync target for this database's current committed state. fn sync_target(&self) -> impl Future + Send; /// Rewind committed state to `target`. /// /// Implementations must ensure rewind effects are durable before returning /// `Ok(())` (for example by committing after rewind). fn rewind_to_target( &mut self, target: Self::SyncTarget, ) -> impl Future> + Send; /// Return the maximum number of finalized commits this database can rewind. /// /// `None` means rewind depth is not bounded by a known finite limit. fn max_rewind_depth() -> Option { None } } /// A collection of individually locked [`ManagedDb`] instances. 
/// /// Each database is wrapped in `Arc>`, so the set is cheap to /// clone and each database can be shared without a global lock. /// /// `E` is a trait generic (not an associated type), so one set type can work /// across runtimes that satisfy the bounds. pub trait DatabaseSet: Clone + Send + Sync + 'static { /// Tuple of [`ManagedDb::Unmerkleized`] for every database in the set. type Unmerkleized: Send; /// Tuple of [`ManagedDb::Merkleized`] for every database in the set. type Merkleized: Send + Sync; /// Configuration needed to construct every database in the set. /// /// - Single database sets use that database's [`ManagedDb::Config`]. /// - Multi-database tuple sets use a tuple of per-database configs /// `(Db1::Config, Db2::Config, ...)`. type Config: Send; /// Per-database sync targets extracted from a finalized block. /// /// For a single-database set this is one target. For multi-database sets it is a tuple of /// targets, one per database. type SyncTargets: Clone + PartialEq + Send + Sync; /// Construct the database set from its configuration. fn init(context: E, config: Self::Config) -> impl Future + Send; /// Create unmerkleized batches from each database's committed state. /// /// Acquires a read lock on each database. fn new_batches(&self) -> impl Future + Send; /// Create child unmerkleized batches from a pending merkleized parent. /// /// No lock is needed; reads come from the in-memory merkleized state. fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized; /// Return true if merkleized batches match the committed sync targets. fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool; /// Apply each merkleized batch's changeset to its underlying database. /// /// Acquires a write lock on each database. fn finalize(&self, batches: Self::Merkleized) -> impl Future + Send; /// Return sync targets for the set's current committed state. 
fn committed_targets(&self) -> impl Future + Send; /// Rewind the set to the provided per-database targets. /// /// Rewind failures are fatal for startup recovery and therefore panic. fn rewind_to_targets(&self, targets: Self::SyncTargets) -> impl Future + Send; /// Return the most restrictive finite rewind depth across the database set. /// /// `None` means every database in the set is unbounded. fn max_rewind_depth() -> Option; } pub(crate) fn assert_rewind_window_safety(max_pending_acks: NonZeroUsize) where D: DatabaseSet, { let Some(max_rewind_depth) = D::max_rewind_depth() else { return; }; assert!( max_pending_acks.get() <= max_rewind_depth, "marshal max_pending_acks={} exceeds database_set.max_rewind_depth={}", max_pending_acks, max_rewind_depth, ); } /// Parameters for a one-time state-sync pass. #[derive(Clone, Copy, Debug)] pub struct SyncEngineConfig { /// Maximum operations fetched per resolver request. pub fetch_batch_size: NonZeroU64, /// Number of operations applied per local apply step. pub apply_batch_size: usize, /// Maximum number of outstanding resolver requests. pub max_outstanding_requests: usize, /// Capacity of per-database target-update channels. pub update_channel_size: NonZeroUsize, /// Number of historical roots to retain for proof verification across /// target updates. pub max_retained_roots: usize, } /// A [`ManagedDb`] with a state-sync entrypoint. pub trait StateSyncDb: ManagedDb { /// Error returned by the state-sync engine for this database. type SyncError: Debug + Send; /// Run state-sync for this database and return a fully-initialized instance. #[allow(clippy::too_many_arguments)] fn sync_db( context: E, config: Self::Config, resolver: R, target: Self::SyncTarget, tip_updates: mpsc::Receiver, finish: Option>, reached_target: Option>, sync_config: SyncEngineConfig, ) -> impl Future> + Send; } /// Block metadata identifying the block that produced a set /// of sync targets. 
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Anchor {
    /// Height of the anchoring block.
    pub height: Height,
    /// Consensus round of the anchoring block.
    pub round: Round,
    /// Digest of the anchoring block.
    pub digest: D,
}

// NOTE(review): generic parameter lists (`<...>`) appear to have been stripped
// from this text (e.g. `pub struct Anchor`, `Option>`); code tokens are
// reproduced as found -- confirm against the canonical source before building.

/// Build an anchor from a block's height, context round and digest.
impl From<&B> for Anchor
where
    B: CertifiableBlock,
    B::Context: Epochable + Viewable,
    D: Digest,
{
    fn from(block: &B) -> Self {
        Self {
            height: block.height(),
            round: block.context().round(),
            digest: block.digest(),
        }
    }
}

/// Tip update delivered to a live state-sync session.
///
/// The optional observation barrier is used by the stateful actor to delay
/// marshal acknowledgement until the sync coordinator has recorded the new
/// anchor and targets.
pub struct TipUpdate {
    anchor: Anchor,
    targets: T,
    observed: Option>,
}

impl TipUpdate {
    /// Create a tip update with no observation barrier.
    pub const fn new(anchor: Anchor, targets: T) -> Self {
        Self {
            anchor,
            targets,
            observed: None,
        }
    }

    /// Create a tip update together with a receiver that is signalled once
    /// the update has been passed through [`record`](Self::record).
    pub(crate) fn with_observation(anchor: Anchor, targets: T) -> (Self, oneshot::Receiver<()>) {
        let (observed, receiver) = oneshot::channel();
        (
            Self {
                anchor,
                targets,
                observed: Some(observed),
            },
            receiver,
        )
    }

    /// Consume the update and return its `(anchor, targets)`.
    ///
    /// Signals the observation barrier first when one is attached; the send
    /// result is deliberately ignored.
    pub(crate) fn record(mut self) -> (Anchor, T) {
        if let Some(observed) = self.observed.take() {
            let _ = observed.send(());
        }
        (self.anchor, self.targets)
    }
}

/// A [`DatabaseSet`] that can run one-time state sync.
///
/// `D` is the block digest type. Each set of sync targets is paired
/// with an [`Anchor`] identifying the block that produced those targets.
/// On convergence, `sync` returns the anchor that all databases agreed on.
pub trait StateSyncSet: DatabaseSet
where
    D: Digest,
{
    /// Error returned if any database in the set fails state sync.
    type Error: Debug + Send;

    /// Run one-time state sync and return the initialized set
    /// together with the anchor all databases converged on.
#[allow(clippy::too_many_arguments)] fn sync( context: E, config: Self::Config, resolvers: R, anchor: Anchor, targets: Self::SyncTargets, tip_updates: ring::Receiver>, sync_config: SyncEngineConfig, ) -> impl Future), Self::Error>> + Send; } /// Implement [`DatabaseSet`] for a single [`ManagedDb`] behind a lock. impl + 'static> DatabaseSet for Arc> { type Unmerkleized = T::Unmerkleized; type Merkleized = T::Merkleized; type Config = T::Config; type SyncTargets = T::SyncTarget; async fn init(context: E, config: Self::Config) -> Self { let db = T::init(context, config) .await .expect("database init failed"); Self::new(AsyncRwLock::new(db)) } async fn new_batches(&self) -> Self::Unmerkleized { T::new_batch(self).await } fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized { parent.new_batch() } fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool { T::matches_sync_target(batches, targets) } async fn finalize(&self, batches: Self::Merkleized) { let mut database = self.write().await; finalize_or_panic(&mut *database, batches, None).await; } async fn committed_targets(&self) -> Self::SyncTargets { let database = self.read().await; T::sync_target(&*database).await } async fn rewind_to_targets(&self, target: Self::SyncTargets) { let mut database = self.write().await; if T::sync_target(&*database).await == target { return; } rewind_or_panic(&mut *database, target, None).await; } fn max_rewind_depth() -> Option { T::max_rewind_depth() } } impl StateSyncSet for Arc> where E: Send + Sync + Metrics, T: StateSyncDb + 'static, R: Send + 'static, D: Digest, { type Error = T::SyncError; #[allow(clippy::too_many_arguments)] async fn sync( context: E, config: Self::Config, resolver: R, anchor: Anchor, target: Self::SyncTargets, tip_updates: ring::Receiver>, sync_config: SyncEngineConfig, ) -> Result<(Self, Anchor), Self::Error> { let (target_tx, target_rx) = mpsc::channel(sync_config.update_channel_size.get()); let (finish_tx, 
finish_rx) = mpsc::channel(1); let (reached_tx, mut reached_rx) = mpsc::channel(1); let mut current_target = target.clone(); let sync = T::sync_db( context, config, resolver, target, target_rx, Some(finish_rx), Some(reached_tx), sync_config, ); let coordinator = async { let mut current_anchor = anchor; let mut tip_updates = Some(tip_updates); loop { if !drain_single_tip_updates( &mut tip_updates, &target_tx, &mut current_anchor, &mut current_target, ) .await { return (current_anchor, current_target); } let update_future = tip_updates.as_mut().map_or_else( || Either::Right(pending()), |updates| Either::Left(updates.recv()), ); select! { reached = reached_rx.recv() => { let Some(reached) = reached else { return (current_anchor, current_target); }; if !drain_single_tip_updates( &mut tip_updates, &target_tx, &mut current_anchor, &mut current_target, ) .await { return (current_anchor, current_target); }; if reached != current_target { continue; } let _ = finish_tx.send_lossy(()).await; return (current_anchor, current_target); }, update = update_future => { let Some(update) = update else { tip_updates = None; continue; }; let (new_anchor, new_target) = update.record(); if new_anchor.height <= current_anchor.height { continue; } current_anchor = new_anchor; if new_target == current_target { continue; } current_target = new_target.clone(); if !target_tx.send_lossy(new_target).await { return (current_anchor, current_target); } }, } } }; let (db_result, (converged_anchor, converged_target)) = join!(sync, coordinator); let database = db_result?; assert!( T::sync_target(&database).await == converged_target, "state sync database target does not match the coordinator target", ); Ok((Self::new(AsyncRwLock::new(database)), converged_anchor)) } } async fn drain_single_tip_updates( tip_updates: &mut Option>>, target_tx: &mpsc::Sender, current_anchor: &mut Anchor, current_target: &mut T, ) -> bool where D: Digest, T: Clone + PartialEq + Send + Sync, { let mut drained = 0usize; let 
mut latest = None; loop { let update = match tip_updates.as_mut().map(ring::Receiver::try_recv) { Some(Ok(update)) => update, Some(Err(ring::TryRecvError::Empty)) => break, Some(Err(ring::TryRecvError::Disconnected)) => { *tip_updates = None; break; } None => break, }; drained += 1; let (new_anchor, new_target) = update.record(); if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) { reschedule().await; } let latest_height = latest .as_ref() .map_or(current_anchor.height, |(anchor, _): &(Anchor, T)| { anchor.height }); if new_anchor.height <= latest_height { continue; } latest = Some((new_anchor, new_target)); } let Some((new_anchor, new_target)) = latest else { return true; }; *current_anchor = new_anchor; if new_target == *current_target { return true; } *current_target = new_target.clone(); target_tx.send_lossy(new_target).await } /// Implement [`DatabaseSet`] for a tuple of individually-locked /// [`ManagedDb`] instances. macro_rules! impl_database_set { ($($T:ident : $idx:tt),+) => { impl + 'static),+> DatabaseSet for ($(Arc>,)+) { type Unmerkleized = ($($T::Unmerkleized,)+); type Merkleized = ($($T::Merkleized,)+); type Config = ($($T::Config,)+); type SyncTargets = ($($T::SyncTarget,)+); async fn init(context: E, config: Self::Config) -> Self { let result = join!($( async { let db = $T::init( context.child(concat!("db_", stringify!($idx))), config.$idx, ) .await .expect(concat!( "database init failed (index ", stringify!($idx), ", type ", stringify!($T), ")", )); Arc::new(AsyncRwLock::new(db)) }, )+); result } async fn new_batches(&self) -> Self::Unmerkleized { join!($($T::new_batch(&self.$idx),)+) } fn fork_batches(parent: &Self::Merkleized) -> Self::Unmerkleized { ($(parent.$idx.new_batch(),)+) } fn matches_sync_targets(batches: &Self::Merkleized, targets: &Self::SyncTargets) -> bool { $($T::matches_sync_target(&batches.$idx, &targets.$idx))&&+ } async fn finalize(&self, batches: Self::Merkleized) { join!($( async { let mut database = 
self.$idx.write().await; finalize_or_panic(&mut *database, batches.$idx, Some($idx)).await; }, )+); } async fn committed_targets(&self) -> Self::SyncTargets { join!($( async { let database = self.$idx.read().await; $T::sync_target(&*database).await }, )+) } async fn rewind_to_targets(&self, targets: Self::SyncTargets) { join!($( async { let mut database = self.$idx.write().await; if $T::sync_target(&*database).await == targets.$idx { return; } rewind_or_panic(&mut *database, targets.$idx, Some($idx)).await; }, )+); } fn max_rewind_depth() -> Option { let mut max_rewind_depth: Option = None; $( max_rewind_depth = match (max_rewind_depth, $T::max_rewind_depth()) { (Some(current), Some(next)) => Some(current.min(next)), (Some(current), None) => Some(current), (None, Some(next)) => Some(next), (None, None) => None, }; )+ max_rewind_depth } } }; } impl_database_set!(DB1: 0); impl_database_set!(DB1: 0, DB2: 1); impl_database_set!(DB1: 0, DB2: 1, DB3: 2); impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3); impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4); impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5); impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6); impl_database_set!(DB1: 0, DB2: 1, DB3: 2, DB4: 3, DB5: 4, DB6: 5, DB7: 6, DB8: 7); struct DbSyncChannels { target_tx: mpsc::Sender, target_rx: mpsc::Receiver, finish_tx: mpsc::Sender<()>, finish_rx: mpsc::Receiver<()>, generation_tx: mpsc::Sender<(usize, T)>, generation_rx: mpsc::Receiver<(usize, T)>, reached_tx: mpsc::Sender, reached_rx: mpsc::Receiver, } impl DbSyncChannels { fn new(update_channel_size: usize) -> Self { let (target_tx, target_rx) = mpsc::channel(update_channel_size); let (finish_tx, finish_rx) = mpsc::channel(1); let (generation_tx, generation_rx) = mpsc::channel(update_channel_size); let (reached_tx, reached_rx) = mpsc::channel(1); Self { target_tx, target_rx, finish_tx, finish_rx, generation_tx, generation_rx, reached_tx, reached_rx, } } } struct 
CoordinatorSyncSenders { target_tx: mpsc::Sender, finish_tx: mpsc::Sender<()>, generation_tx: mpsc::Sender<(usize, T)>, } macro_rules! impl_state_sync_set { ($($T:ident : $R:ident : $idx:tt),+) => { impl StateSyncSet for ($(Arc>,)+) where E: Send + Sync + Spawner + Metrics + 'static, D: Digest + 'static, $( $T: StateSyncDb + 'static, $R: Send + 'static, )+ { type Error = String; #[allow(clippy::too_many_arguments)] async fn sync( context: E, config: Self::Config, resolvers: ($($R,)+), anchor: Anchor, targets: Self::SyncTargets, tip_updates: ring::Receiver>, sync_config: SyncEngineConfig, ) -> Result<(Self, Anchor), Self::Error> { let db_channels = ($( DbSyncChannels::<<$T as ManagedDb>::SyncTarget>::new( sync_config.update_channel_size.get(), ), )+); let coordinator_senders = ($( CoordinatorSyncSenders { target_tx: db_channels.$idx.target_tx.clone(), finish_tx: db_channels.$idx.finish_tx.clone(), generation_tx: db_channels.$idx.generation_tx.clone(), }, )+); let coordinator_owned_senders = ($( CoordinatorSyncSenders { target_tx: db_channels.$idx.target_tx, finish_tx: db_channels.$idx.finish_tx, generation_tx: db_channels.$idx.generation_tx, }, )+); let (reached_event_tx, mut reached_event_rx) = mpsc::channel(16); let (completion_tx, mut completion_rx) = mpsc::channel(1); let db_count = [$($idx,)+].len(); let coordinator_targets = targets.clone(); let initial_targets = targets.clone(); let first_db_error: Arc>> = Arc::new(commonware_utils::sync::Mutex::new(None)); let coordinator_handle = context.child("coordinator").spawn({ move |_context| async move { let coordinator_owned_senders = coordinator_owned_senders; let mut tip_updates = Some(tip_updates); let mut state = CoordinatorState::new(db_count, anchor, coordinator_targets); let mut last_dispatched_targets = initial_targets; loop { loop { match reached_event_rx.try_recv() { Ok((idx, generation)) => state.record_reached(idx, generation), Err(mpsc::error::TryRecvError::Empty) => break, 
Err(mpsc::error::TryRecvError::Disconnected) => return None, } } if let Some(updates) = tip_updates.as_mut() { loop { match updates.try_recv() { Ok(update) => { let (anchor, targets) = update.record(); state.record_tip_update(anchor, targets); } Err(ring::TryRecvError::Empty) => break, Err(ring::TryRecvError::Disconnected) => { tip_updates = None; break; } } } } match state.next_action() { CoordinatorAction::Converged { anchor, targets } => { $( let _ = coordinator_senders.$idx.finish_tx.send_lossy(()).await; )+ return Some((anchor, targets)); } CoordinatorAction::Dispatch { generation, targets: dispatch_targets, } => { $( let dispatch_target = dispatch_targets.$idx.clone(); if !coordinator_senders.$idx .generation_tx .send_lossy((generation, dispatch_target.clone())) .await { return None; } if state.should_dispatch($idx) { if dispatch_target != last_dispatched_targets.$idx { if !coordinator_senders.$idx .target_tx .send_lossy(dispatch_target.clone()) .await { return None; } last_dispatched_targets.$idx = dispatch_target; } } else if dispatch_target == last_dispatched_targets.$idx { state.mark_reached_same_target($idx, generation); } )+ continue; } CoordinatorAction::Wait => {} } let update_future = tip_updates.as_mut().map_or_else( || Either::Right(pending()), |updates| Either::Left(updates.recv()), ); select! 
{ reached_event = reached_event_rx.recv() => { let (idx, generation) = reached_event?; state.record_reached(idx, generation); }, _ = completion_rx.recv() => { drop(coordinator_owned_senders); return None; }, update = update_future => { let Some(update) = update else { tip_updates = None; continue; }; let (anchor, targets) = update.record(); state.record_tip_update(anchor, targets); }, }; } } }); let db_handles = ( $( context.child(concat!("db_", stringify!($idx))).spawn({ let first_db_error = first_db_error.clone(); let mut reached_target_rx = db_channels.$idx.reached_rx; let mut generation_rx = Some(db_channels.$idx.generation_rx); let mut current_generation = 0usize; let mut current_target = targets.$idx.clone(); let mut last_reached_target = None; let mut last_reported_generation = None; let reached_event_sender = reached_event_tx.clone(); let completion_signal = completion_tx.clone(); let config = config.$idx; let resolver = resolvers.$idx; let target = targets.$idx; let target_rx = db_channels.$idx.target_rx; let finish_rx = db_channels.$idx.finish_rx; let reached_tx = db_channels.$idx.reached_tx; move |context| async move { let sync = $T::sync_db( context, config, resolver, target, target_rx, Some(finish_rx), Some(reached_tx), sync_config, ); let forward_reached = async move { loop { drain_generation_updates( &mut generation_rx, &mut current_generation, &mut current_target, &last_reached_target, &mut last_reported_generation, &reached_event_sender, $idx, ) .await; let update_future = generation_rx.as_mut().map_or_else( || Either::Right(pending()), |updates| Either::Left(updates.recv()), ); select! 
{ reached_target = reached_target_rx.recv() => { let Some(reached_target) = reached_target else { return; }; last_reached_target = Some(reached_target.clone()); drain_generation_updates( &mut generation_rx, &mut current_generation, &mut current_target, &last_reached_target, &mut last_reported_generation, &reached_event_sender, $idx, ) .await; if reached_target != current_target { continue; } if last_reported_generation != Some(current_generation) { if !reached_event_sender .send_lossy(($idx, current_generation)) .await { return; } last_reported_generation = Some(current_generation); } }, update = update_future => { let Some((generation, target)) = update else { generation_rx = None; continue; }; current_generation = generation; current_target = target; if last_reached_target.as_ref() == Some(¤t_target) && last_reported_generation != Some(current_generation) { if !reached_event_sender .send_lossy(($idx, current_generation)) .await { return; } last_reported_generation = Some(current_generation); } }, }; } }; let (sync_result, _) = join!(sync, forward_reached); let result = sync_result .map(|database| Arc::new(AsyncRwLock::new(database))) .map_err(|err| { format!( "state sync failed (index {}, db {}): {err:?}", $idx, core::any::type_name::<$T>(), ) }); if let Err(err) = &result { let mut first = first_db_error.lock(); if first.is_none() { *first = Some(err.clone()); } } let _ = completion_signal.send_lossy(()).await; result } }), )+ ); let synced = join!( $( async { db_handles.$idx .await .expect("state sync database task exited") }, )+ ); let converged_anchor = coordinator_handle .await .expect("state sync coordinator task exited"); if let Some(err) = first_db_error.lock().take() { return Err(err); } let synced = ($(synced.$idx?,)+); let Some((converged_anchor, converged_targets)) = converged_anchor else { return Err("state sync coordinator did not report a converged anchor".into()); }; if >::committed_targets(&synced).await != converged_targets { return Err( "state 
sync database targets do not match the coordinator target set" .into(), ); } Ok((synced, converged_anchor)) } } }; } impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1); impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2); impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3); impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4); impl_state_sync_set!(DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4, DB6: R6: 5); impl_state_sync_set!( DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4, DB6: R6: 5, DB7: R7: 6 ); impl_state_sync_set!( DB1: R1: 0, DB2: R2: 1, DB3: R3: 2, DB4: R4: 3, DB5: R5: 4, DB6: R6: 5, DB7: R7: 6, DB8: R8: 7 ); async fn drain_generation_updates( generation_rx: &mut Option>, current_generation: &mut usize, current_target: &mut T, last_reached_target: &Option, last_reported_generation: &mut Option, reached_event_sender: &mpsc::Sender<(usize, usize)>, idx: usize, ) where T: Clone + PartialEq, { if let Some(updates) = generation_rx.as_mut() { let mut drained = 0usize; loop { match updates.try_recv() { Ok((generation, target)) => { drained += 1; *current_generation = generation; *current_target = target; if last_reached_target.as_ref() == Some(current_target) && *last_reported_generation != Some(*current_generation) { if !reached_event_sender .send_lossy((idx, *current_generation)) .await { return; } *last_reported_generation = Some(*current_generation); } if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) { reschedule().await; } } Err(mpsc::error::TryRecvError::Empty) => break, Err(mpsc::error::TryRecvError::Disconnected) => { *generation_rx = None; break; } } } } } /// Per-database sync tracking state. #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum DbSyncState { /// Database is still syncing toward its assigned generation's targets. Seeking { generation: usize }, /// Database reported it reached its assigned generation's targets. 
Reached { generation: usize },
}

impl DbSyncState {
    /// Generation this database is assigned to, regardless of variant.
    const fn generation(self) -> usize {
        match self {
            Self::Seeking { generation } | Self::Reached { generation } => generation,
        }
    }

    /// Whether the database has reported reaching its assigned generation.
    const fn is_reached(self) -> bool {
        matches!(self, Self::Reached { .. })
    }
}

/// What the coordinator should do after processing events.
enum CoordinatorAction {
    /// Nothing to do; wait for the next event.
    Wait,
    /// Dispatch targets to non-reached databases for `generation`.
    Dispatch { generation: usize, targets: T },
    /// All databases converged on the same generation.
    Converged { anchor: Anchor, targets: T },
}

/// Pure state machine for multi-database sync convergence.
///
/// Tracks which generation each database is assigned to, which have
/// reported "reached", and decides when to regroup or declare
/// convergence.
struct CoordinatorState {
    /// Per-database state, indexed by the database's position (`idx`).
    dbs: Vec,
    /// Anchor and targets stored for each generation still referenced by a database.
    generation_state: BTreeMap, T)>,
    /// Highest generation number handed out so far.
    current_generation: usize,
    /// Newest tip update that has been recorded but not yet dispatched.
    latest_tip: Option<(Anchor, T)>,
    /// Anchor of the most recently created generation.
    last_dispatched_anchor: Anchor,
}

impl CoordinatorState {
    /// Creates the state for `db_count` databases, all seeking generation 0,
    /// with `(anchor, targets)` stored as generation 0's state.
    fn new(db_count: usize, anchor: Anchor, targets: T) -> Self {
        let dbs = vec![DbSyncState::Seeking { generation: 0 }; db_count];
        let mut generation_state = BTreeMap::new();
        generation_state.insert(0, (anchor, targets));
        Self {
            dbs,
            generation_state,
            current_generation: 0,
            latest_tip: None,
            last_dispatched_anchor: anchor,
        }
    }

    /// Record that database `idx` reached `generation`.
    ///
    /// Reached events can arrive late. If the database has already been
    /// re-assigned to a newer generation, stale events are ignored.
    /// A repeated event for an already-reached database is ignored as well.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for the database list.
    fn record_reached(&mut self, idx: usize, generation: usize) {
        // Event refers to a generation this database is no longer assigned to.
        if self.dbs[idx].generation() != generation {
            return;
        }
        if self.dbs[idx].is_reached() {
            return;
        }
        self.dbs[idx] = DbSyncState::Reached { generation };
    }

    /// Record a new tip update.
    ///
    /// Sync targets must move strictly forward. Ignore stale and duplicate
    /// anchors to avoid dispatching backward targets.
    ///
    /// The height is compared against the pending tip when one exists,
    /// otherwise against the last dispatched anchor. An accepted update
    /// replaces any pending tip, so only the newest one is kept.
    fn record_tip_update(&mut self, anchor: Anchor, targets: T) {
        let current_height = self
            .latest_tip
            .as_ref()
            .map_or(self.last_dispatched_anchor.height, |(latest_anchor, _)| {
                latest_anchor.height
            });
        if anchor.height <= current_height {
            return;
        }
        self.latest_tip = Some((anchor, targets));
    }

    /// Determine the next action. Mutates internal state for regroup/dispatch.
    ///
    /// Decision order:
    /// 1. Every database reached and all share one generation: if a tip
    ///    update is pending, move all databases to a fresh generation and
    ///    return `Dispatch`; otherwise return `Converged` with that
    ///    generation's stored anchor and targets.
    /// 2. Every database reached but generations differ (regroup): databases
    ///    not on the highest generation are reset to seek it, and its stored
    ///    targets are returned as `Dispatch`.
    /// 3. Some database is still seeking: if a tip update is pending, assign
    ///    a fresh generation to the non-reached databases only and return
    ///    `Dispatch`; otherwise return `Wait`.
    ///
    /// After a `Dispatch`, the recipients are exactly the databases for which
    /// `dbs[idx].is_reached() == false` (see `should_dispatch`).
    ///
    /// # Panics
    ///
    /// Panics if there are no databases (the `min`/`max` over an empty list
    /// is unwrapped), or if `generation_state` has no entry for the
    /// converged or regroup generation.
    fn next_action(&mut self) -> CoordinatorAction {
        let all_reached = self.dbs.iter().all(|db| db.is_reached());
        if all_reached {
            let min_gen = self.dbs.iter().map(|db| db.generation()).min().unwrap();
            let max_gen = self.dbs.iter().map(|db| db.generation()).max().unwrap();
            if min_gen == max_gen {
                // Everyone agrees, but a newer tip is queued: chase it instead of finishing.
                if let Some((anchor, targets)) = self.latest_tip.take() {
                    let generation = self.current_generation + 1;
                    self.current_generation = generation;
                    for db in &mut self.dbs {
                        *db = DbSyncState::Seeking { generation };
                    }
                    self.generation_state
                        .insert(generation, (anchor, targets.clone()));
                    self.last_dispatched_anchor = anchor;
                    self.prune_generations();
                    return CoordinatorAction::Dispatch {
                        generation,
                        targets,
                    };
                }
                let (anchor, targets) = self
                    .generation_state
                    .get(&min_gen)
                    .expect("missing state for converged generation")
                    .clone();
                return CoordinatorAction::Converged { anchor, targets };
            }
            // Regroup: reset behind databases to seek the highest generation.
            // Databases already on `max_gen` keep their `Reached` state.
            let (_anchor, targets) = self
                .generation_state
                .get(&max_gen)
                .expect("missing state for regroup generation")
                .clone();
            for db in &mut self.dbs {
                if db.generation() != max_gen {
                    *db = DbSyncState::Seeking {
                        generation: max_gen,
                    };
                }
            }
            self.prune_generations();
            return CoordinatorAction::Dispatch {
                generation: max_gen,
                targets,
            };
        }
        // Not all reached. If there's a pending tip, dispatch it.
        let Some((anchor, targets)) = self.latest_tip.take() else {
            return CoordinatorAction::Wait;
        };
        let generation = self.current_generation + 1;
        self.current_generation = generation;
        // Reached databases stay frozen on their generation; only seekers move on.
        for db in &mut self.dbs {
            if !db.is_reached() {
                *db = DbSyncState::Seeking { generation };
            }
        }
        self.generation_state
            .insert(generation, (anchor, targets.clone()));
        self.last_dispatched_anchor = anchor;
        self.prune_generations();
        CoordinatorAction::Dispatch {
            generation,
            targets,
        }
    }

    /// Retain only generations referenced by at least one database.
    fn prune_generations(&mut self) {
        self.generation_state
            .retain(|gen, _| self.dbs.iter().any(|db| db.generation() == *gen));
    }

    /// Whether database `idx` is a non-reached recipient for dispatch.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for the database list.
    fn should_dispatch(&self, idx: usize) -> bool {
        !self.dbs[idx].is_reached()
    }

    /// Advance a reached database to `generation` when its target is unchanged.
    ///
    /// Does nothing unless database `idx` is currently `Reached`; otherwise
    /// its state is overwritten with `Reached { generation }`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds for the database list.
    fn mark_reached_same_target(&mut self, idx: usize, generation: usize) {
        if !self.dbs[idx].is_reached() {
            return;
        }
        self.dbs[idx] = DbSyncState::Reached { generation };
    }
}

/// Finalizes `batch` on `database`, panicking if finalize returns an error.
///
/// The panic message names the database type and, when `index` is `Some`,
/// the index as well.
async fn finalize_or_panic>(
    database: &mut T,
    batch: T::Merkleized,
    index: Option,
) {
    // Mutable finalize failures are fatal by design because other databases in
    // the same set may already have committed, leaving partially applied state.
    if let Err(err) = database.finalize(batch).await {
        match index {
            Some(index) => panic!(
                "database finalize failed (index {index}, type {}): {err:?}",
                core::any::type_name::(),
            ),
            None => panic!(
                "database finalize failed (type {}): {err:?}",
                core::any::type_name::(),
            ),
        }
    }
}

/// Rewinds `database` to `target`, panicking if the rewind returns an error.
///
/// The panic message names the database type and, when `index` is `Some`,
/// the index as well.
async fn rewind_or_panic>(
    database: &mut T,
    target: T::SyncTarget,
    index: Option,
) {
    // Mutable rewind failures are fatal by design because the database handle
    // may be internally diverged after a failed rewind.
if let Err(err) = database.rewind_to_target(target).await {
        match index {
            Some(index) => panic!(
                "database rewind failed (index {index}, type {}): {err:?}",
                core::any::type_name::(),
            ),
            None => panic!(
                "database rewind failed (type {}): {err:?}",
                core::any::type_name::(),
            ),
        }
    }
}

/// A resolver that can attach a database at runtime.
///
/// Implementations receive a database handle after startup so they can
/// serve incoming sync requests once the database is initialized.
pub trait AttachableResolver: Clone + Send + Sync + 'static {
    /// Attach a database for serving incoming requests.
    fn attach_database(&self, db: Arc>) -> impl Future + Send;
}

/// Attach a database set to a resolver set with matching shape.
///
/// `DBs` is the database-set type accepted by the resolver set.
pub trait AttachableResolverSet: Clone + Send + Sync + 'static {
    /// Attach all databases to their corresponding resolvers.
    fn attach_databases(&self, databases: DBs) -> impl Future + Send;
}

// Single-database case: a lone resolver is itself a resolver set and simply
// forwards the handle to `attach_database`.
impl AttachableResolverSet>> for R
where
    R: AttachableResolver,
    DB: Send + Sync + 'static,
{
    async fn attach_databases(&self, db: Arc>) {
        self.attach_database(db).await;
    }
}

// Implements `AttachableResolverSet` for a tuple of resolvers. Each
// `R: DB: idx` triple names a resolver type, its database type, and the tuple
// position shared by both; the database at position `idx` is attached to the
// resolver at position `idx`, with all attach futures polled together through
// `futures::join!`.
macro_rules! impl_attachable_resolver_set {
    ($($R:ident : $DB:ident : $idx:tt),+) => {
        impl<$($R, $DB),+> AttachableResolverSet<($(Arc>,)+)> for ($($R,)+)
        where
            $(
                $R: AttachableResolver<$DB>,
                $DB: Send + Sync + 'static,
            )+
        {
            async fn attach_databases(&self, databases: ($(Arc>,)+)) {
                futures::join!($(
                    self.$idx.attach_database(databases.$idx),
                )+);
            }
        }
    };
}

// Tuple arities 2 through 8.
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3);
impl_attachable_resolver_set!(R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4);
impl_attachable_resolver_set!(
    R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4, R6: DB6: 5
);
impl_attachable_resolver_set!(
    R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4, R6: DB6: 5, R7: DB7: 6
);
impl_attachable_resolver_set!(
    R1: DB1: 0, R2: DB2: 1, R3: DB3: 2, R4: DB4: 3, R5: DB5: 4, R6: DB6: 5, R7: DB7: 6, R8: DB8: 7
);

#[cfg(test)]
mod tests {
    use super::{
        assert_rewind_window_safety, drain_single_tip_updates, Anchor, AttachableResolver,
        AttachableResolverSet, CoordinatorAction, CoordinatorState, DatabaseSet, ManagedDb,
        StateSyncDb, StateSyncSet, SyncEngineConfig, TipUpdate, MAX_CHANNEL_DRAIN_PER_TICK,
    };
    use crate::stateful::tests::mocks::{anchor as mock_anchor, TestMerkleized, TestUnmerkleized};
    use commonware_cryptography::sha256;
    use commonware_macros::select;
    use commonware_runtime::{
        deterministic, reschedule, Clock, Runner as _, Spawner as _, Supervisor as _,
    };
    use commonware_utils::{
        channel::{mpsc, oneshot, ring},
        sync::AsyncRwLock,
    };
    use futures::{pin_mut, FutureExt, SinkExt};
    use std::{
        convert::Infallible,
        num::{NonZeroU64, NonZeroUsize},
        sync::{
            atomic::{AtomicBool, AtomicUsize, Ordering},
            Arc,
        },
        time::Duration,
    };

    /// Minimal database mock that does not override `max_rewind_depth`.
    #[derive(Default)]
    struct TestDb;

    /// Database mock whose `max_rewind_depth` is `Some(1)`.
    #[derive(Default)]
    struct OneStepRewindDb;

    /// Database mock whose `max_rewind_depth` is `Some(3)`.
    #[derive(Default)]
    struct ThreeStepRewindDb;

    /// Database mock that records its current target and counts rewinds.
    struct CountingRewindDb {
        current_target: u64,
rewind_count: usize,
    }

    // No-op database. Unlike the other mocks, `new_batch` first acquires the
    // read lock on the database before handing out a batch.
    impl ManagedDb for TestDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        async fn init(_context: E, _config: Self::Config) -> Result {
            Ok(Self)
        }

        async fn new_batch(db: &Arc>) -> Self::Unmerkleized {
            let _guard = db.read().await;
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {}

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // No-op database that reports a rewind depth of one.
    impl ManagedDb for OneStepRewindDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        async fn init(_context: E, _config: Self::Config) -> Result {
            Ok(Self)
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {}

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }

        fn max_rewind_depth() -> Option {
            Some(1)
        }
    }

    // No-op database that reports a rewind depth of three.
    impl ManagedDb for ThreeStepRewindDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        async fn init(_context: E, _config: Self::Config) -> Result {
            Ok(Self)
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {}

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }

        fn max_rewind_depth() -> Option {
            Some(3)
        }
    }

    // Database whose sync target is `current_target`; every rewind stores the
    // requested target and increments `rewind_count`.
    impl ManagedDb for CountingRewindDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("CountingRewindDb is constructed directly in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.current_target
        }

        async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Self::Error> {
            self.current_target = target;
            self.rewind_count += 1;
            Ok(())
        }
    }

    /// Database mock whose `finalize` signals `started` and then waits on
    /// `release`; each channel is consumed by the first `finalize` call.
    struct BlockingFinalizeDb {
        started: Option>,
        release: Option>,
    }

    impl BlockingFinalizeDb {
        /// Wraps the two oneshot endpoints used to observe and unblock `finalize`.
        fn new(started: oneshot::Sender<()>, release: oneshot::Receiver<()>) -> Self {
            Self {
                started: Some(started),
                release: Some(release),
            }
        }
    }

    /// Error returned by `FailingFinalizeDb::finalize`.
    #[derive(Debug)]
    struct TestFinalizeError;

    /// Database mock whose `finalize` always fails.
    struct FailingFinalizeDb;

    // State-sync mocks. Those with a `final_target` field return it from
    // `sync_target`; their sync behavior lives in the `StateSyncDb` impls.
    struct SlowSyncDb {
        final_target: u64,
    }

    struct RejectDuplicateTargetSyncDb {
        final_target: u64,
    }

    struct StaleReachedSyncDb {
        final_target: u64,
    }

    struct FastSyncDb {
        final_target: u64,
    }

    struct ImmediateStateSyncDb;

    struct FailingStateSyncDb;

    struct MismatchedTargetSyncDb {
        final_target: u64,
    }

    struct FinishClosedSyncDb {
        final_target: u64,
    }

    struct ObservedSlowSyncDb {
        final_target: u64,
    }

    struct ObservedFastSyncDb {
        final_target: u64,
    }

    struct DistinctObservedFastSyncDb {
        final_target: u64,
    }

    /// Handle passed to `ObservedSlowSyncDb`; its sync loop does not start
    /// until `release` reads as true.
    #[derive(Clone)]
    struct SlowSyncController {
        release: Arc,
    }

    /// Handle passed to the observed fast mocks: `ready` is set when sync
    /// starts and `update_count` is incremented for every tip update received.
    #[derive(Clone)]
    struct FastSyncObserver {
        ready: Arc,
        update_count: Arc,
    }

    // Database whose `finalize` always returns `TestFinalizeError`.
    impl ManagedDb for FailingFinalizeDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error =
TestFinalizeError;
        type Config = ();
        type SyncTarget = ();

        async fn init(_context: E, _config: Self::Config) -> Result {
            Ok(Self)
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Err(TestFinalizeError)
        }

        async fn sync_target(&self) -> Self::SyncTarget {}

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // A single-database set with no rewind limit reports `None`.
    // NOTE(review): presumably exercised with `TestDb`; confirm the type ascription.
    #[test]
    fn single_db_set_reports_unbounded_rewind_depth() {
        let rewind_depth = > as DatabaseSet>::max_rewind_depth();
        assert_eq!(rewind_depth, None);
    }

    // A single-database set forwards the database's own limit of one.
    // NOTE(review): presumably exercised with `OneStepRewindDb`; confirm.
    #[test]
    fn single_db_set_reports_one_step_rewind_depth() {
        let rewind_depth = > as DatabaseSet<
            deterministic::Context,
        >>::max_rewind_depth();
        assert_eq!(rewind_depth, Some(1));
    }

    // A three-database tuple reports the smallest finite limit among its members.
    #[test]
    fn tuple_db_set_uses_most_restrictive_finite_rewind_depth() {
        type DbSet = (
            Arc>,
            Arc>,
            Arc>,
        );
        let rewind_depth = >::max_rewind_depth();
        assert_eq!(rewind_depth, Some(1));
    }

    // `max_pending_acks` equal to the rewind depth (1) must not panic.
    #[test]
    fn rewind_window_assertion_accepts_equal_pending_acks_and_rewind_depth() {
        assert_rewind_window_safety::>>(
            NonZeroUsize::new(1).unwrap(),
        );
    }

    // `max_pending_acks` (2) above the rewind depth (1) must panic with the
    // message checked by `should_panic`.
    #[test]
    #[should_panic(expected = "marshal max_pending_acks=2 exceeds database_set.max_rewind_depth=1")]
    fn rewind_window_assertion_panics_when_pending_acks_exceed_rewind_depth() {
        assert_rewind_window_safety::>>(
            NonZeroUsize::new(2).unwrap(),
        );
    }

    // Rewinding a tuple to `(1, 1)` only touches the database that is not
    // already at its target: `left` (at 2) rewinds once, `right` (at 1) is skipped.
    #[test]
    fn tuple_rewind_to_targets_skips_already_aligned_databases() {
        deterministic::Runner::default().start(|_context| async move {
            type DbSet = (
                Arc>,
                Arc>,
            );
            let left = Arc::new(AsyncRwLock::new(CountingRewindDb {
                current_target: 2,
                rewind_count: 0,
            }));
            let right = Arc::new(AsyncRwLock::new(CountingRewindDb {
                current_target: 1,
                rewind_count: 0,
            }));
            let databases: DbSet = (left.clone(), right.clone());

            >::rewind_to_targets(&databases, (1, 1))
                .await;

            let left = left.read().await;
            assert_eq!(left.current_target, 1);
            assert_eq!(left.rewind_count, 1);
            let right = right.read().await;
            assert_eq!(right.current_target, 1);
            assert_eq!(right.rewind_count, 0);
        });
    }

    // `finalize` announces that it started, then waits until the test
    // releases it. Both channels are taken, so only the first call blocks.
    impl ManagedDb for BlockingFinalizeDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = ();

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("BlockingFinalizeDb is constructed directly in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            if let Some(started) = self.started.take() {
                let _ = started.send(());
            }
            if let Some(release) = self.release.take() {
                let _ = release.await;
            }
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {}

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // Boilerplate `ManagedDb` impl; only `sync_target` carries information.
    impl ManagedDb for SlowSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("SlowSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // Boilerplate `ManagedDb` impl; only `sync_target` carries information.
    impl ManagedDb for RejectDuplicateTargetSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn
init(_context: E, _config: Self::Config) -> Result {
            unreachable!(
                "RejectDuplicateTargetSyncDb is only constructed through state sync in tests"
            )
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // The impls below are boilerplate `ManagedDb` implementations for the
    // state-sync mocks: `init` is never expected to run, every operation is a
    // no-op, and `sync_target` returns the stored `final_target` (or `0` for
    // the unit-struct mocks).
    impl ManagedDb for FastSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("FastSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for FailingStateSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("FailingStateSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            0
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for MismatchedTargetSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("MismatchedTargetSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for ImmediateStateSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("ImmediateStateSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            0
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for FinishClosedSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("FinishClosedSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for ObservedSlowSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("ObservedSlowSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for ObservedFastSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("ObservedFastSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl ManagedDb for DistinctObservedFastSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type
Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!(
                "DistinctObservedFastSyncDb is only constructed through state sync in tests"
            )
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // Sync that does not start until the `release` flag (passed in the
    // resolver slot) is set, then follows tip updates with a 1ms sleep per
    // iteration.
    impl StateSyncDb> for SlowSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            release: Arc,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            // Poll the release flag once per millisecond until the test opens the gate.
            while !release.load(Ordering::SeqCst) {
                context.sleep(Duration::from_millis(1)).await;
            }

            let mut final_target = target;
            let mut tip_updates = Some(tip_updates);
            loop {
                // Report the current target as reached on every iteration; stop
                // if the report channel is closed.
                if let Some(reached_target) = reached_target.as_ref() {
                    if reached_target.send(final_target).await.is_err() {
                        break;
                    }
                }
                context.sleep(Duration::from_millis(1)).await;

                // Nothing left to wait on.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );
                select! {
                    _ = finish_signal => {
                        break;
                    },
                    update = update_signal => match update {
                        Some(update) => {
                            final_target = update;
                        }
                        // Update channel closed: keep waiting for finish if there is one.
                        None => {
                            tip_updates = None;
                            if finish.is_none() {
                                break;
                            }
                        }
                    },
                }
            }
            Ok(Self { final_target })
        }
    }

    // Sync that, while gated on `release`, asserts that no tip update repeats
    // the target it already holds. After the gate opens it reports its target
    // once and waits for the finish signal.
    impl StateSyncDb> for RejectDuplicateTargetSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            release: Arc,
            target: Self::SyncTarget,
            mut tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            let mut final_target = target;
            while !release.load(Ordering::SeqCst) {
                match tip_updates.try_recv() {
                    Ok(update) => {
                        assert_ne!(
                            update, final_target,
                            "state sync must not send duplicate target updates"
                        );
                        final_target = update;
                    }
                    Err(mpsc::error::TryRecvError::Empty) => {}
                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            if let Some(reached_target) = reached_target.as_ref() {
                let _ = reached_target.send(final_target).await;
            }
            if let Some(finish_rx) = finish.as_mut() {
                let _ = finish_rx.recv().await;
            }
            Ok(Self { final_target })
        }
    }

    // Boilerplate `ManagedDb` impl; only `sync_target` carries information.
    impl ManagedDb for StaleReachedSyncDb {
        type Unmerkleized = TestUnmerkleized;
        type Merkleized = TestMerkleized;
        type Error = Infallible;
        type Config = ();
        type SyncTarget = u64;

        async fn init(_context: E, _config: Self::Config) -> Result {
            unreachable!("StaleReachedSyncDb is only constructed through state sync in tests")
        }

        async fn new_batch(_db: &Arc>) -> Self::Unmerkleized {
            TestUnmerkleized
        }

        fn matches_sync_target(_batch: &Self::Merkleized, _target: &Self::SyncTarget) -> bool {
            true
        }

        async fn finalize(&mut self, _batch: Self::Merkleized) -> Result<(), Self::Error> {
            Ok(())
        }

        async fn sync_target(&self) -> Self::SyncTarget {
            self.final_target
        }

        async fn rewind_to_target(&mut self, _target: Self::SyncTarget) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    // Sync that reports its original target as reached only after a newer tip
    // update has already been received, i.e. it emits a stale "reached" event.
    impl StateSyncDb for StaleReachedSyncDb
    where
        E: Send + Clock,
    {
        type SyncError = Infallible;

        async
fn sync_db(
            context: E,
            _config: Self::Config,
            _resolver: (),
            target: Self::SyncTarget,
            mut tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            // Wait for the first forwarded tip before reporting anything.
            let update = tip_updates.recv().await.expect("expected forwarded tip");
            // Report the original target, which is stale by now.
            if let Some(reached_target) = reached_target.as_ref() {
                let _ = reached_target.send(target).await;
            }

            let finish_signal = finish.as_mut().map_or_else(
                || futures::future::Either::Right(futures::future::pending()),
                |finish_rx| futures::future::Either::Left(finish_rx.recv()),
            );
            select! {
                // Finish arrived first: the database ends on the original target.
                _ = finish_signal => Ok(Self { final_target: target }),
                // Otherwise, after 10ms, report the updated target, wait for
                // finish, and end on the updated target.
                _ = context.sleep(Duration::from_millis(10)) => {
                    if let Some(reached_target) = reached_target.as_ref() {
                        let _ = reached_target.send(update).await;
                    }
                    if let Some(finish_rx) = finish.as_mut() {
                        let _ = finish_rx.recv().await;
                    }
                    Ok(Self {
                        final_target: update,
                    })
                },
            }
        }
    }

    // Sync that sets the `done` flag (passed in the resolver slot) as soon as
    // it starts, then reports its current target on every loop iteration and
    // follows tip updates without any delay.
    impl StateSyncDb> for FastSyncDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            done: Arc,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            done.store(true, Ordering::SeqCst);
            let mut final_target = target;
            let mut tip_updates = Some(tip_updates);
            loop {
                if let Some(reached_target) = reached_target.as_ref() {
                    if reached_target.send(final_target).await.is_err() {
                        break;
                    }
                }

                // Nothing left to wait on.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );
                select! {
                    _ = finish_signal => {
                        break;
                    },
                    update = update_signal => match update {
                        Some(update) => {
                            final_target = update;
                        }
                        // Update channel closed: keep waiting for finish if there is one.
                        None => {
                            tip_updates = None;
                            if finish.is_none() {
                                break;
                            }
                        }
                    },
                }
            }
            Ok(Self { final_target })
        }
    }

    /// Error returned by `FailingStateSyncDb::sync_db`.
    #[derive(Debug)]
    struct TestSyncError;

    /// Error returned by `FinishClosedSyncDb::sync_db` when the finish channel
    /// closes without delivering a signal.
    #[derive(Debug)]
    struct FinishClosedSyncError;

    // Sync that fails immediately.
    impl StateSyncDb for FailingStateSyncDb {
        type SyncError = TestSyncError;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            _resolver: (),
            _target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver,
            _finish: Option>,
            _reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            Err(TestSyncError)
        }
    }

    // Sync that succeeds immediately without reporting a reached target or
    // waiting for finish.
    impl StateSyncDb for ImmediateStateSyncDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            _resolver: (),
            _target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver,
            _finish: Option>,
            _reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            Ok(Self)
        }
    }

    // Sync that reports the requested target as reached and waits for finish,
    // but returns a database whose target is `target + 1`, so its
    // `sync_target` disagrees with what was requested.
    impl StateSyncDb for MismatchedTargetSyncDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            _resolver: (),
            target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            if let Some(reached_target) = reached_target.as_ref() {
                let _ = reached_target.send(target).await;
            }
            if let Some(finish_rx) = finish.as_mut() {
                let _ = finish_rx.recv().await;
            }
            Ok(Self {
                final_target: target + 1,
            })
        }
    }

    // Sync that never reports a reached target. It requires a finish receiver
    // and succeeds only if a finish signal is delivered; a closed finish
    // channel yields `FinishClosedSyncError`.
    impl StateSyncDb for FinishClosedSyncDb {
        type SyncError = FinishClosedSyncError;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            _resolver: (),
            target: Self::SyncTarget,
            _tip_updates: mpsc::Receiver,
            mut finish: Option>,
            _reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            let Some(finish_rx) = finish.as_mut() else {
                panic!("finish receiver should be provided");
            };
            match finish_rx.recv().await {
                Some(()) => Ok(Self {
                    final_target: target,
                }),
                None => Err(FinishClosedSyncError),
            }
        }
    }

    // Sync gated on `SlowSyncController::release` that reports a reached
    // target only after it has observed at least one tip update.
    impl StateSyncDb for ObservedSlowSyncDb
    where
        E:
Send + Clock,
    {
        type SyncError = Infallible;

        async fn sync_db(
            context: E,
            _config: Self::Config,
            controller: SlowSyncController,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            // Poll the release flag once per millisecond until the test opens the gate.
            while !controller.release.load(Ordering::SeqCst) {
                context.sleep(Duration::from_millis(1)).await;
            }

            let mut final_target = target;
            let mut tip_updates = Some(tip_updates);
            // Last target sent on `reached_target`; cleared by every update.
            let mut reported_target = None;
            // Becomes true once any tip update has been received.
            let mut observed_update = false;
            loop {
                // Drain everything already queued so only the newest update counts.
                if let Some(update_rx) = tip_updates.as_mut() {
                    let mut drained = 0usize;
                    loop {
                        match update_rx.try_recv() {
                            Ok(update) => {
                                drained += 1;
                                final_target = update;
                                observed_update = true;
                                reported_target = None;
                                if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) {
                                    reschedule().await;
                                }
                            }
                            Err(mpsc::error::TryRecvError::Empty) => {
                                break;
                            }
                            Err(mpsc::error::TryRecvError::Disconnected) => {
                                tip_updates = None;
                                break;
                            }
                        }
                    }
                }

                // The initial target is never reported on its own: a report
                // requires at least one observed update.
                if observed_update && reported_target != Some(final_target) {
                    if let Some(reached_target) = reached_target.as_ref() {
                        if reached_target.send(final_target).await.is_err() {
                            break;
                        }
                    }
                    reported_target = Some(final_target);
                }

                // Nothing left to wait on.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );
                select! {
                    _ = finish_signal => {
                        break;
                    },
                    update = update_signal => match update {
                        Some(update) => {
                            final_target = update;
                            observed_update = true;
                            reported_target = None;
                        }
                        // Update channel closed: keep waiting for finish if there is one.
                        None => {
                            tip_updates = None;
                            if finish.is_none() {
                                break;
                            }
                        }
                    },
                }
            }
            Ok(Self { final_target })
        }
    }

    // Sync that sets `observer.ready` on start and reports its initial target
    // immediately. `reported_target` suppresses repeated reports between
    // updates.
    impl StateSyncDb for ObservedFastSyncDb {
        type SyncError = Infallible;

        async fn sync_db(
            _context: E,
            _config: Self::Config,
            observer: FastSyncObserver,
            target: Self::SyncTarget,
            tip_updates: mpsc::Receiver,
            mut finish: Option>,
            reached_target: Option>,
            _sync_config: SyncEngineConfig,
        ) -> Result {
            let mut final_target = target;
            let mut tip_updates = Some(tip_updates);
            // Last target sent on `reached_target`, if any.
            let mut reported_target = None;
            observer.ready.store(true, Ordering::SeqCst);
            loop {
                if reported_target != Some(final_target) {
                    if let Some(reached_target) = reached_target.as_ref() {
                        if reached_target.send(final_target).await.is_err() {
                            break;
                        }
                    }
                    reported_target = Some(final_target);
                }

                // Nothing left to wait on.
                if finish.is_none() && tip_updates.is_none() {
                    break;
                }

                // A missing channel is replaced by a future that never resolves.
                let finish_signal = finish.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |finish_rx| futures::future::Either::Left(finish_rx.recv()),
                );
                let update_signal = tip_updates.as_mut().map_or_else(
                    || futures::future::Either::Right(futures::future::pending()),
                    |update_rx| futures::future::Either::Left(update_rx.recv()),
                );
                select!
{ _ = finish_signal => { break; }, update = update_signal => match update { Some(update) => { observer.update_count.fetch_add(1, Ordering::SeqCst); final_target = update; reported_target = None; } None => { tip_updates = None; if finish.is_none() { break; } } }, } } Ok(Self { final_target }) } } impl StateSyncDb for DistinctObservedFastSyncDb { type SyncError = Infallible; async fn sync_db( _context: E, _config: Self::Config, observer: FastSyncObserver, target: Self::SyncTarget, tip_updates: mpsc::Receiver, mut finish: Option>, reached_target: Option>, _sync_config: SyncEngineConfig, ) -> Result { let mut final_target = target; let mut tip_updates = Some(tip_updates); let mut reported_target = None; observer.ready.store(true, Ordering::SeqCst); loop { if reported_target != Some(final_target) { if let Some(reached_target) = reached_target.as_ref() { if reached_target.send(final_target).await.is_err() { break; } } reported_target = Some(final_target); } if finish.is_none() && tip_updates.is_none() { break; } let finish_signal = finish.as_mut().map_or_else( || futures::future::Either::Right(futures::future::pending()), |finish_rx| futures::future::Either::Left(finish_rx.recv()), ); let update_signal = tip_updates.as_mut().map_or_else( || futures::future::Either::Right(futures::future::pending()), |update_rx| futures::future::Either::Left(update_rx.recv()), ); select! 
{
                _ = finish_signal => { break; },
                update = update_signal => match update {
                    // Every update is counted, but the target is only replaced (and
                    // therefore re-reported as reached) when it differs from the current one.
                    Some(update) => { observer.update_count.fetch_add(1, Ordering::SeqCst); if update != final_target { final_target = update; reported_target = None; } }
                    // Update stream closed: stop unless a finish channel is still present.
                    None => { tip_updates = None; if finish.is_none() { break; } }
                },
            }
        }
        Ok(Self { final_target })
    }
}

/// Checks that tuple `new_batches` requests the read lock of every database up
/// front instead of acquiring them one after another.
///
/// Both databases start write-locked. After only `db2`'s writer is released, a
/// fresh `db2.write()` must still be pending; the assertion message attributes
/// this to `new_batches` having already queued its read on `db2`.
#[test]
fn tuple_new_batches_queues_reads_concurrently() {
    deterministic::Runner::default().start(|_context| async move {
        let db1 = Arc::new(AsyncRwLock::new(TestDb));
        let db2 = Arc::new(AsyncRwLock::new(TestDb));
        let databases = (db1.clone(), db2.clone());
        // Hold both write locks so no read can be granted yet.
        let writer1 = db1.write().await;
        let writer2 = db2.write().await;
        let new_batches = <(Arc>, Arc>) as DatabaseSet< deterministic::Context, >>::new_batches(&databases);
        pin_mut!(new_batches);
        // Polling once must not complete while the writers are held.
        assert!(new_batches.as_mut().now_or_never().is_none());
        drop(writer2);
        {
            let writer2_again = db2.write();
            pin_mut!(writer2_again);
            assert!( writer2_again.as_mut().now_or_never().is_none(), "tuple new_batches should queue reads for all databases concurrently" );
        }
        drop(writer1);
        let _ = new_batches.await;
    });
}

/// Checks that tuple `finalize` starts finalizing every database before any of
/// them completes.
///
/// Each `BlockingFinalizeDb` is built with a "started" sender and a "release"
/// receiver. After a single poll of the combined future, both "started"
/// receivers must already hold a value.
#[test]
fn tuple_finalize_runs_databases_in_parallel() {
    deterministic::Runner::default().start(|_context| async move {
        let (started1_tx, started1_rx) = oneshot::channel();
        let (started2_tx, started2_rx) = oneshot::channel();
        let (release1_tx, release1_rx) = oneshot::channel();
        let (release2_tx, release2_rx) = oneshot::channel();
        let databases = ( Arc::new(AsyncRwLock::new(BlockingFinalizeDb::new( started1_tx, release1_rx, ))), Arc::new(AsyncRwLock::new(BlockingFinalizeDb::new( started2_tx, release2_rx, ))), );
        let finalize = <( Arc>, Arc>, ) as DatabaseSet>::finalize( &databases, (TestMerkleized, TestMerkleized), );
        pin_mut!(finalize);
        // One poll: the combined future must still be pending (nothing released yet).
        assert!(finalize.as_mut().now_or_never().is_none());
        let started1 = started1_rx;
        let started2 = started2_rx;
        pin_mut!(started1);
        pin_mut!(started2);
        assert!(matches!(started1.as_mut().now_or_never(), Some(Ok(()))));
        assert!( matches!(started2.as_mut().now_or_never(), 
Some(Ok(()))), "tuple finalize should start all database finalizations concurrently" );
        // Unblock both finalizations so the combined future can complete.
        let _ = release1_tx.send(());
        let _ = release2_tx.send(());
        finalize.await;
    });
}

/// Expects tuple `finalize` to panic with a message that names the failing
/// database's index (1) and type when the second database is a
/// `FailingFinalizeDb`.
#[test]
#[should_panic( expected = "database finalize failed (index 1, type commonware_glue::stateful::db::tests::FailingFinalizeDb)" )]
fn tuple_finalize_panic_identifies_failing_database() {
    deterministic::Runner::default().start(|_context| async move {
        let databases = ( Arc::new(AsyncRwLock::new(TestDb)), Arc::new(AsyncRwLock::new(FailingFinalizeDb)), );
        <( Arc>, Arc>, ) as DatabaseSet>::finalize( &databases, (TestMerkleized, TestMerkleized), ) .await;
    });
}

/// Anchor type shared by the state-sync tests below.
type TestAnchor = Anchor;

/// Builds a test anchor from `n` via `mock_anchor(n, n as u8)`.
///
/// The `as u8` cast wraps for `n > 255`; the callers visible in this module
/// only pass small values.
fn anchor(n: u64) -> TestAnchor { mock_anchor(n, n as u8) }

/// Queues a newer tip (anchor 2, target 2) followed by an older one (anchor 1,
/// target 1) and checks that a single `drain_single_tip_updates` call observes
/// both, leaves the current anchor/target at 2, and forwards exactly one
/// target value (2).
#[test]
fn single_tip_update_drain_keeps_highest_recorded_target() {
    deterministic::Runner::default().start(|_context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let (target_tx, mut target_rx) = mpsc::channel(4);
        let (newer_update, newer_observed) = TipUpdate::with_observation(anchor(2), 2u64);
        let (older_update, older_observed) = TipUpdate::with_observation(anchor(1), 1u64);
        // Newer first, older second: the drain must not regress to the older tip.
        let _ = tip_tx.send(newer_update).await;
        let _ = tip_tx.send(older_update).await;
        let mut tip_updates = Some(tip_rx);
        let mut current_anchor = anchor(0);
        let mut current_target = 0u64;
        assert!( drain_single_tip_updates( &mut tip_updates, &target_tx, &mut current_anchor, &mut current_target, ) .await );
        newer_observed .await .expect("newer update should be observed");
        older_observed .await .expect("older update should also be observed");
        assert_eq!(current_anchor, anchor(2));
        assert_eq!(current_target, 2);
        // Exactly one forwarded target: 2, then nothing else queued.
        assert_eq!(target_rx.recv().await, Some(2));
        assert!(matches!( target_rx.try_recv(), Err(mpsc::error::TryRecvError::Empty) ));
    });
}

/// Checks that a tip update carrying a new anchor but the already-current
/// target advances the anchor without forwarding a duplicate target.
#[test]
fn single_tip_update_drain_advances_anchor_without_duplicate_target() {
    deterministic::Runner::default().start(|_context| async move {
        let (mut tip_tx, tip_rx) = 
ring::channel(NonZeroUsize::new(1).unwrap());
        let (target_tx, mut target_rx) = mpsc::channel(1);
        // New anchor (3), same target (7) as the current one.
        let (update, observed) = TipUpdate::with_observation(anchor(3), 7u64);
        let _ = tip_tx.send(update).await;
        let mut tip_updates = Some(tip_rx);
        let mut current_anchor = anchor(2);
        let mut current_target = 7u64;
        assert!( drain_single_tip_updates( &mut tip_updates, &target_tx, &mut current_anchor, &mut current_target, ) .await );
        observed.await.expect("update should be observed");
        assert_eq!(current_anchor, anchor(3));
        assert_eq!(current_target, 7);
        // Nothing was forwarded on the target channel.
        assert!(matches!( target_rx.try_recv(), Err(mpsc::error::TryRecvError::Empty) ));
    });
}

/// Checks that single-database sync still completes, on the initial anchor,
/// when the tip-update sender is dropped before the database is released.
#[test]
fn single_state_sync_handles_closed_tip_updates_channel() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let release = Arc::new(AtomicBool::new(false));
        let release_for_sync = release.clone();
        let sync = context.child("single_state_sync_closed_tip_updates").spawn( move |context| async move { > as StateSyncSet< deterministic::Context, Arc, sha256::Digest, >>::sync( context, (), release_for_sync, anchor(0), 0, tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await .expect("single state sync should succeed") }, );
        // Close the tip channel first, then let the database proceed.
        drop(tip_tx);
        context.sleep(Duration::from_millis(1)).await;
        release.store(true, Ordering::SeqCst);
        let (_database, converged_anchor) = sync.await.expect("sync task should complete");
        assert_eq!(converged_anchor, anchor(0));
    });
}

/// Checks that single-database sync returns the database's own error
/// (`TestSyncError`) even though a tip update was queued before sync started.
#[test]
fn single_state_sync_preserves_db_error_when_target_channel_closes() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let _ = tip_tx.send(TipUpdate::new(anchor(1), 1u64)).await;
        let result = > as StateSyncSet< 
deterministic::Context, (), sha256::Digest, >>::sync( context, (), (), anchor(0), 0, tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await;
        // The database's error must be what the caller sees.
        assert!(matches!(result, Err(TestSyncError)));
    });
}

/// Sends tip 2 followed by tip 1 and checks that single-database sync ends on
/// target 2 and anchor 2, i.e. the older update never moves it backward.
#[test]
fn single_state_sync_ignores_backward_tip_updates() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        // `release` starts out `true` in this test.
        let release = Arc::new(AtomicBool::new(true));
        let resolver = SlowSyncController { release: release.clone(), };
        let sync = context .child("single_state_sync_ignores_backward_tip_updates") .spawn(move |context| async move { > as StateSyncSet< deterministic::Context, SlowSyncController, sha256::Digest, >>::sync( context, (), resolver, anchor(0), 0, tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(4).unwrap(), max_retained_roots: 0, }, ) .await .expect("single state sync should succeed") });
        // Newer tip first, then a stale one.
        let _ = tip_tx.send(TipUpdate::new(anchor(2), 2)).await;
        let _ = tip_tx.send(TipUpdate::new(anchor(1), 1)).await;
        drop(tip_tx);
        let (database, converged_anchor) = sync.await.expect("sync task should complete");
        let final_target = database.read().await.final_target;
        assert_eq!( final_target, 2, "single-db sync target must never move backward" );
        assert_eq!( converged_anchor, anchor(2), "converged anchor must remain on the highest seen tip" );
    });
}

/// Sends a tip whose anchor is new (9) but whose target equals the initial
/// target (7), and checks that sync converges on anchor 9 with the database
/// still at target 7.
#[test]
fn single_state_sync_advances_anchor_without_duplicate_target_update() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let release = Arc::new(AtomicBool::new(false));
        let release_for_sync = release.clone();
        let sync = 
context.child("single_state_sync_noop_target_update").spawn( move |context| async move { > as StateSyncSet< deterministic::Context, Arc, sha256::Digest, >>::sync( context, (), release_for_sync, anchor(7), 7, tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(4).unwrap(), max_retained_roots: 0, }, ) .await .expect("single state sync should succeed") }, );
        // Same target (7) as the initial one, but a newer anchor (9).
        let (update, observed) = TipUpdate::with_observation(anchor(9), 7);
        let _ = tip_tx.send(update).await;
        // Wait until the update has been observed before releasing the database.
        observed .await .expect("single-db coordinator should record noop target update");
        release.store(true, Ordering::SeqCst);
        drop(tip_tx);
        let (database, converged_anchor) = sync.await.expect("sync task should complete");
        assert_eq!(database.read().await.final_target, 7);
        assert_eq!(converged_anchor, anchor(9));
    });
}

/// Starts single-database sync at target 0, sends one tip for target 2, and
/// checks that sync finishes on target 2 / anchor 2 rather than on the
/// initial target.
#[test]
fn single_state_sync_ignores_stale_reached_after_forwarded_tip() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let sync = context .child("single_state_sync_stale_reached") .spawn(move |context| async move { > as StateSyncSet< deterministic::Context, (), sha256::Digest, >>::sync( context, (), (), anchor(0), 0, tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(4).unwrap(), max_retained_roots: 0, }, ) .await .expect("single state sync should succeed") });
        // The sender is kept alive here; only the update itself is delivered.
        let _ = tip_tx.send(TipUpdate::new(anchor(2), 2)).await;
        let (database, converged_anchor) = sync.await.expect("sync task should complete");
        let final_target = database.read().await.final_target;
        assert_eq!( final_target, 2, "single-db sync must not finish on a stale reached target", );
        assert_eq!( converged_anchor, anchor(2), "converged anchor must match the target the database reached", );
    });
}

/// Checks that a two-database sync ends with both databases on the same target
/// and returns an anchor whose height equals that target, after two tips are
/// sent while one database is still held back.
#[test]
fn 
tuple_state_sync_converges_before_finish() {
    deterministic::Runner::default().start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        // `slow_release` gates the first database; `fast_done` is polled below
        // (presumably set by the second database once it is done - confirm in its impl).
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_done = Arc::new(AtomicBool::new(false));
        let slow_release_for_sync = slow_release.clone();
        let fast_done_for_sync = fast_done.clone();
        let sync = context .child("tuple_state_sync") .spawn(move |context| async move { <(Arc>, Arc>) as StateSyncSet< deterministic::Context, (Arc, Arc), sha256::Digest, >>::sync( context, ((), ()), (slow_release_for_sync, fast_done_for_sync), anchor(0), (0, 0), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(4).unwrap(), max_retained_roots: 0, }, ) .await .expect("tuple state sync should succeed") });
        // Wait for the `fast_done` flag before moving the tip.
        while !fast_done.load(Ordering::SeqCst) { context.sleep(Duration::from_millis(1)).await; }
        let _ = tip_tx.send(TipUpdate::new(anchor(1), (1, 1))).await;
        let _ = tip_tx.send(TipUpdate::new(anchor(2), (2, 2))).await;
        slow_release.store(true, Ordering::SeqCst);
        drop(tip_tx);
        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;
        assert_eq!( slow_target, fast_target, "all databases should finish on the same converged target set" );
        assert_eq!( converged_anchor.height.get(), slow_target, "returned anchor height should match the converged generation" );
    });
}

/// Sends tip 2 followed by tip 1 to a two-database sync and checks that both
/// databases and the returned anchor end on 2.
#[test]
fn tuple_state_sync_ignores_backward_tip_updates() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(8).unwrap());
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_done = Arc::new(AtomicBool::new(false));
        let slow_release_for_sync = slow_release.clone();
        let fast_done_for_sync = fast_done.clone();
        
let sync = context .child("tuple_state_sync_ignores_backward_tip_updates") .spawn(move |context| async move { <(Arc>, Arc>) as StateSyncSet< deterministic::Context, (Arc, Arc), sha256::Digest, >>::sync( context, ((), ()), (slow_release_for_sync, fast_done_for_sync), anchor(0), (0, 0), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(8).unwrap(), max_retained_roots: 0, }, ) .await .expect("tuple state sync should succeed") });
        // Wait for the `fast_done` flag before moving the tip.
        while !fast_done.load(Ordering::SeqCst) { context.sleep(Duration::from_millis(1)).await; }
        // Newer tip first, then a stale one.
        let _ = tip_tx.send(TipUpdate::new(anchor(2), (2, 2))).await;
        let _ = tip_tx.send(TipUpdate::new(anchor(1), (1, 1))).await;
        drop(tip_tx);
        // Give the sync task a tick with both tips queued before releasing the slow database.
        context.sleep(Duration::from_millis(1)).await;
        slow_release.store(true, Ordering::SeqCst);
        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;
        assert_eq!( slow_target, 2, "slow database target must never move backward" );
        assert_eq!( fast_target, 2, "fast database target must never move backward" );
        assert_eq!( converged_anchor, anchor(2), "converged anchor must remain on the highest seen tip" );
    });
}

/// Expects tuple sync to return an error containing
/// "database targets do not match" for this pair of databases started at
/// anchor 7 with targets `(7, 7)`.
#[test]
fn tuple_state_sync_rejects_database_target_mismatch() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        // The sender is kept alive (bound to `_tip_tx`) but never used.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let fast_done = Arc::new(AtomicBool::new(false));
        let result = <( Arc>, Arc>, ) as StateSyncSet< deterministic::Context, ((), Arc), sha256::Digest, >>::sync( context, ((), ()), ((), fast_done), anchor(7), (7, 7), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await;
        let err = match result { Ok(_) => 
panic!("tuple state sync should reject a mismatched database target"), Err(err) => err, };
        assert!( err.contains("database targets do not match"), "error should identify the target mismatch, got: {err}" );
    });
}

/// Expects tuple sync to return (not panic with) an error that names the
/// failing database by index (1) and by type (`FailingStateSyncDb`).
#[test]
fn tuple_state_sync_returns_db_error_instead_of_panicking_when_anchor_missing() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        // The sender is kept alive (bound to `_tip_tx`) but never used.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let result = <( Arc>, Arc>, ) as StateSyncSet>::sync( context, ((), ()), ((), ()), anchor(0), (0, 0), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await;
        let err = match result { Ok(_) => panic!("tuple state sync should return the database sync error"), Err(err) => err, };
        assert!( err.contains("state sync failed (index 1, db"), "error should include failing database index: {err}" );
        assert!( err.contains("FailingStateSyncDb"), "error should include failing database type: {err}" );
    });
}

/// Same expectation as the previous test, with a first database that takes an
/// `Arc` release flag (set to `true` here) as its resolver. The runner is
/// created with a 1-second duration instead of 5.
#[test]
fn tuple_state_sync_returns_db_error_when_other_database_waits_for_finish() {
    deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let release = Arc::new(AtomicBool::new(true));
        let result = <( Arc>, Arc>, ) as StateSyncSet< deterministic::Context, (Arc, ()), sha256::Digest, >>::sync( context, ((), ()), (release, ()), anchor(0), (0, 0), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await;
        let err = match result { Ok(_) => panic!("tuple state sync should return the database sync error"), Err(err) => err, };
        assert!( err.contains("state sync failed (index 1, db"), "error should include failing database index: {err}" );
        
assert!( err.contains("FailingStateSyncDb"), "error should include failing database type: {err}" );
    });
}

/// Expects tuple sync to surface the failing database's error, naming it by
/// index (1) and by type (`FailingStateSyncDb`), for this pair of databases.
#[test]
fn tuple_state_sync_preserves_original_failure_when_peer_finish_channel_closes() {
    deterministic::Runner::timed(Duration::from_secs(1)).start(|context| async move {
        // The sender is kept alive (bound to `_tip_tx`) but never used.
        let (_tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(1).unwrap());
        let result = <( Arc>, Arc>, ) as StateSyncSet>::sync( context, ((), ()), ((), ()), anchor(0), (0, 0), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await;
        let err = match result { Ok(_) => panic!("tuple state sync should return the database sync error"), Err(err) => err, };
        assert!( err.contains("state sync failed (index 1, db"), "error should include failing database index, got: {err}", );
        assert!( err.contains("FailingStateSyncDb"), "error should include failing database type, got: {err}", );
    });
}

/// Drives `CoordinatorState` directly: after generation 1 has been dispatched,
/// a reached event for database 1 that still carries generation 0 must not
/// count, so with only database 0 at generation 1 the coordinator has to wait.
#[test]
fn coordinator_rejects_stale_reached_event_from_older_generation() {
    let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));
    state.record_tip_update(anchor(1), (1, 1));
    // The recorded tip must be dispatched as generation 1 with targets (1, 1).
    match state.next_action() {
        CoordinatorAction::Dispatch { generation, targets: (left, right), } => {
            assert_eq!(generation, 1, "coordinator should dispatch generation 1");
            assert_eq!((left, right), (1, 1));
        }
        CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
        CoordinatorAction::Converged { anchor, .. } => { panic!("coordinator converged too early at {anchor:?}") }
    }
    // This reached event belongs to generation 0 but arrives after the
    // coordinator has already advanced the database to generation 1.
    state.record_reached(1, 0);
    // Only database 0 has actually reached generation 1 so far.
    state.record_reached(0, 1);
    match state.next_action() {
        CoordinatorAction::Wait => {}
        CoordinatorAction::Dispatch { targets, .. 
} => { panic!( "coordinator should wait for a fresh reached event, got dispatch {targets:?}" ) }
        CoordinatorAction::Converged { anchor, .. } => { panic!("stale reached event must not allow convergence at {anchor:?}") }
    }
}

/// Drives `CoordinatorState` directly: even when both databases have reached
/// generation 1, a tip recorded afterwards must be dispatched (as generation
/// 2) before the coordinator may report convergence.
#[test]
fn coordinator_dispatches_pending_tip_before_converging() {
    let mut state = CoordinatorState::new(2, anchor(0), (0u64, 0u64));
    state.record_tip_update(anchor(1), (1, 1));
    // The first recorded tip is dispatched as generation 1.
    match state.next_action() {
        CoordinatorAction::Dispatch { generation, targets: (left, right), } => {
            assert_eq!(generation, 1, "coordinator should dispatch generation 1");
            assert_eq!((left, right), (1, 1));
        }
        CoordinatorAction::Wait => panic!("coordinator should dispatch the newer tip"),
        CoordinatorAction::Converged { anchor, .. } => { panic!("coordinator converged too early at {anchor:?}") }
    }
    // Both databases reach generation 1, then a newer tip arrives.
    state.record_reached(0, 1);
    state.record_reached(1, 1);
    state.record_tip_update(anchor(2), (2, 2));
    match state.next_action() {
        CoordinatorAction::Dispatch { generation, targets: (left, right), } => {
            assert_eq!(generation, 2, "coordinator should advance to generation 2");
            assert_eq!((left, right), (2, 2));
        }
        CoordinatorAction::Wait => panic!("coordinator should dispatch the pending tip"),
        CoordinatorAction::Converged { anchor, .. 
} => { panic!("coordinator should not converge with a pending tip: {anchor:?}") }
    }
}

/// Sends 16 successive tips to a two-database sync and checks that the
/// observer-backed database saw exactly one update, while both databases end on
/// the same target and the returned anchor height equals it.
#[test]
fn tuple_state_sync_stops_updates_after_reached_until_regroup() {
    deterministic::Runner::default().start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(32).unwrap());
        // `slow_release` starts out `true` in this test.
        let slow_release = Arc::new(AtomicBool::new(true));
        let fast_ready = Arc::new(AtomicBool::new(false));
        let fast_update_count = Arc::new(AtomicUsize::new(0));
        let slow_resolver = SlowSyncController { release: slow_release.clone(), };
        let fast_resolver = FastSyncObserver { ready: fast_ready.clone(), update_count: fast_update_count.clone(), };
        let sync = context.child("tuple_state_sync_algorithm").spawn( move |context| async move { <( Arc>, Arc>, ) as StateSyncSet< deterministic::Context, (SlowSyncController, FastSyncObserver), sha256::Digest, >>::sync( context, ((), ()), (slow_resolver, fast_resolver), anchor(0), (0, 0), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await .expect("tuple state sync should succeed") }, );
        // Wait until the observer-backed database has started its sync task.
        while !fast_ready.load(Ordering::SeqCst) { context.sleep(Duration::from_millis(1)).await; }
        for target in 1..=16u64 { let _ = tip_tx.send(TipUpdate::new(anchor(target), (target, target))).await; }
        drop(tip_tx);
        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;
        assert_eq!( slow_target, fast_target, "all databases should finish on the same converged target set" );
        assert_eq!( converged_anchor.height.get(), slow_target, "returned anchor height should match the converged generation" );
        assert_eq!( fast_update_count.load(Ordering::SeqCst), 1, "a reached database must not receive tip updates before regroup; only regroup retarget should be observed" );
    });
}

/// Starts both databases at target 7, sends no tips, and checks that sync
/// converges on anchor 7 with zero updates seen by the observer-backed database.
#[test]
fn 
tuple_state_sync_allows_noop_database_while_other_catches_up() {
    deterministic::Runner::default().start(|context| async move {
        let (tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_ready = Arc::new(AtomicBool::new(false));
        let fast_update_count = Arc::new(AtomicUsize::new(0));
        let target = 7u64;
        let sync = context.child("tuple_state_sync_noop").spawn({
            // Build the resolvers outside the closure so the test keeps its own handles.
            let slow_resolver = slow_release.clone();
            let fast_resolver = FastSyncObserver { ready: fast_ready.clone(), update_count: fast_update_count.clone(), };
            move |context| async move { <( Arc>, Arc>, ) as StateSyncSet< deterministic::Context, (Arc, FastSyncObserver), sha256::Digest, >>::sync( context, ((), ()), (slow_resolver, fast_resolver), anchor(target), (target, target), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(1).unwrap(), max_retained_roots: 0, }, ) .await .expect("tuple state sync should succeed") }
        });
        // Wait until the observer-backed database has started its sync task.
        while !fast_ready.load(Ordering::SeqCst) { context.sleep(Duration::from_millis(1)).await; }
        // No tips are ever sent: close the channel, then release the slow database.
        drop(tip_tx);
        slow_release.store(true, Ordering::SeqCst);
        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;
        assert_eq!(slow_target, target);
        assert_eq!(fast_target, target);
        assert_eq!(converged_anchor, anchor(target));
        assert_eq!( fast_update_count.load(Ordering::SeqCst), 0, "already-at-target database should not receive tip updates" );
    });
}

/// Sends a tip that changes only the first database's target (`(0, 7)` to
/// `(9, 7)`) and checks that sync converges on anchor 9 without the second
/// database seeing any update.
#[test]
fn tuple_state_sync_regroup_completes_when_database_target_is_unchanged() {
    deterministic::Runner::timed(Duration::from_secs(5)).start(|context| async move {
        let (mut tip_tx, tip_rx) = ring::channel(NonZeroUsize::new(4).unwrap());
        let slow_release = Arc::new(AtomicBool::new(false));
        let fast_ready = Arc::new(AtomicBool::new(false));
        let 
fast_update_count = Arc::new(AtomicUsize::new(0));
        let sync = context .child("tuple_state_sync_regroup_unchanged_target") .spawn({
            // Build the resolvers outside the closure so the test keeps its own handles.
            let slow_resolver = slow_release.clone();
            let fast_resolver = FastSyncObserver { ready: fast_ready.clone(), update_count: fast_update_count.clone(), };
            move |context| async move { <( Arc>, Arc>, ) as StateSyncSet< deterministic::Context, (Arc, FastSyncObserver), sha256::Digest, >>::sync( context, ((), ()), (slow_resolver, fast_resolver), anchor(0), (0, 7), tip_rx, SyncEngineConfig { fetch_batch_size: NonZeroU64::new(1).unwrap(), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NonZeroUsize::new(4).unwrap(), max_retained_roots: 0, }, ) .await .expect("tuple state sync should succeed") }
        });
        // Wait until the observer-backed database has started its sync task.
        while !fast_ready.load(Ordering::SeqCst) { context.sleep(Duration::from_millis(1)).await; }
        // Only the first database's target changes (0 -> 9); the second stays at 7.
        let _ = tip_tx.send(TipUpdate::new(anchor(9), (9, 7))).await;
        context.sleep(Duration::from_millis(1)).await;
        slow_release.store(true, Ordering::SeqCst);
        drop(tip_tx);
        let (synced, converged_anchor) = sync.await.expect("sync task should complete");
        let slow_target = synced.0.read().await.final_target;
        let fast_target = synced.1.read().await.final_target;
        assert_eq!(slow_target, 9);
        assert_eq!(fast_target, 7);
        assert_eq!(converged_anchor, anchor(9));
        assert_eq!( fast_update_count.load(Ordering::SeqCst), 0, "the unchanged-target database should not receive duplicate target updates", );
    });
}

/// Empty marker database used by the attach tests.
#[derive(Default)]
struct AttachDb1;

/// Second empty marker database, a distinct type from `AttachDb1`.
#[derive(Default)]
struct AttachDb2;

/// Resolver that records its `id` in a shared log each time a database is
/// attached to it.
#[derive(Clone)]
struct RecordingResolver { id: &'static str, log: Arc>>, }

impl RecordingResolver {
    /// Creates a resolver that will push `id` onto `log` on attach.
    fn new( id: &'static str, log: Arc>>, ) -> Self { Self { id, log } }
}

impl AttachableResolver for RecordingResolver {
    /// Ignores the database and appends this resolver's `id` to the log.
    async fn attach_database(&self, _db: Arc>) { self.log.lock().push(self.id); }
}

/// Checks that attaching a single database invokes the single resolver once.
#[test]
fn single_db_attach_calls_single_resolver() {
    deterministic::Runner::default().start(|_| async move {
        let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
        let 
resolver = RecordingResolver::new("db1", log.clone());
        let db = Arc::new(AsyncRwLock::new(AttachDb1));
        resolver.attach_databases(db).await;
        // Exactly one attach was recorded.
        assert_eq!(&*log.lock(), &["db1"]);
    });
}

/// Checks that a tuple of resolvers attaches in tuple order: the log must read
/// `resolver_0` then `resolver_1`.
#[test]
fn tuple_attach_is_index_stable() {
    deterministic::Runner::default().start(|_| async move {
        let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
        let resolvers = ( RecordingResolver::new("resolver_0", log.clone()), RecordingResolver::new("resolver_1", log.clone()), );
        let databases = ( Arc::new(AsyncRwLock::new(AttachDb1)), Arc::new(AsyncRwLock::new(AttachDb2)), );
        resolvers.attach_databases(databases).await;
        assert_eq!(&*log.lock(), &["resolver_0", "resolver_1"]);
    });
}

/// Attaches a tuple of two different database types (`AttachDb1`, `AttachDb2`)
/// and checks that both resolvers were called, in order.
#[test]
fn heterogeneous_tuple_attach_compiles() {
    deterministic::Runner::default().start(|_| async move {
        let log = Arc::new(commonware_utils::sync::Mutex::new(Vec::new()));
        let resolvers = ( RecordingResolver::new("db1", log.clone()), RecordingResolver::new("db2", log.clone()), );
        let databases = ( Arc::new(AsyncRwLock::new(AttachDb1)), Arc::new(AsyncRwLock::new(AttachDb2)), );
        resolvers.attach_databases(databases).await;
        assert_eq!(&*log.lock(), &["db1", "db2"]);
    });
}
}