//! A shared, generic implementation of the _Any_ QMDB. //! //! The impl blocks in this file define shared functionality across all Any QMDB variants. use super::operation::{update::Update, Operation}; use crate::{ index::Unordered as UnorderedIndex, journal::{ authenticated, contiguous::{Contiguous, Mutable, Reader}, Error as JournalError, }, mmr::{Location, Proof}, qmdb::{ any::ValueEncoding, build_snapshot_from_log, operation::{Key, Operation as OperationTrait}, store::{LogStore, MerkleizedStore, PrunableStore}, Error, }, Persistable, }; use commonware_codec::{Codec, CodecShared}; use commonware_cryptography::Hasher; use commonware_runtime::{Clock, Metrics, Storage}; use core::num::NonZeroU64; /// Type alias for the authenticated journal used by [Db]. pub(crate) type AuthenticatedLog = authenticated::Journal; /// An "Any" QMDB implementation generic over ordered/unordered keys and variable/fixed values. /// Consider using one of the following specialized variants instead, which may be more ergonomic: /// - [crate::qmdb::any::ordered::fixed::Db] /// - [crate::qmdb::any::ordered::variable::Db] /// - [crate::qmdb::any::unordered::fixed::Db] /// - [crate::qmdb::any::unordered::variable::Db] pub struct Db< E: Storage + Clock + Metrics, C: Contiguous, I: UnorderedIndex, H: Hasher, U: Send + Sync, > { /// A (pruned) log of all operations in order of their application. The index of each /// operation in the log is called its _location_, which is a stable identifier. /// /// # Invariants /// /// - The log is never pruned beyond the inactivity floor. /// - There is always at least one commit operation in the log. pub(crate) log: AuthenticatedLog, /// A location before which all operations are "inactive" (that is, operations before this point /// are over keys that have been updated by some operation at or after this point). pub(crate) inactivity_floor_loc: Location, /// The location of the last commit operation. pub(crate) last_commit_loc: Location, /// A snapshot of all currently active operations in the form of a map from each key to the /// location in the log containing its most recent update. /// /// # Invariant /// /// - Only references `Operation::Update`s. pub(crate) snapshot: I, /// The number of active keys in the snapshot. pub(crate) active_keys: usize, /// Marker for the update type parameter. pub(crate) _update: core::marker::PhantomData, } // Shared read-only functionality. impl Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Return the inactivity floor location. This is the location before which all operations are /// known to be inactive. Operations before this point can be safely pruned. pub const fn inactivity_floor_loc(&self) -> Location { self.inactivity_floor_loc } /// Whether the snapshot currently has no active keys. pub const fn is_empty(&self) -> bool { self.active_keys == 0 } /// Get the metadata associated with the last commit. pub async fn get_metadata(&self) -> Result, Error> { match self.log.reader().await.read(*self.last_commit_loc).await? { Operation::CommitFloor(metadata, _) => Ok(metadata), _ => unreachable!("last commit is not a CommitFloor operation"), } } pub fn root(&self) -> H::Digest { self.log.root() } } // Functionality requiring Mutable journal. impl Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Mutable>, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or /// snapshot. /// /// # Errors /// /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor. /// - Returns [crate::mmr::Error::LocationOverflow] if `prune_loc` > [crate::mmr::MAX_LOCATION]. pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> { if prune_loc > self.inactivity_floor_loc { return Err(Error::PruneBeyondMinRequired( prune_loc, self.inactivity_floor_loc, )); } self.log.prune(prune_loc).await?; Ok(()) } } // Functionality requiring Mutable + Persistable journal. impl Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Mutable> + Persistable, I: UnorderedIndex, H: Hasher, Operation: Codec, { /// Returns a [Db] initialized from `log`, using `callback` to report snapshot /// building events. /// /// # Panics /// /// Panics if the log is empty or the last operation is not a commit floor operation. pub async fn init_from_log( mut index: I, log: AuthenticatedLog, known_inactivity_floor: Option, mut callback: F, ) -> Result where F: FnMut(bool, Option), { // If the last-known inactivity floor is behind the current floor, then invoke the callback // appropriately to report the inactive bits. let (last_commit_loc, inactivity_floor_loc, active_keys) = { let reader = log.reader().await; let last_commit_loc = reader .bounds() .end .checked_sub(1) .expect("commit should exist"); let last_commit = reader.read(last_commit_loc).await?; let inactivity_floor_loc = last_commit.has_floor().expect("should be a commit"); if let Some(known_inactivity_floor) = known_inactivity_floor { (*known_inactivity_floor..*inactivity_floor_loc) .for_each(|_| callback(false, None)); } let active_keys = build_snapshot_from_log(inactivity_floor_loc, &reader, &mut index, callback) .await?; ( Location::new(last_commit_loc), inactivity_floor_loc, active_keys, ) }; Ok(Self { log, inactivity_floor_loc, snapshot: index, last_commit_loc, active_keys, _update: core::marker::PhantomData, }) } /// Sync all database state to disk. pub async fn sync(&self) -> Result<(), Error> { self.log.sync().await.map_err(Into::into) } /// Destroy the db, removing all data from disk. pub async fn destroy(self) -> Result<(), Error> { self.log.destroy().await.map_err(Into::into) } pub async fn historical_proof( &self, historical_size: Location, start_loc: Location, max_ops: NonZeroU64, ) -> Result<(Proof, Vec>), Error> { self.log .historical_proof(historical_size, start_loc, max_ops) .await .map_err(Into::into) } pub async fn proof( &self, loc: Location, max_ops: NonZeroU64, ) -> Result<(Proof, Vec>), Error> { self.historical_proof(self.log.size().await, loc, max_ops) .await } } impl Persistable for Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Mutable> + Persistable, I: UnorderedIndex, H: Hasher, Operation: Codec, { type Error = Error; async fn commit(&self) -> Result<(), Error> { // No-op, DB already in recoverable state. Ok(()) } async fn sync(&self) -> Result<(), Error> { Self::sync(self).await } async fn destroy(self) -> Result<(), Error> { self.destroy().await } } impl MerkleizedStore for Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Mutable>, I: UnorderedIndex, H: Hasher, Operation: Codec, { type Digest = H::Digest; type Operation = Operation; fn root(&self) -> H::Digest { self.root() } async fn historical_proof( &self, historical_size: Location, start_loc: Location, max_ops: NonZeroU64, ) -> Result<(Proof, Vec>), Error> { self.log .historical_proof(historical_size, start_loc, max_ops) .await .map_err(Into::into) } } impl LogStore for Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Contiguous>, I: UnorderedIndex, H: Hasher, Operation: Codec, { type Value = V::Value; async fn bounds(&self) -> std::ops::Range { let bounds = self.log.reader().await.bounds(); Location::new(bounds.start)..Location::new(bounds.end) } async fn get_metadata(&self) -> Result, Error> { self.get_metadata().await } } impl PrunableStore for Db where E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, U: Update, C: Mutable>, I: UnorderedIndex, H: Hasher, Operation: Codec, { async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> { self.prune(prune_loc).await } async fn inactivity_floor_loc(&self) -> Location { self.inactivity_floor_loc() } }