//! Compact sync for compact-storage qmdbs.
//!
//! Compact sync does not transfer or reconstruct the full historical operation log. Instead, the
//! source serves the minimum authenticated state needed to recreate the latest committed compact db
//! state:
//!
//! - the total committed leaf count,
//! - the compact frontier's pinned nodes for that leaf count,
//! - the final commit operation, and
//! - a proof authenticating that final commit against the requested root.
//!
//! # What compact dbs store
//!
//! A compact db persists two pieces of state that must always describe the same committed tip:
//!
//! 1. the compact Merkle frontier (persisted by [`crate::merkle::compact`]), and
//! 2. a db-level witness for the last commit (persisted by `qmdb::compact::witness`).
//!
//! The witness exists because only the db layer knows how to encode and decode the typed commit
//! operation. Without it, a compact db could recover its root and continue appending, but it could
//! not serve compact sync to another node.
//!
//! # When compact state changes
//!
//! The servable compact state advances only on durable persistence:
//!
//! - [`sync`] verifies the final commit proof and compact frontier before database construction.
//! - [`Database::from_validated_state`] reconstructs the already-validated state in memory only.
//! - Compact db-local commits persist the frontier and witness together during `sync`/`commit`.
//! - `rewind` restores both the frontier and the witness from the previous slot together.
//!
//! Unsynced in-memory mutations are therefore intentionally not servable: `current_target()` and
//! compact-state responses lag behind `apply_batch()` until the next durable sync.
//!
//! # Safety and invariants
//!
//! The compact path relies on these invariants:
//!
//! - the served commit proof must authenticate the final commit at `leaf_count - 1`,
//! - the frontier pins and witness must move together in the same ping-pong slot,
//! - reopen and rewind must re-verify the persisted witness against the root restored from that
//!   slot, and
//! - reconstructed state must not be persisted until the db recomputes the requested root locally.
//!
//! If those invariants are violated by missing or corrupted persisted data, compact db reopen fails
//! with `DataCorrupted` rather than silently serving or restoring mismatched state.

use crate::{
    merkle::{Family, Location, Proof},
    qmdb::{
        self,
        any::{value::ValueEncoding, FixedValue, VariableValue},
        immutable::{
            fixed::{Db as ImmutableFixedDb, Operation as ImmutableFixedOp},
            variable::{Db as ImmutableVariableDb, Operation as ImmutableVariableOp},
            CompactDb as ImmutableCompactDb, Operation as ImmutableOp,
        },
        keyless::{
            fixed::{Db as KeylessFixedDb, Operation as KeylessFixedOp},
            variable::{Db as KeylessVariableDb, Operation as KeylessVariableOp},
            CompactDb as KeylessCompactDb, Operation as KeylessOp,
        },
        operation::Key,
        sync::{EngineError, Error},
        verify_proof,
    },
    translator::Translator,
};
use commonware_codec::{
    Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
};
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_runtime::{Buf, BufMut, Clock, Metrics, Storage, Supervisor};
use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
use std::{future::Future, num::NonZeroU64, sync::Arc};

/// Compact-sync target for a compact-storage database.
///
/// Compact sync authenticates only the final committed root and total leaf count. Unlike replay
/// sync, there is no lower replay bound here because compact sync does not transfer or reconstruct
/// historical operations.
#[derive(Debug)]
pub struct Target {
    /// Authenticated root of the committed compact state.
    pub root: D,
    /// Total committed operations/leaves in that state.
    pub leaf_count: Location,
}

impl Target {
    /// Reason string returned by [`Self::validate`] when the leaf count is rejected.
    const INVALID_LEAF_COUNT: &'static str = "leaf_count must be in 1..=MAX_LEAVES";

    /// Create a compact-sync target.
    ///
    /// No validation is performed here; call [`Self::validate`] before trusting the value.
    pub const fn new(root: D, leaf_count: Location) -> Self {
        Self { root, leaf_count }
    }

    /// Validate a compact target that may have been constructed programmatically.
    ///
    /// # Errors
    ///
    /// Returns a static reason string when `leaf_count` fails `is_valid()` or is zero.
    pub fn validate(&self) -> Result<(), &'static str> {
        // A zero leaf count has no final commit to authenticate, so it is rejected together with
        // out-of-range counts.
        if !self.leaf_count.is_valid() || self.leaf_count == 0 {
            return Err(Self::INVALID_LEAF_COUNT);
        }
        Ok(())
    }
}

// Hand-written rather than derived; both fields are copied by value out of `&self`.
// NOTE(review): presumably written by hand to avoid extra bounds on the type parameters — confirm.
impl Clone for Target {
    fn clone(&self) -> Self {
        Self {
            root: self.root,
            leaf_count: self.leaf_count,
        }
    }
}

