//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
//!
//! # Terminology
//!
//! A database's state is derived from an append-only log of state-changing _operations_.
//!
//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
//! operations modify the state of a specific key. A key that has a value can change to one without
//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
//! its representation in the log is the same.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an update operation, and (3) it is the most recent operation for that key.
//!
//! # Database Lifecycle
//!
//! All variants are modified through a batch API that follows a common pattern:
//! 1. Create a batch from the database.
//! 2. Stage mutations on the batch.
//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
//!    the Merkle root that would result from applying them.
//! 4. Inspect the root or create child batches.
//! 5. Apply the batch to the database (uncommitted ancestors are applied automatically).
//!
//! The specific mutation methods vary by variant. See each variant's module documentation for
//! the concrete API and usage examples.
//!
//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`, and
//! `destroy()`.
//!
//! # Traits
//!
//! Keyed mutable variants ([any] and [current]) implement `any::traits::DbAny` and
//! [crate::Persistable].
//!
//! # Acknowledgments
//!
//! The following resources were used as references when implementing this crate:
//!
//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
//! * [Merkle Mountain
//!   Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)

use crate::{
    index::{Cursor, Unordered as Index},
    journal::{
        contiguous::{Mutable, Reader},
        Error as JournalError,
    },
    merkle::{hasher::Standard as StandardHasher, Bagging, Family, Location},
    qmdb::operation::Operation,
};
use commonware_cryptography::Hasher as CryptoHasher;
use commonware_utils::NZUsize;
use core::num::NonZeroUsize;
use futures::{pin_mut, StreamExt as _};
use thiserror::Error;

pub mod any;
pub mod batch_chain;
pub(crate) mod bitmap;
pub(crate) mod compact;
#[cfg(test)]
mod conformance;
pub mod current;
pub mod immutable;
pub mod keyless;
mod metrics;
pub mod operation;
pub mod store;
pub mod sync;
pub mod verify;

pub use verify::{
    create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
    verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
};

/// Merkle peak bagging policy used by QMDB operation roots.
///
/// [`hasher`] builds its hasher with this policy.
pub(crate) const ROOT_BAGGING: Bagging = Bagging::BackwardFold;

/// Return the Merkle hasher configuration used by QMDB operation roots and proofs.
///
/// The hasher is constructed with [`ROOT_BAGGING`], so every root and proof produced through it
/// bags peaks the same way.
pub const fn hasher() -> StandardHasher {
    StandardHasher::new(ROOT_BAGGING)
}

/// Look up the inactivity floor declared at the commit immediately preceding `op_count`.
///
/// `op_count` must be a non-zero commit-boundary historical size: the operation at `op_count - 1`
/// must itself be a commit op (one for which `floor_of` returns `Some`).
///
/// # Errors
///
/// - [`Error::HistoricalFloorPruned`] if `op_count` is zero (no preceding commit exists), or if
///   `op_count - 1` is retained but is not a commit op (either because the caller passed a
///   non-commit-boundary size, or because pruning removed the commit that would have governed this
///   size).
/// - [`JournalError::ItemPruned`] (wrapped in [`Error::Journal`]) if `op_count - 1` precedes the
///   oldest retained location.
/// - [`Error::DataCorrupted`] if the floor declared by the commit is greater than the commit's own
///   location (`op_count - 1`).
/// - Any error returned while reading the operation at `op_count - 1`.
pub(crate) async fn find_inactivity_floor_at(
    reader: &R,
    op_count: Location,
    floor_of: impl Fn(&R::Item) -> Option>,
) -> Result, Error>
where
    F: Family,
    R: Reader,
{
    // The commit that governs `op_count` is the operation immediately before it; an empty
    // history has no such commit.
    let Some(last_op) = op_count.checked_sub(1) else {
        return Err(Error::HistoricalFloorPruned(op_count));
    };
    let last_op = *last_op;

    // Report a pruned commit explicitly instead of attempting the read.
    let bounds = reader.bounds();
    if last_op < bounds.start {
        return Err(JournalError::ItemPruned(last_op).into());
    }

    // A retained operation that declares no floor is not a commit boundary.
    let op = reader.read(last_op).await?;
    let floor = floor_of(&op).ok_or(Error::HistoricalFloorPruned(op_count))?;

    // A commit's floor must not sit past the commit itself (see `Error::FloorBeyondSize`).
    if floor > Location::new(last_op) {
        return Err(Error::DataCorrupted(
            "inactivity floor exceeds commit location",
        ));
    }
    Ok(floor)
}

