//! A shared, generic implementation of the "Any" QMDB. //! The impl block in this file defines shared functionality across all "Any" QMDB variants. use super::operation::{update::Update, Operation}; use crate::{ index::Unordered as UnorderedIndex, journal::{ authenticated, contiguous::{Contiguous, MutableContiguous, PersistableContiguous}, }, mmr::{ mem::{Clean, Dirty, State}, Location, Proof, }, qmdb::{ any::ValueEncoding, build_snapshot_from_log, operation::{Committable, Operation as OperationTrait}, store::LogStore, Error, FloorHelper, }, AuthenticatedBitMap, }; use commonware_codec::Codec; use commonware_cryptography::{Digest, DigestOf, Hasher}; use commonware_runtime::{Clock, Metrics, Storage}; use commonware_utils::Array; use core::{num::NonZeroU64, ops::Range}; use tracing::debug; /// Type alias for the authenticated journal used by [Db]. pub(crate) type AuthenticatedLog>> = authenticated::Journal; /// An "Any" QMDB implementation generic over ordered/unordered keys and variable/fixed values. /// Consider using one of the following specialized variants instead, which may be more ergonomic: /// - [crate::qmdb::any::ordered::fixed::Db] /// - [crate::qmdb::any::ordered::variable::Db] /// - [crate::qmdb::any::unordered::fixed::Db] /// - [crate::qmdb::any::unordered::variable::Db] pub struct Db< E: Storage + Clock + Metrics, C: Contiguous, I, H: Hasher, U, S: State> = Clean>, > where C::Item: Codec, { /// A (pruned) log of all operations in order of their application. The index of each /// operation in the log is called its _location_, which is a stable identifier. /// /// # Invariants /// /// - The log is never pruned beyond the inactivity floor. /// - There is always at least one commit operation in the log. pub(crate) log: AuthenticatedLog, /// A location before which all operations are "inactive" (that is, operations before this point /// are over keys that have been updated by some operation at or after this point). 
pub(crate) inactivity_floor_loc: Location, /// The location of the last commit operation. pub(crate) last_commit_loc: Location, /// A snapshot of all currently active operations in the form of a map from each key to the /// location in the log containing its most recent update. /// /// # Invariant /// /// - Only references `Operation::Update`s. pub(crate) snapshot: I, /// The number of _steps_ to raise the inactivity floor. Each step involves moving exactly one /// active operation to tip. pub(crate) steps: u64, /// The number of active keys in the snapshot. pub(crate) active_keys: usize, /// Marker for the update type parameter. pub(crate) _update: core::marker::PhantomData, } impl Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, S: State>, Operation: Codec, { /// The number of operations that have been applied to this db, including those that have been /// pruned and those that are not yet committed. pub fn op_count(&self) -> Location { self.log.size() } /// Return the inactivity floor location. This is the location before which all operations are /// known to be inactive. Operations before this point can be safely pruned. pub const fn inactivity_floor_loc(&self) -> Location { self.inactivity_floor_loc } /// Whether the snapshot currently has no active keys. pub const fn is_empty(&self) -> bool { self.active_keys == 0 } /// Returns the location of the oldest operation that remains retrievable. pub fn oldest_retained_loc(&self) -> Location { self.log .oldest_retained_loc() .expect("at least one operation should exist") } /// Get the metadata associated with the last commit. pub async fn get_metadata(&self) -> Result, Error> { match self.log.read(self.last_commit_loc).await? 
{ Operation::CommitFloor(metadata, _) => Ok(metadata), _ => unreachable!("last commit is not a CommitFloor operation"), } } } impl Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: MutableContiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Returns a [Db] initialized from `log`, using `callback` to report snapshot /// building events. /// /// # Panics /// /// Panics if the log is empty or the last operation is not a commit floor operation. pub async fn init_from_log( mut index: I, log: AuthenticatedLog, known_inactivity_floor: Option, mut callback: F, ) -> Result where F: FnMut(bool, Option), { // If the last-known inactivity floor is behind the current floor, then invoke the callback // appropriately to report the inactive bits. let last_commit_loc = log.size().checked_sub(1).expect("commit should exist"); let last_commit = log.read(last_commit_loc).await?; let inactivity_floor_loc = last_commit.has_floor().expect("should be a commit"); if let Some(known_inactivity_floor) = known_inactivity_floor { (*known_inactivity_floor..*inactivity_floor_loc).for_each(|_| callback(false, None)); } // Build snapshot from the log let active_keys = build_snapshot_from_log(inactivity_floor_loc, &log, &mut index, callback).await?; Ok(Self { log, inactivity_floor_loc, snapshot: index, last_commit_loc, steps: 0, active_keys, _update: core::marker::PhantomData, }) } /// Raises the inactivity floor by exactly one step, moving the first active operation to tip. /// Raises the floor to the tip if the db is empty. 
pub(crate) async fn raise_floor(&mut self) -> Result { if self.is_empty() { self.inactivity_floor_loc = self.op_count(); debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip"); } else { let steps_to_take = self.steps + 1; for _ in 0..steps_to_take { let loc = self.inactivity_floor_loc; self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?; } } self.steps = 0; Ok(self.inactivity_floor_loc) } /// Same as `raise_floor` but uses the status bitmap to more efficiently find the first active /// operation above the inactivity floor. pub(crate) async fn raise_floor_with_bitmap( &mut self, status: &mut AuthenticatedBitMap, ) -> Result { if self.is_empty() { self.inactivity_floor_loc = self.op_count(); debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip"); } else { let steps_to_take = self.steps + 1; for _ in 0..steps_to_take { let loc = self.inactivity_floor_loc; self.inactivity_floor_loc = self .as_floor_helper() .raise_floor_with_bitmap(status, loc) .await?; } } self.steps = 0; Ok(self.inactivity_floor_loc) } /// Returns a FloorHelper wrapping the current state of the log. pub(crate) const fn as_floor_helper( &mut self, ) -> FloorHelper<'_, I, AuthenticatedLog> { FloorHelper { snapshot: &mut self.snapshot, log: &mut self.log, } } /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or /// snapshot. /// /// # Errors /// /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor. /// - Returns [crate::mmr::Error::LocationOverflow] if `prune_loc` > [crate::mmr::MAX_LOCATION]. 
pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> { if prune_loc > self.inactivity_floor_loc { return Err(Error::PruneBeyondMinRequired( prune_loc, self.inactivity_floor_loc, )); } self.log.prune(prune_loc).await?; Ok(()) } } impl Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Convert this database into its dirty counterpart for batched updates. pub fn into_dirty(self) -> Db { Db { log: self.log.into_dirty(), inactivity_floor_loc: self.inactivity_floor_loc, last_commit_loc: self.last_commit_loc, snapshot: self.snapshot, steps: self.steps, active_keys: self.active_keys, _update: core::marker::PhantomData, } } } impl Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Merkleize the database and compute the root digest. pub fn merkleize(self) -> Db> { Db { log: self.log.merkleize(), inactivity_floor_loc: self.inactivity_floor_loc, last_commit_loc: self.last_commit_loc, snapshot: self.snapshot, steps: self.steps, active_keys: self.active_keys, _update: core::marker::PhantomData, } } } impl Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: PersistableContiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Applies the given commit operation to the log and commits it to disk. Does not raise the /// inactivity floor. /// /// # Panics /// /// Panics if the given operation is not a commit operation. pub(crate) async fn apply_commit_op(&mut self, op: Operation) -> Result<(), Error> { assert!(op.is_commit(), "commit operation expected"); self.last_commit_loc = self.op_count(); self.log.append(op).await?; self.log.commit().await.map_err(Into::into) } /// Simulate an unclean shutdown by consuming the db. If commit_log is true, the underlying /// authenticated log will be committed before consuming. 
#[cfg(any(test, feature = "fuzzing"))] pub async fn simulate_failure(mut self, commit_log: bool) -> Result<(), Error> { if commit_log { self.log.commit().await?; } Ok(()) } /// Commit any pending operations to the database, ensuring their durability upon return from /// this function. Also raises the inactivity floor according to the schedule. Returns the /// `(start_loc, end_loc]` location range of committed operations. pub async fn commit(&mut self, metadata: Option) -> Result, Error> { let start_loc = self.last_commit_loc + 1; // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the // previous commit becoming inactive. let inactivity_floor_loc = self.raise_floor().await?; // Append the commit operation with the new inactivity floor. self.apply_commit_op(Operation::CommitFloor(metadata, inactivity_floor_loc)) .await?; Ok(start_loc..self.op_count()) } /// Sync all database state to disk. pub async fn sync(&mut self) -> Result<(), Error> { self.log.sync().await.map_err(Into::into) } /// Close the db. Operations that have not been committed will be lost or rolled back on /// restart. pub async fn close(self) -> Result<(), Error> { self.log.close().await.map_err(Into::into) } /// Destroy the db, removing all data from disk. 
pub async fn destroy(self) -> Result<(), Error> { self.log.destroy().await.map_err(Into::into) } } impl crate::qmdb::store::LogStorePrunable for Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: MutableContiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> { self.prune(prune_loc).await } } impl crate::qmdb::store::CleanStore for Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { type Digest = H::Digest; type Operation = Operation; type Dirty = Db; fn into_dirty(self) -> Self::Dirty { self.into_dirty() } fn root(&self) -> H::Digest { self.log.root() } async fn proof( &self, start_loc: Location, max_ops: NonZeroU64, ) -> Result<(Proof, Vec), Error> { let size = self.op_count(); self.historical_proof(size, start_loc, max_ops).await } async fn historical_proof( &self, historical_size: Location, start_loc: Location, max_ops: NonZeroU64, ) -> Result<(Proof, Vec), Error> { self.log .historical_proof(historical_size, start_loc, max_ops) .await .map_err(Into::into) } } impl LogStore for Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, S: State>, Operation: Codec, { type Value = V::Value; fn op_count(&self) -> Location { self.op_count() } fn inactivity_floor_loc(&self) -> Location { self.inactivity_floor_loc() } async fn get_metadata(&self) -> Result, Error> { self.get_metadata().await } fn is_empty(&self) -> bool { self.is_empty() } } impl crate::qmdb::store::DirtyStore for Db where E: Storage + Clock + Metrics, K: Array, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { type Digest = H::Digest; type Operation = Operation; type Clean = Db; async fn merkleize(self) -> Result { Ok(self.merkleize()) } }