// Two targets are equal when both the root and the leaf count match.
impl PartialEq for Target {
    fn eq(&self, other: &Self) -> bool {
        self.root == other.root && self.leaf_count == other.leaf_count
    }
}

impl Eq for Target {}

// Wire format: `root` followed by `leaf_count`.
impl Write for Target {
    fn write(&self, buf: &mut impl BufMut) {
        self.root.write(buf);
        self.leaf_count.write(buf);
    }
}

// Encoded size is the sum of the two fields' encoded sizes, matching `Write` above.
impl EncodeSize for Target {
    fn encode_size(&self) -> usize {
        self.root.encode_size() + self.leaf_count.encode_size()
    }
}

// Decoding reads the fields in the order they are written and then applies `Target::validate`, so
// a decoded target never carries a rejected leaf count.
impl Read for Target {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result {
        let root = D::read(buf)?;
        let leaf_count = Location::::read(buf)?;
        let target = Self { root, leaf_count };
        // Surface the validation reason as a codec error tagged with this type's path.
        target.validate().map_err(|reason| {
            CodecError::Invalid("storage::qmdb::sync::compact::Target", reason)
        })?;
        Ok(target)
    }
}

// Generates an arbitrary root and a leaf count drawn from `1..=MAX_LEAVES`, i.e. never zero.
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Target
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result {
        let root = u.arbitrary()?;
        let leaf_count = Location::new(u.int_in_range(1..=*F::MAX_LEAVES)?);
        Ok(Self { root, leaf_count })
    }
}

/// Authenticated state for initializing a compact-storage database at a target root.
#[derive(Clone, Debug)]
pub struct State {
    /// Total number of operations/leaves in the target database.
    pub leaf_count: Location,
    /// Pinned Merkle nodes for the current frontier.
    pub pinned_nodes: Vec,
    /// The final commit operation at `leaf_count - 1`.
    pub last_commit_op: Op,
    /// Proof authenticating `last_commit_op` against the target root.
    pub last_commit_proof: Proof,
}

/// Compact state that has been validated against a target root.
///
/// This carries the original compact state plus the values derived while validating it. Compact
/// database constructors still build their storage-backed Merkle state and witness cache, but they
/// should use these values instead of re-deriving them from peer-provided state.
#[derive(Clone, Debug)]
pub struct ValidatedState {
    /// The compact state fetched from a peer after validation.
    pub state: State,
    /// The target root that `state` was validated against.
    pub root: D,
    /// The inactivity floor derived from the final commit operation.
    pub inactivity_floor: Location,
}

impl ValidatedState {
    /// Bundle a state with the root it was checked against and the derived inactivity floor.
    ///
    /// Private: the only construction site in this file is the end of frontier validation.
    const fn new(state: State, root: D, inactivity_floor: Location) -> Self {
        Self {
            state,
            root,
            inactivity_floor,
        }
    }
}

// Wire format: leaf count, pinned nodes, last commit operation, last commit proof (in that order).
impl Write for State
where
    Op: Write,
{
    fn write(&self, buf: &mut impl BufMut) {
        self.leaf_count.write(buf);
        self.pinned_nodes.write(buf);
        self.last_commit_op.write(buf);
        self.last_commit_proof.write(buf);
    }
}

/// Result from a compact-state fetch.
pub struct FetchResult {
    /// The fetched compact state.
    pub state: State,
    /// Callback used to report whether downstream accepted the state.
    pub callback: Option>,
}

// Manual `Debug`: the callback sender is not formatted; only its presence (`Some`/`None`) shows.
impl std::fmt::Debug for FetchResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FetchResult")
            .field("state", &self.state)
            .field("callback", &self.callback.as_ref().map(|_| ""))
            .finish()
    }
}

// Wrap a bare state as a fetch result that carries no feedback callback.
impl From> for FetchResult {
    fn from(state: State) -> Self {
        Self {
            state,
            callback: None,
        }
    }
}

// Encoded size is the sum of the four fields' encoded sizes, matching `Write for State`.
impl EncodeSize for State
where
    Op: EncodeSize,
{
    fn encode_size(&self) -> usize {
        self.leaf_count.encode_size()
            + self.pinned_nodes.encode_size()
            + self.last_commit_op.encode_size()
            + self.last_commit_proof.encode_size()
    }
}