/// Compute the inactive peak count for a historical operation count.
///
/// Returns `0` when `op_count` is zero without reading from `reader`. Otherwise the governing
/// floor is looked up with [`find_inactivity_floor_at`] (same requirements and errors), and the
/// count is `F::inactive_peaks` of `op_count`'s Merkle position and that floor.
pub(crate) async fn inactive_peaks_at(
    reader: &R,
    op_count: Location,
    floor_of: impl Fn(&R::Item) -> Option>,
) -> Result>
where
    F: Family,
    R: Reader,
{
    if op_count == Location::new(0) {
        return Ok(0);
    }
    let floor = find_inactivity_floor_at::(reader, op_count, floor_of).await?;
    Ok(F::inactive_peaks(F::location_to_position(op_count), floor))
}

/// Errors that can occur when interacting with an authenticated database.
#[derive(Error, Debug)]
pub enum Error {
    /// Stored data violates an internal invariant; the message describes which one.
    #[error("data corrupted: {0}")]
    DataCorrupted(&'static str),
    /// An error from the underlying Merkle structure.
    #[error("merkle error: {0}")]
    Merkle(#[from] crate::merkle::Error),
    /// An error from the `crate::metadata` module.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// An error from the `crate::journal` module.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// An error from the runtime.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    /// The operation at this location has been pruned.
    #[error("operation pruned: {0}")]
    OperationPruned(Location),
    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,
    /// The key exists in the db, so we cannot prove its exclusion.
    #[error("key exists")]
    KeyExists,
    /// Data at this location did not have the expected form.
    #[error("unexpected data at location: {0}")]
    UnexpectedData(Location),
    /// A location (first field) is not below the bound (second field).
    #[error("location out of bounds: {0} >= {1}")]
    LocationOutOfBounds(Location, Location),
    /// A prune was requested at a location (first field) beyond the minimum location that must be
    /// retained (second field).
    #[error("prune location {0} beyond minimum required location {1}")]
    PruneBeyondMinRequired(Location, Location),
    /// The batch was created from a different database state than the current one.
    #[error(
        "stale batch: db has {db_size} ops, batch requires {batch_db_size}, {batch_base_size}, or an ancestor boundary"
    )]
    StaleBatch {
        /// Number of operations currently in the database.
        db_size: u64,
        /// One of the database sizes the batch can be applied at.
        batch_db_size: u64,
        /// Another database size the batch can be applied at.
        batch_base_size: u64,
    },
    /// The batch's inactivity floor is lower than the database's current floor.
    #[error("floor regressed: batch floor {0} < current floor {1}")]
    FloorRegressed(Location, Location),
    /// The batch's inactivity floor exceeds its own commit operation's location. The floor
    /// must not sit past the commit, since a subsequent `prune(floor)` would then remove the
    /// last readable commit from the journal.
    #[error("floor beyond commit location: floor {0} > commit loc {1}")]
    FloorBeyondSize(Location, Location),
    /// The inactivity floor that governed the requested `historical_size` is not retrievable from
    /// the journal, so the wrapper cannot derive the `inactive_peaks` count needed to construct a
    /// proof matching the historical root.
    ///
    /// Historical proofs require `historical_size` to be a commit-boundary: the operation at
    /// `historical_size - 1` must itself be a commit op declaring the governing floor. This error
    /// fires when the caller passes a non-commit-boundary size, or when pruning has removed the
    /// commit that would have governed the size.
    #[error("historical floor pruned for size: {0}")]
    HistoricalFloorPruned(Location),
}

/// Flattens an authenticated-journal error into the matching [`Error::Journal`] or
/// [`Error::Merkle`] variant, so `?` works on authenticated-journal results.
impl From> for Error {
    fn from(e: crate::journal::authenticated::Error) -> Self {
        match e {
            crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
            crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
        }
    }
}

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);

