use crate::{ index::unordered::Index, journal::{ authenticated, contiguous::{Mutable, Reader as _}, Error as JournalError, }, merkle::{ full::{self, Merkle}, Family, Location, }, qmdb::{ self, any::ValueEncoding, build_snapshot_from_log, immutable::{self, CompactDb, Metrics, Operation}, operation::Key, sync::{self}, Error, }, translator::Translator, Context, Persistable, }; use commonware_codec::{Encode, EncodeShared, Read}; use commonware_cryptography::Hasher; use commonware_parallel::Strategy; use commonware_utils::range::NonEmptyRange; impl sync::Database for immutable::Immutable where F: Family, E: Context, K: Key, V: ValueEncoding, C: Mutable> + Persistable + sync::Journal>, C::Item: EncodeShared, C::Config: Clone + Send, H: Hasher, T: Translator, S: Strategy, { type Family = F; type Op = Operation; type Journal = C; type Hasher = H; type Config = immutable::Config; type Digest = H::Digest; type Context = E; /// Returns an [Immutable](immutable::Immutable) initialized from data collected in the sync process. /// /// # Behavior /// /// This method handles different initialization scenarios based on existing data: /// - If the Merkle journal is empty or the last item is before the range start, it creates a /// fresh Merkle structure from the provided `pinned_nodes` /// - If the Merkle journal has data but is incomplete (has length < range end), missing /// operations from the log are applied to bring it up to the target state /// - If the Merkle journal has data beyond the range end, it is rewound to match the sync /// target /// /// # Returns /// /// A [super::Immutable] db populated with the state from the given range. /// The pruning boundary is set to the range start. async fn from_sync_result( context: Self::Context, db_config: Self::Config, log: Self::Journal, pinned_nodes: Option>, range: NonEmptyRange>, apply_batch_size: usize, ) -> Result> { let hasher = qmdb::hasher::(); // Initialize Merkle structure for sync let merkle = Merkle::::init_sync( context.child("merkle"), full::SyncConfig { config: db_config.merkle_config.clone(), range: range.clone(), pinned_nodes, }, ) .await?; let journal = authenticated::Journal::<_, _, _, _, S>::from_components( merkle, log, hasher, apply_batch_size as u64, ) .await?; let mut snapshot: Index> = Index::new(context.child("snapshot"), db_config.translator.clone()); let (last_commit_loc, inactivity_floor_loc) = { let reader = journal.journal.reader().await; let bounds = reader.bounds(); let last_commit_loc = Location::::new( bounds .end .checked_sub(1) .ok_or(Error::HistoricalFloorPruned(Location::new(bounds.end)))?, ); let inactivity_floor_loc = crate::qmdb::find_inactivity_floor_at::( &reader, Location::new(bounds.end), |op| op.has_floor(), ) .await?; // Replay the log from the inactivity floor to build the snapshot. build_snapshot_from_log::( inactivity_floor_loc, &reader, &mut snapshot, |_, _| {}, ) .await?; (last_commit_loc, inactivity_floor_loc) }; let inactive_peaks = F::inactive_peaks( F::location_to_position(Location::new(*last_commit_loc + 1)), inactivity_floor_loc, ); let root = journal.root(inactive_peaks)?; let metrics = Metrics::new(context); let db = Self { journal, root, snapshot, last_commit_loc, inactivity_floor_loc, metrics, }; db.update_metrics().await; db.sync().await?; Ok(db) } fn root(&self) -> Self::Digest { self.root() } } impl sync::compact::Database for CompactDb where F: Family, E: Context, K: Key, V: ValueEncoding, H: Hasher, S: Strategy, Operation: EncodeShared, Operation: Read, Cfg: Clone + Send + Sync + 'static, { type Family = F; type Op = Operation; type Config = immutable::CompactConfig; type Digest = H::Digest; type Context = E; type Hasher = H; async fn from_validated_state( context: Self::Context, config: Self::Config, state: sync::compact::ValidatedState, ) -> Result> { let sync::compact::ValidatedState { state, root, inactivity_floor: inactivity_floor_loc, } = state; let sync::compact::State { leaf_count, pinned_nodes, last_commit_op, last_commit_proof, } = state; let last_commit_loc = Location::new(*leaf_count - 1); let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else { return Err(Error::UnexpectedData(last_commit_loc)); }; assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch"); let commit_codec_config = config.commit_codec_config.clone(); let last_commit_op_bytes = Operation::::Commit(last_commit_metadata.clone(), inactivity_floor_loc) .encode() .to_vec(); let merkle = crate::merkle::compact::Merkle::init_from_compact_state( context.child("merkle"), config.merkle, leaf_count, pinned_nodes.clone(), ) .await?; Self::init_from_verified_state( merkle, commit_codec_config, last_commit_metadata, inactivity_floor_loc, root, last_commit_op_bytes, last_commit_proof, pinned_nodes, ) } fn inactivity_floor(op: &Self::Op) -> Option> { op.has_floor() } fn root(&self) -> Self::Digest { self.root() } async fn persist_compact_state(&self) -> Result<(), Error> { self.persist_cached_witness().await } } #[cfg(test)] mod tests;