// Decoding reads the fields in the order they are written. Unlike `Target`, no semantic
// validation happens here; the state is checked against a target during `sync`.
impl Read for State
where
    Op: Read,
{
    /// `(pinned-node length range, operation codec config, proof config)`; the last element is
    /// passed straight to the proof decoder and is named `max_proof_digests` below.
    type Cfg = (RangeCfg, Op::Cfg, usize);

    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result {
        let (pinned_nodes_cfg, op_cfg, max_proof_digests) = cfg;
        Ok(Self {
            leaf_count: Location::::read(buf)?,
            pinned_nodes: Vec::::read_cfg(buf, &(*pinned_nodes_cfg, ()))?,
            last_commit_op: Op::read_cfg(buf, op_cfg)?,
            last_commit_proof: Proof::::read_cfg(buf, max_proof_digests)?,
        })
    }
}

/// Resolver-side errors for compact state serving.
#[derive(Debug, thiserror::Error)]
pub enum ServeError {
    /// The source database returned an error while building compact state.
    #[error("compact source database error: {0}")]
    Database(#[from] qmdb::Error),
    /// The caller requested a target that compact sync cannot serve.
    #[error("invalid compact target: {0}")]
    InvalidTarget(&'static str),
    /// The resolver wrapper did not currently hold a database.
    #[error("compact source missing")]
    MissingSource,
    /// The caller requested a target different from the source's current witness.
    #[error("stale compact target - requested {requested:?}, current {current:?}")]
    StaleTarget {
        /// The target the caller asked for.
        requested: Target,
        /// The target the source could actually serve at the time of the request.
        current: Target,
    },
}

/// Trait for compact sync fetches from a source database.
#[allow(clippy::type_complexity)]
pub trait Resolver: Send + Sync + Clone + 'static {
    /// The merkle family backing the resolver's proofs.
    type Family: Family;
    /// The digest type used in proofs returned by the resolver.
    type Digest: Digest;
    /// The type of operations returned by the resolver.
    type Op;
    /// The error type returned by the resolver.
    type Error: std::error::Error + Send + 'static;

    /// Fetch the authenticated state for `target`.
    ///
    /// The returned future borrows `self` for `'a` and must be `Send`.
    fn get_compact_state<'a>(
        &'a self,
        target: Target,
    ) -> impl Future, Self::Error>> + Send + 'a;
}

/// Marker trait for resolvers whose associated types match a specific compact-sync database.
///
/// This is a trait-alias pattern used to avoid repeating the full `Resolver` bound (with its
/// associated types matched to the database's) at every use site.
/// Blanket-implemented for any matching [`Resolver`], so callers never implement this directly.
pub trait CompactDbResolver: Resolver {}

impl CompactDbResolver for R
where
    DB: Database,
    R: Resolver,
{
}

/// Database types that can be initialized directly from compact state.
pub trait Database: Sized + Send {
    /// The merkle family used by this database.
    type Family: Family;
    /// The operation type; [`Self::inactivity_floor`] inspects values of this type.
    type Op: Encode + Send;
    /// Database-specific configuration; cloned by `sync` before each construction.
    type Config: Clone;
    /// The digest type returned by [`Self::root`].
    type Digest: Digest;
    /// Runtime context handed to [`Self::from_validated_state`].
    type Context: Storage + Clock + Metrics;
    /// The hasher type associated with this database.
    type Hasher: Hasher;

    /// Build a database from authenticated state in memory.
    ///
    /// The caller has already validated `last_commit_proof` and the compact frontier against the
    /// requested target root and passes the derived validation artifacts with the state. This
    /// constructor must not durably persist anything; persistence happens only after the caller
    /// re-checks that `Self::root()` matches the target root.
    fn from_validated_state(
        context: Self::Context,
        config: Self::Config,
        state: ValidatedState,
    ) -> impl Future>> + Send;

    /// Return the inactivity floor if the operation is a commit.
    fn inactivity_floor(op: &Self::Op) -> Option>;

    /// Get the root digest for final verification.
    fn root(&self) -> Self::Digest;

    /// Persist the compact-initialized state once the caller has verified its root.
    fn persist_compact_state(
        &self,
    ) -> impl Future>> + Send;
}

/// Configuration for compact synchronization into a compact-storage database.
pub struct Config
where
    DB: Database,
    R: CompactDbResolver,
{
    /// Runtime context for creating database components.
    pub context: DB::Context,
    /// Source resolver for fetching compact authenticated state.
    pub resolver: R,
    /// Sync target (root digest and total leaf count).
    pub target: Target,
    /// Database-specific configuration.
    pub db_config: DB::Config,
}