/// Builds the database's snapshot by replaying the log starting at the inactivity floor, and
/// returns the number of active keys in the db.
///
/// Assumes the log is not pruned beyond the inactivity floor.
///
/// `callback` receives `(is_active, inactivated_loc)`:
///
/// - For a delete of a key, it receives `(false, old)`. Here `old` is the location of the update
///   it inactivates, or `None` if the key was not in the snapshot.
/// - For an update of a key, it receives `(true, old)`. Here `old` is the location of the key's
///   previous update, if any.
/// - For a keyless operation carrying a floor (a commit), it receives `(true, None)` only when
///   that operation is the last one in the log, and `(false, None)` otherwise.
///
/// All other operations are replayed without invoking `callback`.
///
/// NOTE(review): `snapshot` is presumably expected to be empty on entry. If a delete removed a
/// pre-existing entry that was never counted, `active_keys` would underflow. Confirm with callers.
///
/// # Errors
///
/// Propagates errors from replaying or reading the log.
pub(super) async fn build_snapshot_from_log(
    inactivity_floor_loc: crate::merkle::Location,
    reader: &C,
    snapshot: &mut I,
    mut callback: Fn,
) -> Result>
where
    F: crate::merkle::Family,
    C: Reader>,
    I: Index>,
    Fn: FnMut(bool, Option>),
{
    let bounds = reader.bounds();
    let stream = reader
        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
        .await?;
    pin_mut!(stream);

    // Only a commit at the very last location of the log is reported as active.
    let last_commit_loc = bounds.end.saturating_sub(1);
    let mut active_keys: usize = 0;
    while let Some(result) = stream.next().await {
        let (loc, op) = result?;
        if let Some(key) = op.key() {
            if op.is_delete() {
                let old_loc = delete_key(snapshot, reader, key).await?;
                callback(false, old_loc);
                if old_loc.is_some() {
                    active_keys -= 1;
                }
            } else if op.is_update() {
                let new_loc = crate::merkle::Location::new(loc);
                let old_loc = update_key(snapshot, reader, key, new_loc).await?;
                callback(true, old_loc);
                // A key with no previous location becomes newly active.
                if old_loc.is_none() {
                    active_keys += 1;
                }
            }
        } else if op.has_floor().is_some() {
            callback(loc == last_commit_loc, None);
        }
    }

    Ok(active_keys)
}

/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
/// previously associated location.
///
/// Returns `Ok(None)` (and leaves the snapshot unchanged) if no snapshot entry matches `key`.
///
/// # Errors
///
/// Propagates errors from reading candidate operations from the log.
async fn delete_key(
    snapshot: &mut I,
    reader: &R,
    key: &>::Key,
) -> Result>, Error>
where
    F: Family,
    I: Index>,
    R: Reader,
    R::Item: Operation,
{
    // If the translated key is in the snapshot, get a cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut(key) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then delete it.
    let Some(loc) = find_update_op::(reader, &mut cursor, key).await? else {
        return Ok(None);
    };
    cursor.delete();

    Ok(Some(loc))
}

/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
///
/// If `key` has no entry yet, `new_loc` is inserted and `Ok(None)` is returned.
///
/// # Errors
///
/// Propagates errors from reading candidate operations from the log.
///
/// # Panics
///
/// Panics if `key` already has a location that is not strictly less than `new_loc`.
async fn update_key(
    snapshot: &mut I,
    reader: &R,
    key: &>::Key,
    new_loc: Location,
) -> Result>, Error>
where
    F: Family,
    I: Index>,
    R: Reader,
    R::Item: Operation,
{
    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
    // cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then update its location.
    if let Some(loc) = find_update_op::(reader, &mut cursor, key).await? {
        // Replay is in log order, so a newer update must come after the one it replaces.
        assert!(new_loc > loc);
        cursor.update(new_loc);
        return Ok(Some(loc));
    }

    // The key wasn't in the snapshot, so add it to the cursor.
    cursor.insert(new_loc);

    Ok(None)
}

