//! Shared synchronization logic for [crate::qmdb::current] databases.
//!
//! Contains the implementation of [crate::qmdb::sync::Database] for all
//! [Db](crate::qmdb::current::db::Db) variants (ordered/unordered, fixed/variable).
//!
//! The canonical root of a `current` database combines the ops root, grafted root, and optional
//! pending and partial chunk digests into a single hash (see the [Root structure](super) section in
//! the module documentation). The sync engine operates on the **ops root**, not the canonical root:
//! it downloads operations and verifies each batch against the ops root using ops-tree range proofs
//! (identical to `any` sync). Callers that verify current ops proofs directly should use
//! `qmdb::hasher`. [crate::qmdb::current::proof::OpsRootWitness] can be used by callers that need
//! to authenticate the synced ops root against a trusted canonical root; the sync engine does not
//! perform this check itself.
//!
//! After all operations are synced, the bitmap and grafted tree are reconstructed deterministically
//! from the operations. The canonical root is then computed from the ops root, the reconstructed
//! grafted root, and any pending or partial chunk digests.
//!
//! The [Database]`::`[root()](crate::qmdb::sync::Database::root) implementation returns the **ops
//! root** (not the canonical root) because that is what the sync engine verifies against.
//!
//! For pruned databases (`range.start > 0`), grafted pinned nodes for the pruned region are read
//! directly from the ops tree after it is built. This works because of the zero-chunk identity: for
//! all-zero bitmap chunks (which all pruned chunks are), the grafted leaf equals the ops subtree
//! root, making the grafted tree structurally identical to the ops tree at and above the grafting
//! height.