/// Create/open a compact-storage database and initialize it from compact authenticated state.
///
/// Unlike streaming sync, compact sync jumps directly to `target.leaf_count`. This path
/// authenticates the final commit and frontier state for the target root rather than replaying a
/// retained operation range.
///
/// Verification order:
/// 1. Fetch the proposed compact state for `target`.
/// 2. Verify the final commit proof against `target.root`.
/// 3. Rebuild the compact frontier in memory and compare its root against `target.root`.
/// 4. Build the compact db from that already-validated state.
/// 5. Assert the db root still matches and persist the state.
///
/// Any failure leaves the local compact db unopened or unchanged on disk.
///
/// # Errors
///
/// - `Error::Engine(EngineError::InvalidCompactTarget)` when `config.target` fails
///   [`Target::validate`].
/// - `Error::Resolver` when the resolver itself returns an error (not retried).
/// - `Error::Database` when [`Database::from_validated_state`] fails.
/// - Whatever error `persist_compact_state` yields, propagated through `?`.
///
/// A response that fails validation is not an error: it is rejected and another fetch is made.
/// The loop below has no retry limit, so it keeps fetching for as long as the resolver keeps
/// returning invalid state.
///
/// # Panics
///
/// Panics if the constructed database reports a root different from `target.root`.
pub async fn sync(
    config: Config,
) -> Result>
where
    DB: Database,
    R: CompactDbResolver,
{
    // Reject unusable targets before contacting the resolver.
    let target = config.target;
    target
        .validate()
        .map_err(|reason| Error::Engine(EngineError::InvalidCompactTarget(reason)))?;

    // Compact sync has no request scheduler, so this loop is its retry boundary for bad peer
    // responses. Resolver errors and local construction failures remain terminal.
    loop {
        let FetchResult { state, callback } = config
            .resolver
            .get_compact_state(target.clone())
            .await
            .map_err(Error::Resolver)?;

        // Validation failures describe a bad compact response. Reject it if the resolver supplied
        // feedback, then fetch another candidate.
        let validated_state = match validate_compact_state::(&target, state) {
            Ok(state) => state,
            Err(err) => {
                if let Some(callback) = callback {
                    // The send result is ignored: a dropped receiver must not abort the retry.
                    let _ = callback.send(false);
                }
                tracing::debug!(error = ?err, "compact state failed validation, will retry");
                continue;
            }
        };

        // The peer response has already authenticated the final commit and frontier. From here,
        // construction should only fail for local database/storage reasons; a root mismatch is a
        // bug in this path.
        let db = DB::from_validated_state(
            config.context.child("compact"),
            config.db_config.clone(),
            validated_state,
        )
        .await
        .map_err(Error::Database)?;
        assert_eq!(
            db.root(),
            target.root,
            "validated compact state reconstructed unexpected root",
        );
        // NOTE(review): acceptance is reported before `persist_compact_state` runs, so a persist
        // failure still returns an error after the peer was told its state was accepted.
        if let Some(callback) = callback {
            let _ = callback.send(true);
        }
        db.persist_compact_state().await?;
        return Ok(db);
    }
}

/// Validate the peer-provided compact state before constructing local database storage.
///
/// Checks, in order: the state's leaf count equals the target's, the final commit proof verifies
/// at `leaf_count - 1` against `target.root`, and then the frontier itself (via
/// [`validate_compact_frontier`]).
///
/// `leaf_count - 1` is computed without a zero check here; `sync` validates the target first, and
/// the leaf-count equality check below rules out a zero state count for a validated target.
fn validate_compact_state(
    target: &Target,
    state: State,
) -> CompactFrontierValidation
where
    DB: Database,
{
    if state.leaf_count != target.leaf_count {
        return Err(EngineError::UnexpectedLeafCount {
            expected: target.leaf_count,
            actual: state.leaf_count,
        });
    }

    let hasher = qmdb::hasher::();
    let last_commit_loc = Location::new(*state.leaf_count - 1);
    // The proof covers exactly one operation: the final commit.
    if !verify_proof(
        &hasher,
        &state.last_commit_proof,
        last_commit_loc,
        std::slice::from_ref(&state.last_commit_op),
        &target.root,
    ) {
        return Err(EngineError::InvalidProof);
    }

    validate_compact_frontier::(target, state)
}

/// Result of validating a peer-provided compact frontier.
type CompactFrontierValidation = Result<
    ValidatedState<::Family, ::Op, ::Digest>,
    EngineError<::Family, ::Digest>,
>;