/// Scan the remaining entries of `cursor`, read each referenced operation from `reader`, and
/// return the location of the first one whose key equals `key`.
///
/// On a match, the cursor is left positioned at that location, so it can be used to update or
/// delete the entry. Returns `Ok(None)` if the cursor yields no matching entry.
///
/// # Errors
///
/// Propagates any error from reading an operation from `reader`.
///
/// # Panics
///
/// Panics if an operation referenced by the cursor has no key.
async fn find_update_op(
    reader: &R,
    cursor: &mut impl Cursor>,
    key: &>::Key,
) -> Result>, Error>
where
    F: Family,
    R: Reader,
    R::Item: Operation,
{
    // Distinct keys can share one snapshot entry (its key is a translation of the full key), so
    // each candidate operation is read back and its full key compared.
    while let Some(&loc) = cursor.next() {
        let op = reader.read(*loc).await?;
        // Snapshot locations are expected to reference keyed operations only.
        let k = op.key().expect("operation without key");
        if *k == *key {
            return Ok(Some(loc));
        }
    }

    Ok(None)
}

/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
/// its location to `new_loc`.
///
/// Unlike `update_key`, this does not consult the log: the entry is identified solely by
/// `old_loc`.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
fn update_known_loc>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location,
    new_loc: Location,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
    assert!(
        cursor.find(|&loc| *loc == old_loc),
        "known key with given old_loc should have been found"
    );
    cursor.update(new_loc);
}

/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
/// it from the snapshot.
///
/// Unlike `delete_key`, this does not consult the log: the entry is identified solely by
/// `old_loc`.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
fn delete_known_loc>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
    assert!(
        cursor.find(|&loc| *loc == old_loc),
        "known key with given old_loc should have been found"
    );
    cursor.delete();
}

/// A wrapper of DB state required for implementing inactivity floor management.
pub(crate) struct FloorHelper<
    'a,
    F: Family,
    I: Index>,
    C: Mutable>,
> {
    /// Index from keys to the locations of their active operations; an operation counts as
    /// active when its location is present here.
    pub snapshot: &'a mut I,
    /// The operations log; active operations are re-appended at its tip.
    pub log: &'a mut C,
}

impl FloorHelper<'_, F, I, C>
where
    F: Family,
    I: Index>,
    C: Mutable>,
{
    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns whether the
    /// operation was moved.
    ///
    /// The operation is treated as active exactly when the snapshot holds `old_loc` for its key.
    /// Keyless operations are never active.
    ///
    /// # Errors
    ///
    /// Propagates any error from appending `op` to the log.
    async fn move_op_if_active(
        &mut self,
        op: C::Item,
        old_loc: Location,
    ) -> Result> {
        let Some(key) = op.key() else {
            return Ok(false); // operations without keys cannot be active
        };

        // If we find a snapshot entry corresponding to the operation, we know it's active.
        {
            let Some(mut cursor) = self.snapshot.get_mut(key) else {
                return Ok(false);
            };
            if !cursor.find(|&loc| loc == old_loc) {
                return Ok(false);
            }

            // Update the operation's snapshot location to point to tip. The current log size is
            // presumably the location the append below will occupy.
            // NOTE(review): the snapshot is repointed before the append runs. If the append
            // fails, the snapshot references a location that was never written. Confirm callers
            // treat such errors as fatal.
            cursor.update(Location::::new(self.log.size().await));
        }

        // Apply the operation at tip.
        self.log.append(&op).await?;
        Ok(true)
    }

    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
    /// active operation above the inactivity floor, moving it to tip, and then setting the
    /// inactivity floor to the location following the moved operation. This method is therefore
    /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
    ///
    /// Only locations below the log size observed on entry are examined.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the log or from moving an operation to tip.
    ///
    /// # Panics
    ///
    /// Expects there is at least one active operation above the inactivity floor, and panics
    /// otherwise.
    async fn raise_floor(
        &mut self,
        mut inactivity_floor_loc: Location,
    ) -> Result, Error> {
        // Captured once: the operation moved by this call is appended past this bound.
        let tip_loc: Location = Location::new(self.log.size().await);
        loop {
            assert!(
                *inactivity_floor_loc < tip_loc,
                "no active operations above the inactivity floor"
            );
            let old_loc = inactivity_floor_loc;
            inactivity_floor_loc += 1;
            // The reader is scoped to this block so that it is dropped before
            // `move_op_if_active` takes `&mut self`. This presumes the reader borrows
            // `self.log`.
            let op = {
                let reader = self.log.reader().await;
                reader.read(*old_loc).await?
            };

            if self.move_op_if_active(op, old_loc).await? {
                return Ok(inactivity_floor_loc);
            }
        }
    }
}