use crate::{
    index::Factory as IndexFactory,
    journal::{
        authenticated,
        contiguous::{fixed, variable, Mutable, Reader as _},
    },
    merkle::{
        full::{self, Merkle},
        Graftable, Location,
    },
    qmdb::{
        self,
        any::{
            db::{Db as AnyDb, Metrics as AnyMetrics},
            operation::{update::Update, Operation},
            ordered::{
                fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
                variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
            },
            unordered::{
                fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
                variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
            },
            FixedValue, VariableValue,
        },
        bitmap::Shared,
        current::{
            db, grafting,
            ordered::{
                fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
            },
            unordered::{
                fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
            },
            FixedConfig, VariableConfig,
        },
        operation::{Committable, Key, Operation as _},
        sync::{resolver::fetch_operations, Database, DatabaseConfig as Config},
    },
    translator::Translator,
    Context, Persistable,
};
use commonware_codec::{Codec, CodecShared, Read as CodecRead};
use commonware_cryptography::{DigestOf, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::{
    bitmap::Prunable as BitMap, channel::oneshot, range::NonEmptyRange, sync::AsyncMutex, Array,
};
use std::sync::Arc;

#[cfg(test)]
pub(crate) mod tests;

// NOTE(review): the generic parameter list of this impl (and the type arguments of
// `super::Config`) are not visible in this listing; `J` below is presumably one of those
// parameters. Confirm against version control before compiling.
/// Exposes the journal configuration carried by a `current` database config to the sync engine.
impl Config for super::Config {
    type JournalConfig = J;

    /// Returns a clone of the `journal_config` field.
    fn journal_config(&self) -> Self::JournalConfig {
        self.journal_config.clone()
    }
}

/// Shared helper to build a `current::db::Db` from sync components.
///
/// This follows the same pattern as `any/sync/mod.rs::build_db` but additionally:
/// * Builds the activity bitmap by replaying the operations log.
/// * Extracts grafted pinned nodes from the ops tree (zero-chunk identity).
/// * Builds the grafted tree from the bitmap and ops tree.
/// * Computes and caches the canonical root.
// NOTE(review): generic parameter lists and type arguments (`<...>`) are missing throughout
// this listing (e.g. `Option>`, `Result, qmdb::Error>`). They appear to have been stripped in
// transit; restore them from version control before compiling.
//
// Parameters, as used by the body below:
// * `context`: child contexts labelled "merkle", "index", "any" and "metadata" are derived
//   from it; the context itself is finally consumed by `db::Metrics::new`.
// * `merkle_config` / `pinned_nodes` / `range`: forwarded to `Merkle::init_sync` through
//   `full::SyncConfig`. `range.start()` also determines how many bitmap chunks are pruned.
// * `log`: the journal of synced operations, wrapped into an authenticated journal.
// * `translator`: handed to the index factory `I::new`.
// * `apply_batch_size`: cast to `u64` and passed to `authenticated::Journal::from_components`.
// * `metadata_partition`: passed by reference to `db::init_metadata`.
// * `strategy`: borrowed by `db::build_grafted_tree`, then stored in the returned `db::Db`.
//
// Any failure from the awaited initialisation steps is propagated with `?`. Two conditions
// are reported explicitly as `DataCorrupted`: "pruned chunks overflow" and
// "missing ops pinned node".
#[allow(clippy::too_many_arguments)]
async fn build_db(
    context: E,
    merkle_config: full::Config,
    log: J,
    translator: T,
    pinned_nodes: Option>,
    range: NonEmptyRange>,
    apply_batch_size: usize,
    metadata_partition: String,
    strategy: S,
) -> Result, qmdb::Error>
where
    F: Graftable,
    E: Context,
    U: Update + Send + Sync + 'static,
    I: IndexFactory>,
    H: Hasher,
    T: Translator,
    J: Mutable> + Persistable,
    S: Strategy,
    Operation: Codec + Committable + CodecShared,
{
    // Build authenticated log.
    // This hasher is moved into `from_components` below; a fresh one is created later for
    // the grafted-tree and root computations.
    let hasher = qmdb::hasher::();
    let merkle = Merkle::::init_sync(
        context.child("merkle"),
        full::SyncConfig {
            config: merkle_config,
            range: range.clone(),
            pinned_nodes,
        },
    )
    .await?;
    let index = I::new(context.child("index"), translator);
    let log = authenticated::Journal::::from_components(
        merkle,
        log,
        hasher,
        apply_batch_size as u64,
    )
    .await?;

    // Initialize bitmap with pruned chunks.
    //
    // Floor division is intentional: chunks entirely below range.start are pruned.
    // If range.start is not chunk-aligned, the partial leading chunk is reconstructed by
    // init_from_log, which pads the gap between `pruned_chunks * CHUNK_SIZE_BITS` and the
    // journal's inactivity floor with inactive (false) bits.
    let pruned_chunks = (*range.start() / BitMap::::CHUNK_SIZE_BITS) as usize;
    let bitmap = BitMap::::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| qmdb::Error::::DataCorrupted("pruned chunks overflow"))?;
    let bitmap = Arc::new(Shared::::new(bitmap));

    // Build any::Db, handing it the pre-allocated bitmap. `init_from_log` populates the bitmap
    // during replay.
    let any_metrics = AnyMetrics::new(context.child("any"));
    let any: AnyDb = AnyDb::init_from_log(index, log, Some(bitmap), any_metrics).await?;

    // Fetch grafted pinned nodes from the ops tree. For each position the grafted family
    // needs at its pruning boundary, source the digest from the ops tree via the zero-chunk
    // identity: when the covered chunks are all zero (which pruned chunks always are), the
    // ops-family digest at the mapped position equals the grafted digest.
    //
    // Requires `range.start <=` target's [`Db::sync_boundary`](db::Db::sync_boundary): that
    // bound guarantees every required ops-tree node is born at `range.end`.
    let grafted_pinned_nodes = {
        // The grafted tree's pruning boundary is expressed in chunks, not operations.
        let grafted_boundary = Location::::new(pruned_chunks as u64);
        let grafting_height = grafting::height::();
        let mut pins = Vec::new();
        for grafted_pos in F::nodes_to_pin(grafted_boundary) {
            // Map the grafted-tree position to the corresponding ops-tree position.
            let ops_pos = grafting::grafted_to_ops_pos::(grafted_pos, grafting_height);
            // A missing node is reported as data corruption rather than skipped.
            let digest = any
                .log
                .merkle
                .get_node(ops_pos)
                .await?
                .ok_or(qmdb::Error::::DataCorrupted("missing ops pinned node"))?;
            pins.push(digest);
        }
        pins
    };

    // Build grafted tree.
    // A new hasher is needed because the first one was moved into the authenticated journal.
    let hasher = qmdb::hasher::();
    let ops_size = any.log.merkle.size();
    // Fallible conversion from the ops tree size to a location (leaf count).
    let ops_leaves = Location::::try_from(ops_size)?;
    let grafted_tree = db::build_grafted_tree::(
        &hasher,
        any.bitmap.as_ref(),
        &grafted_pinned_nodes,
        &any.log.merkle,
        ops_leaves,
        &strategy,
    )
    .await?;

    // Compute the canonical root. The grafted root is deterministic from the ops
    // (which are authenticated by the engine) and the bitmap (which is deterministic
    // from the ops).
    let storage = grafting::Storage::new(
        &grafted_tree,
        grafting::height::(),
        &any.log.merkle,
        hasher.clone(),
    );
    let partial = db::partial_chunk(any.bitmap.as_ref());
    let grafted_root = db::compute_grafted_root(
        &hasher,
        any.bitmap.as_ref(),
        &storage,
        ops_leaves,
        any.inactivity_floor_loc,
    )
    .await?;
    let ops_root = any.root();
    // If a partial chunk exists, hash it and keep its `next_bit` alongside the digest.
    let partial_digest = partial.map(|(chunk, next_bit)| {
        let digest = hasher.digest(&chunk);
        (next_bit, digest)
    });
    // If a pending chunk exists, hash it as well; `pending_chunk` itself is fallible.
    let pending_digest =
        db::pending_chunk::(any.bitmap.as_ref(), ops_leaves, grafting::height::())?
            .map(|chunk| hasher.digest(&chunk));
    // Combine ops root, grafted root and the optional pending/partial digests into one root.
    let root = db::combine_roots(
        &hasher,
        &ops_root,
        &grafted_root,
        pending_digest.as_ref(),
        partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
    );

    // Initialize metadata store and construct the Db.
    // Only the first element of the tuple returned by `init_metadata` is used here.
    let (metadata, _, _) =
        db::init_metadata::>(context.child("metadata"), &metadata_partition)
            .await?;

    // `context` is consumed here, so all child contexts must have been created above.
    let metrics = db::Metrics::new(context);
    let current_db = db::Db {
        any,
        grafted_tree,
        metadata: AsyncMutex::new(metadata),
        strategy,
        root,
        metrics,
    };
    current_db.update_metrics();

    // Persist metadata so the db can be reopened with init_fixed/init_variable.
    current_db.sync_metadata().await?;

    Ok(current_db)
}