/// Validate that a peer-provided compact frontier authenticates the requested target root.
///
/// On success the state is returned wrapped in a [`ValidatedState`] together with `target.root`
/// and the inactivity floor taken from the final commit.
///
/// Fails with `EngineError::InvalidProof` when the last operation yields no inactivity floor, the
/// floor lies beyond the final commit, or the frontier cannot be rebuilt or rooted; fails with
/// `EngineError::RootMismatch` when the rebuilt root differs from `target.root`.
fn validate_compact_frontier(
    target: &Target,
    state: State,
) -> CompactFrontierValidation
where
    DB: Database,
{
    // The final commit is the only operation carried in compact state. Its floor determines which
    // peaks are inactive when authenticating the compact frontier root.
    let last_commit_loc = Location::new(*state.leaf_count - 1);
    let Some(inactivity_floor_loc) = DB::inactivity_floor(&state.last_commit_op) else {
        return Err(EngineError::InvalidProof);
    };
    if inactivity_floor_loc > last_commit_loc {
        return Err(EngineError::InvalidProof);
    }

    // Rebuild a disposable Merkle view from the pinned frontier before opening any database
    // storage. Invalid pin counts or inactive peak layouts are treated as bad peer proofs.
    let mem = crate::merkle::mem::Mem::::init(crate::merkle::mem::Config {
        nodes: Vec::new(),
        pruning_boundary: state.leaf_count,
        // Cloned because `state` is moved into the returned `ValidatedState` below.
        pinned_nodes: state.pinned_nodes.clone(),
    })
    .map_err(|_| EngineError::InvalidProof)?;
    let hasher = qmdb::hasher::();
    let inactive_peaks = DB::Family::inactive_peaks(
        DB::Family::location_to_position(state.leaf_count),
        inactivity_floor_loc,
    );
    let actual = mem
        .root(&hasher, inactive_peaks)
        .map_err(|_| EngineError::InvalidProof)?;
    if actual != target.root {
        return Err(EngineError::RootMismatch {
            expected: target.root,
            actual,
        });
    }

    Ok(ValidatedState::new(
        state,
        target.root,
        inactivity_floor_loc,
    ))
}

/// Build compact state for `target` from a full (non-compact) source database.
///
/// The source is abstracted as three one-shot closures so the same logic serves every wrapper
/// shape used by the resolver macros in this file:
///
/// - `current_target`: yields the source's current root and leaf count,
/// - `historical_proof`: called with `(leaf_count, last_commit_loc)`; yields a proof and a list of
///   operations,
/// - `pinned_nodes_at`: called with `leaf_count`; yields the frontier pins.
///
/// # Errors
///
/// - `ServeError::InvalidTarget` when `target` fails [`Target::validate`].
/// - `ServeError::StaleTarget` when `target` differs from the source's current target.
/// - `ServeError::Database` when a closure fails or the proof comes back with no operations.
async fn fetch_state_from_full_source(
    target: Target,
    current_target: Current,
    historical_proof: Hist,
    pinned_nodes_at: Pins,
) -> Result, ServeError>
where
    F: Family,
    D: Digest,
    Current: FnOnce() -> CurrentFut,
    CurrentFut: Future>,
    Hist: FnOnce(Location, Location) -> HistFut,
    HistFut: Future, Vec), qmdb::Error>>,
    Pins: FnOnce(Location) -> PinsFut,
    PinsFut: Future, qmdb::Error>>,
{
    // Full sources do not cache a compact witness. Instead, derive the compact payload on demand
    // from the current tip commit plus the frontier pins at the requested tree size.
    target.validate().map_err(ServeError::InvalidTarget)?;
    let current = current_target().await;
    // Only the current tip is servable; any other root or size is reported as stale.
    if target.root != current.root || target.leaf_count != current.leaf_count {
        return Err(ServeError::StaleTarget {
            requested: target,
            current,
        });
    }

    let leaf_count = target.leaf_count;
    // Safe to subtract: `validate` above rejected a zero leaf count.
    let last_commit_loc = Location::new(*leaf_count - 1);
    let (last_commit_proof, mut operations) = historical_proof(leaf_count, last_commit_loc)
        .await
        .map_err(ServeError::Database)?;
    // Compact sync always authenticates exactly the final commit leaf.
    let last_commit_op = operations
        .pop()
        .ok_or(ServeError::Database(qmdb::Error::DataCorrupted(
            "missing last commit operation",
        )))?;
    let pinned_nodes = pinned_nodes_at(leaf_count)
        .await
        .map_err(ServeError::Database)?;

    Ok(State {
        leaf_count,
        pinned_nodes,
        last_commit_op,
        last_commit_proof,
    })
}

