use crate::{ journal::{ authenticated, contiguous::{Contiguous as _, Mutable, Reader as _}, Error as JournalError, }, merkle::{ full::{self, Merkle}, Family, Location, }, qmdb::{ self, any::value::ValueEncoding, keyless::{operation::Codec, CompactDb, Keyless, Metrics, Operation}, sync, }, Context, Persistable, }; use commonware_codec::{Encode, EncodeShared, Read}; use commonware_cryptography::Hasher; use commonware_parallel::Strategy; use commonware_utils::range::NonEmptyRange; impl sync::Database for Keyless where F: Family, E: Context, V: ValueEncoding + Codec, C: Mutable> + Persistable + sync::Journal>, C::Config: Clone + Send, H: Hasher, S: Strategy, Operation: EncodeShared, { type Family = F; type Op = Operation; type Journal = C; type Hasher = H; type Config = super::Config; type Digest = H::Digest; type Context = E; /// Returns a [Keyless] db initialized from data collected in the sync process. /// /// # Behavior /// /// This method handles different initialization scenarios based on existing data: /// - If the Merkle journal is empty or the last item is before the range start, it creates /// a fresh Merkle structure from the provided `pinned_nodes` /// - If the Merkle journal has data but is incomplete (has length < range end), missing /// operations from the log are applied to bring it up to the target state /// - If the Merkle journal has data beyond the range end, it is rewound to match the sync /// target /// /// # Returns /// /// A [Keyless] db populated with the state from the given range. async fn from_sync_result( context: Self::Context, config: Self::Config, log: Self::Journal, pinned_nodes: Option>, range: NonEmptyRange>, apply_batch_size: usize, ) -> Result> { let hasher = qmdb::hasher::(); let merkle = Merkle::::init_sync( context.child("merkle"), full::SyncConfig { config: config.merkle.clone(), range: range.clone(), pinned_nodes, }, ) .await?; let journal = authenticated::Journal::::from_components( merkle, log, hasher, apply_batch_size as u64, ) .await?; let (last_commit_loc, inactivity_floor_loc) = { let reader = journal.reader().await; let bounds = reader.bounds(); let loc = bounds .end .checked_sub(1) .ok_or(qmdb::Error::HistoricalFloorPruned(Location::new( bounds.end, )))?; let floor = qmdb::find_inactivity_floor_at::(&reader, Location::new(bounds.end), |op| { op.has_floor() }) .await?; (Location::new(loc), floor) }; let inactive_peaks = F::inactive_peaks( F::location_to_position(Location::new(*last_commit_loc + 1)), inactivity_floor_loc, ); let root = journal.root(inactive_peaks)?; let metrics = Metrics::new(context); let db = Self { journal, root, last_commit_loc, inactivity_floor_loc, metrics, }; db.update_metrics().await; db.sync().await?; Ok(db) } fn root(&self) -> Self::Digest { self.root() } } impl sync::compact::Database for CompactDb where F: Family, E: Context, V: ValueEncoding + Codec, H: Hasher, S: Strategy, Operation: EncodeShared, Operation: Read, Cfg: Clone + Send + Sync + 'static, { type Family = F; type Op = Operation; type Config = super::CompactConfig; type Digest = H::Digest; type Context = E; type Hasher = H; async fn from_validated_state( context: Self::Context, config: Self::Config, state: sync::compact::ValidatedState, ) -> Result> { let sync::compact::ValidatedState { state, root, inactivity_floor: inactivity_floor_loc, } = state; let sync::compact::State { leaf_count, pinned_nodes, last_commit_op, last_commit_proof, } = state; let last_commit_loc = Location::new(*leaf_count - 1); let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else { return Err(qmdb::Error::UnexpectedData(last_commit_loc)); }; assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch"); let commit_codec_config = config.commit_codec_config.clone(); let last_commit_op_bytes = Operation::::Commit(last_commit_metadata.clone(), inactivity_floor_loc) .encode() .to_vec(); let merkle = crate::merkle::compact::Merkle::init_from_compact_state( context.child("merkle"), config.merkle, leaf_count, pinned_nodes.clone(), ) .await?; Self::init_from_verified_state( merkle, commit_codec_config, last_commit_metadata, inactivity_floor_loc, root, last_commit_op_bytes, last_commit_proof, pinned_nodes, ) } fn inactivity_floor(op: &Self::Op) -> Option> { op.has_floor() } fn root(&self) -> Self::Digest { self.root() } async fn persist_compact_state(&self) -> Result<(), qmdb::Error> { self.persist_cached_witness().await } } #[cfg(test)] mod tests;