// --- Database trait implementations ---

// Generates the `Database` (sync) implementation for one `current` database variant.
//
// Macro arguments:
// * `$db`: identifier of the database type the trait is implemented for.
// * `$op`: identifier used for the associated `Op` type.
// * `$update`: update type identifier. NOTE(review): not referenced in the visible macro
//   body; presumably it appears in the stripped turbofish of the `build_db` call. Confirm.
// * `$journal`: type used for the associated `Journal` type.
// * `$config`: type used for the associated `Config` type.
// * `$key_bound` / `$value_bound`: trait bounds applied to `K` and `V`.
// * `$where_extra` (optional, after `;`): extra where-clause predicates appended verbatim.
macro_rules! impl_current_sync_database {
    ($db:ident, $op:ident, $update:ident,
     $journal:ty, $config:ty,
     $key_bound:path, $value_bound:ident
     $(; $($where_extra:tt)+)?) => {
        impl Database for $db
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $value_bound + 'static,
            H: Hasher,
            T: Translator,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Context = E;
            type Op = $op;
            type Journal = $journal;
            type Hasher = H;
            type Config = $config;
            type Digest = H::Digest;

            /// Builds the database from the components produced by a completed sync.
            ///
            /// Clones the merkle config, grafted metadata partition, strategy (taken from
            /// `config.merkle_config.strategy`) and translator out of `config`, then
            /// delegates to `build_db`.
            async fn from_sync_result(
                context: Self::Context,
                config: Self::Config,
                log: Self::Journal,
                pinned_nodes: Option>,
                range: NonEmptyRange>,
                apply_batch_size: usize,
            ) -> Result> {
                let merkle_config = config.merkle_config.clone();
                let metadata_partition = config.grafted_metadata_partition.clone();
                let strategy = config.merkle_config.strategy.clone();
                let translator = config.translator.clone();
                build_db::, _, H, _, T, N, _>(
                    context,
                    merkle_config,
                    log,
                    translator,
                    pinned_nodes,
                    range,
                    apply_batch_size,
                    metadata_partition,
                    strategy,
                )
                .await
            }

            /// Tries to obtain the pinned nodes at `target.range.start()` from local state.
            ///
            /// Returns `Ok(None)` when any of the following holds:
            /// * the target range starts at location 0;
            /// * the journal's bounds start after the target start, or do not end exactly at
            ///   the target end;
            /// * the locally initialized merkle structure's bounds fail the same check;
            /// * the locally computed root differs from `target.root`.
            ///
            /// Otherwise returns the result of `pinned_nodes_at(target.range.start())`
            /// wrapped in `Some`, with errors converted via `Into`.
            async fn local_boundary_nodes(
                context: Self::Context,
                config: &Self::Config,
                target: &qmdb::sync::Target,
                journal: &Self::Journal,
            ) -> Result>, qmdb::Error> {
                if target.range.start() == Location::new(0) {
                    return Ok(None);
                }

                let reader = journal.reader().await;
                let bounds = reader.bounds();
                // The local journal must reach back to the target start and end exactly at
                // the target end.
                if Location::new(bounds.start) > target.range.start()
                    || Location::new(bounds.end) != target.range.end()
                {
                    return Ok(None);
                }
                // NOTE(review): presumably looks for the inactivity floor as of the target
                // end among operations for which `has_floor()` is true; confirm against
                // `find_inactivity_floor_at`.
                let inactivity_floor = qmdb::find_inactivity_floor_at::(
                    &reader,
                    target.range.end(),
                    |op| op.has_floor(),
                )
                .await?;
                // Release the journal reader before initializing the merkle structure.
                drop(reader);

                let hasher = qmdb::hasher::();
                let merkle = Merkle::::init(
                    context.child("local_boundary_merkle"),
                    &hasher,
                    config.merkle_config.clone(),
                )
                .await?;
                // Shadows the journal bounds with the merkle structure's own bounds.
                let bounds = merkle.bounds();
                if bounds.start > target.range.start() || bounds.end != target.range.end() {
                    return Ok(None);
                }
                // Derived from the target end position and the inactivity floor; passed to
                // the root computation below.
                let inactive_peaks = F::inactive_peaks(
                    F::location_to_position(target.range.end()),
                    inactivity_floor,
                );
                // Local state is only usable if it reproduces the target root.
                if merkle.root(&hasher, inactive_peaks)? != target.root {
                    return Ok(None);
                }

                merkle
                    .pinned_nodes_at(target.range.start())
                    .await
                    .map(Some)
                    .map_err(Into::into)
            }

            /// Returns the ops root (not the canonical root), since the sync engine verifies
            /// batches against the ops tree.
            fn root(&self) -> Self::Digest {
                self.any.root()
            }
        }
    };
}