// Resolver impls for full keyless databases. These synthesize compact state by querying the
// historical tip proof and current frontier pins from the full source.
//
// Macro arguments: `$db` is the database type, `$op` its operation type, and `$val_bound` the
// trait bound placed on the value parameter `V`.
//
// Three impls are generated, differing only in how the database is reached: directly through
// `self`, through a guard from `self.read().await`, and through such a guard holding an optional
// database (an absent database is reported as `ServeError::MissingSource`).
macro_rules! impl_compact_resolver_keyless {
    ($db:ident, $op:ident, $val_bound:ident) => {
        // Variant 1: the database is called directly through `self`.
        impl Resolver for Arc<$db>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                fetch_state_from_full_source(
                    target,
                    || async { Target::new(self.root(), self.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        self.historical_proof(
                            leaf_count,
                            last_commit_loc,
                            // 1 is non-zero, so this unwrap cannot fail.
                            // NOTE(review): presumably the maximum number of operations to
                            // return — confirm against `historical_proof`.
                            NonZeroU64::new(1).unwrap(),
                        )
                    },
                    |leaf_count| self.pinned_nodes_at(leaf_count),
                )
                .await
                .map(Into::into)
            }
        }

        // Variant 2: the database sits behind a lock; the read guard is held for the whole fetch.
        impl Resolver for Arc>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let db = self.read().await;
                fetch_state_from_full_source(
                    target,
                    || async { Target::new(db.root(), db.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        db.historical_proof(
                            leaf_count,
                            last_commit_loc,
                            NonZeroU64::new(1).unwrap(),
                        )
                    },
                    |leaf_count| db.pinned_nodes_at(leaf_count),
                )
                .await
                .map(Into::into)
            }
        }

        // Variant 3: the lock holds an optional database; `None` yields `MissingSource`.
        impl Resolver for Arc>>>
        where
            F: Family,
            E: crate::Context,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let guard = self.read().await;
                let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
                fetch_state_from_full_source(
                    target,
                    || async { Target::new(db.root(), db.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        db.historical_proof(
                            leaf_count,
                            last_commit_loc,
                            NonZeroU64::new(1).unwrap(),
                        )
                    },
                    |leaf_count| db.pinned_nodes_at(leaf_count),
                )
                .await
                .map(Into::into)
            }
        }
    };
}

// Resolver impls for full immutable databases.
// Same pattern as keyless, but with the extra key and
// translator parameters carried by immutable variants.
//
// Macro arguments: `$db` is the database type, `$op` its operation type, `$val_bound` the bound
// placed on the value parameter `V`, and `$key_bound` the bound placed on the key parameter `K`.
//
// Three impls are generated, differing only in how the database is reached: directly through
// `self`, through a guard from `self.read().await`, and through such a guard holding an optional
// database (an absent database is reported as `ServeError::MissingSource`).
macro_rules! impl_compact_resolver_immutable {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => {
        // Variant 1: the database is called directly through `self`.
        impl Resolver for Arc<$db>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                fetch_state_from_full_source(
                    target,
                    || async { Target::new(self.root(), self.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        self.historical_proof(
                            leaf_count,
                            last_commit_loc,
                            // 1 is non-zero, so this unwrap cannot fail.
                            NonZeroU64::new(1).unwrap(),
                        )
                    },
                    |leaf_count| self.pinned_nodes_at(leaf_count),
                )
                .await
                .map(Into::into)
            }
        }

        // Variant 2: the database sits behind a lock; the read guard is held for the whole fetch.
        impl Resolver for Arc>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let db = self.read().await;
                fetch_state_from_full_source(
                    target,
                    || async { Target::new(db.root(), db.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        db.historical_proof(
                            leaf_count,
                            last_commit_loc,
                            NonZeroU64::new(1).unwrap(),
                        )
                    },
                    |leaf_count| db.pinned_nodes_at(leaf_count),
                )
                .await
                .map(Into::into)
            }
        }

        // Variant 3: the lock holds an optional database; `None` yields `MissingSource`.
        impl Resolver for Arc>>>
        where
            F: Family,
            E: crate::Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let guard = self.read().await;
                let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
                fetch_state_from_full_source(
                    target,
                    || async { Target::new(db.root(), db.bounds().await.end) },
                    |leaf_count, last_commit_loc| {
                        db.historical_proof(
                            leaf_count,
                            last_commit_loc,
                            NonZeroU64::new(1).unwrap(),
                        )
                    },
                    |leaf_count| db.pinned_nodes_at(leaf_count),
                )
                .await
                .map(Into::into)
            }
        }
    };
}

// Resolver impls for compact keyless databases. These already persist a compact witness, so serving
// is just a target check over the current witness rather than reconstructing anything from history.
//
// Macro arguments: `$db` is the compact database type and `$op` its operation type. Each impl
// forwards to the database's own `compact_state(target)` and converts the result with `Into`.
macro_rules! impl_compact_resolver_compact_keyless {
    ($db:ident, $op:ident) => {
        // Variant 1: the database is called directly through `self`.
        impl Resolver for Arc<$db>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op: Encode + Read,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                self.compact_state(target).map(Into::into)
            }
        }

        // Variant 2: the database sits behind a lock; served under a read guard.
        impl Resolver for Arc>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op: Encode + Read,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let db = self.read().await;
                db.compact_state(target).map(Into::into)
            }
        }

        // Variant 3: the lock holds an optional database; `None` yields `MissingSource`.
        impl Resolver for Arc>>>
        where
            F: Family,
            E: crate::Context,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op: Encode + Read,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let guard = self.read().await;
                let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
                db.compact_state(target).map(Into::into)
            }
        }
    };
}