// Unordered, fixed-size variant.
impl_current_sync_database!(
    CurrentUnorderedFixedDb, UnorderedFixedOp, UnorderedFixedUpdate,
    fixed::Journal, FixedConfig,
    Array, FixedValue
);

// Unordered, variable-size variant (adds a `CodecShared` bound on the operation type).
impl_current_sync_database!(
    CurrentUnorderedVariableDb, UnorderedVariableOp, UnorderedVariableUpdate,
    variable::Journal,
    VariableConfig as CodecRead>::Cfg, S>,
    Key, VariableValue;
    UnorderedVariableOp: CodecShared
);

// Ordered, fixed-size variant.
impl_current_sync_database!(
    CurrentOrderedFixedDb, OrderedFixedOp, OrderedFixedUpdate,
    fixed::Journal, FixedConfig,
    Array, FixedValue
);

// Ordered, variable-size variant (adds a `CodecShared` bound on the operation type).
impl_current_sync_database!(
    CurrentOrderedVariableDb, OrderedVariableOp, OrderedVariableUpdate,
    variable::Journal,
    VariableConfig as CodecRead>::Cfg, S>,
    Key, VariableValue;
    OrderedVariableOp: CodecShared
);

// --- Resolver implementations ---
//
// The resolver for `current` databases serves ops-level proofs (not grafted proofs) from
// the inner `any` db. The sync engine verifies each batch against the ops root.

// Generates three `Resolver` implementations for one database variant, differing only in how
// the database is reached:
// 1. `Arc<$db>`: the database is accessed directly.
// 2. `Arc<AsyncRwLock<$db>>`: a read guard is acquired first.
// 3. `Arc<AsyncRwLock<Option<$db>>>`: as (2), and a `None` database yields `KeyNotFound`.
//
// Macro arguments:
// * `$db`: identifier of the database type.
// * `$op`: identifier used for the associated `Op` type.
// * `$val_bound` / `$key_bound`: trait bounds applied to `V` and `K`.
// * `$where_extra` (optional, after `;`): extra where-clause predicates appended verbatim.
//
// In all three impls the `_cancel_rx` receiver is accepted but not used.
macro_rules! impl_current_resolver {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
        impl crate::qmdb::sync::Resolver for std::sync::Arc<$db>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = qmdb::Error;

            /// Serves operations by delegating to `fetch_operations`, using the inner `any`
            /// db's `historical_proof` and `pinned_nodes_at` as the data sources.
            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result, Self::Error> {
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |op_count, start_loc, max_ops| {
                        self.any.historical_proof(op_count, start_loc, max_ops)
                    },
                    |start_loc| self.any.pinned_nodes_at(start_loc),
                )
                .await
            }
        }

        impl crate::qmdb::sync::Resolver for std::sync::Arc<
            commonware_utils::sync::AsyncRwLock<
                $db,
            >,
        >
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = qmdb::Error;

            /// Same as the direct `Arc<$db>` resolver, but reads through the lock.
            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result, qmdb::Error> {
                // The read guard stays alive until `fetch_operations` has completed.
                let db = self.read().await;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |op_count, start_loc, max_ops| {
                        db.any.historical_proof(op_count, start_loc, max_ops)
                    },
                    |start_loc| db.any.pinned_nodes_at(start_loc),
                )
                .await
            }
        }

        impl crate::qmdb::sync::Resolver for std::sync::Arc<
            commonware_utils::sync::AsyncRwLock<
                Option<$db>,
            >,
        >
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op;
            type Error = qmdb::Error;

            /// Same as the locked resolver, but the slot may be empty: when it holds `None`
            /// the call fails with `KeyNotFound`.
            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result, qmdb::Error> {
                // The read guard stays alive until `fetch_operations` has completed.
                let guard = self.read().await;
                let db = guard.as_ref().ok_or(qmdb::Error::::KeyNotFound)?;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |op_count, start_loc, max_ops| {
                        db.any.historical_proof(op_count, start_loc, max_ops)
                    },
                    |start_loc| db.any.pinned_nodes_at(start_loc),
                )
                .await
            }
        }
    };
}

// Unordered Fixed
impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);

// Unordered Variable
impl_current_resolver!(
    CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
    UnorderedVariableOp: CodecShared,
);

// Ordered Fixed
impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);

// Ordered Variable
impl_current_resolver!(
    CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
    OrderedVariableOp: CodecShared,
);