// Resolver impls for compact immutable databases.
// Like the keyless compact path, these read the
// persisted witness directly instead of rebuilding it from a full operation log.
//
// Macro arguments: `$db` is the compact database type and `$op` its operation type. Each impl
// forwards to the database's own `compact_state(target)` and converts the result with `Into`.
macro_rules! impl_compact_resolver_compact_immutable {
    ($db:ident, $op:ident) => {
        // Variant 1: the database is called directly through `self`.
        impl Resolver for Arc<$db>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op: Encode + Read,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                self.compact_state(target).map(Into::into)
            }
        }

        // Variant 2: the database sits behind a lock; served under a read guard.
        impl Resolver for Arc>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op: Encode + Read,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let db = self.read().await;
                db.compact_state(target).map(Into::into)
            }
        }

        // Variant 3: the lock holds an optional database; `None` yields `MissingSource`.
        impl Resolver for Arc>>>
        where
            F: Family,
            E: crate::Context,
            K: Key,
            V: ValueEncoding + Send + Sync + 'static,
            H: Hasher,
            $op: Encode + Read,
            C: Clone + Send + Sync + 'static,
            S: Strategy,
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = ServeError;

            async fn get_compact_state(
                &self,
                target: Target,
            ) -> Result, Self::Error> {
                let guard = self.read().await;
                let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
                db.compact_state(target).map(Into::into)
            }
        }
    };
}

// Instantiate the resolver impls: compact sources first, then full keyless and immutable sources
// in their fixed-size and variable-size flavours.
impl_compact_resolver_compact_keyless!(KeylessCompactDb, KeylessOp);
impl_compact_resolver_compact_immutable!(ImmutableCompactDb, ImmutableOp);
impl_compact_resolver_keyless!(KeylessFixedDb, KeylessFixedOp, FixedValue);
impl_compact_resolver_keyless!(KeylessVariableDb, KeylessVariableOp, VariableValue);
impl_compact_resolver_immutable!(ImmutableFixedDb, ImmutableFixedOp, FixedValue, Array);
impl_compact_resolver_immutable!(ImmutableVariableDb, ImmutableVariableOp, VariableValue, Key);

#[cfg(test)]
mod tests {
    use super::{Config, Database, FetchResult, Resolver, State, Target};
    use crate::{
        merkle::{mmr, Location},
        qmdb,
    };
    use commonware_codec::{DecodeExt as _, Encode as _, RangeCfg};
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_parallel::Rayon;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::sync::AsyncRwLock;
    use std::{
        collections::VecDeque,
        convert::Infallible,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
    };

    // Compile-time check that all three wrapper shapes of `$db` implement `Resolver`.
    macro_rules! assert_resolver_variants {
        ($db:ty) => {
            assert_resolver::>();
            assert_resolver::>>();
            assert_resolver::>>>();
        };
    }

    // Empty body: calling it only has to type-check.
    fn assert_resolver() {}

    /// Stub database: its root comes straight from the config, and every construction bumps a
    /// shared counter so tests can count how often `sync` built a database.
    struct TestDb {
        root: Digest,
    }

    impl Database for TestDb {
        type Family = mmr::Family;
        type Op = u8;
        type Config = (Digest, Arc);
        type Digest = Digest;
        type Context = deterministic::Context;
        type Hasher = Sha256;

        async fn from_validated_state(
            _context: Self::Context,
            (root, constructions): Self::Config,
            _state: super::ValidatedState,
        ) -> Result> {
            constructions.fetch_add(1, Ordering::SeqCst);
            Ok(Self { root })
        }

        // Every operation is treated as a commit with floor 0.
        fn inactivity_floor(_op: &Self::Op) -> Option> {
            Some(Location::new(0))
        }

        fn root(&self) -> Self::Digest {
            self.root
        }

        // Nothing to persist for the stub.
        async fn persist_compact_state(&self) -> Result<(), qmdb::Error> {
            Ok(())
        }
    }

    /// Resolver that hands out pre-queued fetch results in order, ignoring the requested target.
    #[derive(Clone)]
    struct SequenceResolver {
        states: Arc>>>,
    }

    impl Resolver for SequenceResolver {
        type Family = mmr::Family;
        type Digest = Digest;
        type Op = u8;
        type Error = Infallible;

        async fn get_compact_state(
            &self,
            _target: Target,
        ) -> Result, Self::Error> {
            // Panics when the queue is exhausted, i.e. when `sync` fetches more often than the
            // test queued results for.
            Ok(self
                .states
                .lock()
                .pop_front()
                .expect("missing compact fetch result"))
        }
    }

    /// Build a two-leaf in-memory merkle structure and return a compact state plus the matching
    /// target. The operation at location 1 (`0u8`) plays the role of the final commit.
    fn valid_state_and_target() -> (State, Target) {
        let hasher = qmdb::hasher::();
        let mut merkle = crate::merkle::mem::Mem::::new();
        let op = 0u8;
        let first_op = 1u8;
        // Leaf order matters: `first_op` lands at location 0 and `op` at location 1.
        let batch = merkle
            .new_batch()
            .add(&hasher, &first_op.encode())
            .add(&hasher, &op.encode());
        let batch = batch.merkleize(&merkle, &hasher);
        merkle.apply_batch(&batch).unwrap();
        let root = merkle.root(&hasher, 0).unwrap();
        let leaf_count = Location::new(2);
        let pinned_nodes = merkle
            .nodes_to_pin(leaf_count)
            .into_values()
            .collect::>();
        let proof = merkle.proof(&hasher, Location::new(1), 0).unwrap();
        (
            State {
                leaf_count,
                pinned_nodes,
                last_commit_op: op,
                last_commit_proof: proof,
            },
            Target:: { root, leaf_count },
        )
    }

    // Type-level test: every compact db flavour must implement `Resolver` in all three wrapper
    // shapes. It passes by compiling.
    #[test]
    fn test_all_compact_qmdb_variants_implement_strategy_resolvers() {
        type KeylessFixedCompactDb = crate::qmdb::keyless::fixed::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            commonware_cryptography::Sha256,
            Rayon,
        >;
        type KeylessVariableCompactDb = crate::qmdb::keyless::variable::CompactDb<
            mmr::Family,
            deterministic::Context,
            Vec,
            commonware_cryptography::Sha256,
            (RangeCfg, ()),
            Rayon,
        >;
        type ImmutableFixedCompactDb = crate::qmdb::immutable::fixed::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Digest,
            commonware_cryptography::Sha256,
            Rayon,
        >;
        type ImmutableVariableCompactDb = crate::qmdb::immutable::variable::CompactDb<
            mmr::Family,
            deterministic::Context,
            Digest,
            Vec,
            commonware_cryptography::Sha256,
            ((), (RangeCfg, ())),
            Rayon,
        >;

        assert_resolver_variants!(KeylessFixedCompactDb);
        assert_resolver_variants!(KeylessVariableCompactDb);
        assert_resolver_variants!(ImmutableFixedCompactDb);
        assert_resolver_variants!(ImmutableVariableCompactDb);
    }

    // A target built by struct literal (bypassing validation) with leaf count 0 still encodes,
    // but decoding must reject it.
    #[test]
    fn test_target_decode_rejects_zero_leaf_count() {
        let unused_root = commonware_cryptography::Sha256::hash(b"unused");
        let encoded = Target:: {
            root: unused_root,
            leaf_count: crate::merkle::Location::new(0),
        }
        .encode();

        assert!(Target::::decode(encoded).is_err());
    }

    // The first response carries an extra pinned node and no callback, so it must be rejected
    // silently. The second response is valid and must receive `true` on its callback, with
    // exactly one database construction overall.
    #[test]
    fn test_compact_sync_retries_invalid_state_without_feedback() {
        deterministic::Runner::default().start(|context| async move {
            let (good_state, target) = valid_state_and_target();
            let mut bad_state = good_state.clone();
            bad_state.pinned_nodes.push(Sha256::hash(b"extra pin"));
            let (good_tx, good_rx) = commonware_utils::channel::oneshot::channel();
            let constructions = Arc::new(AtomicUsize::new(0));

            let db = super::sync::(Config {
                context,
                resolver: SequenceResolver {
                    states: Arc::new(commonware_utils::sync::Mutex::new(VecDeque::from([
                        FetchResult {
                            state: bad_state,
                            callback: None,
                        },
                        FetchResult {
                            state: good_state,
                            callback: Some(good_tx),
                        },
                    ]))),
                },
                target: target.clone(),
                db_config: (target.root, constructions.clone()),
            })
            .await
            .unwrap();

            assert!(good_rx.await.expect("valid feedback should arrive"));
            assert_eq!(constructions.load(Ordering::SeqCst), 1);
            assert_eq!(db.root(), target.root);
        });
    }
}

// Codec conformance tests; only built for tests with the `arbitrary` feature enabled.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use crate::merkle::{mmb, mmr};
    use commonware_codec::conformance::CodecConformance;
    use commonware_cryptography::sha256::Digest as Sha256Digest;

    commonware_conformance::conformance_tests! {
        CodecConformance>,
        CodecConformance>,
    }
}