//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
//!
//! # Terminology
//!
//! A database's state is derived from an append-only log of state-changing _operations_.
//!
//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
//! operations modify the state of a specific key. A key that has a value can change to one without
//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
//! its representation in the log is the same.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an update operation, and (3) it is the most recent operation for that key.
//!
//! # Database Lifecycle
//!
//! All variants are modified through a batch API that follows a common pattern:
//! 1. Create a batch from the database.
//! 2. Stage mutations on the batch.
//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
//!    the Merkle root that would result from applying them.
//! 4. Inspect the root or create child batches.
//! 5. Finalize the batch into a changeset.
//! 6. Apply the changeset to the database.
//!
//! A merkleized batch can spawn child batches, forming a tree of speculative states that
//! share a common ancestor. Only the finalized leaf needs to be applied.
//!
//! The specific mutation methods vary by variant.
//! See each variant's module documentation for the concrete API and usage examples.
//!
//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`,
//! and `destroy()`.
//!
//! # Traits
//!
//! All variants implement [store::LogStore] and [store::MerkleizedStore]. Keyed mutable
//! variants ([any] and [current]) also implement [store::PrunableStore] and
//! [crate::Persistable].
The [immutable] variant implements [crate::kv::Gettable]. //! //! # Acknowledgments //! //! The following resources were used as references when implementing this crate: //! //! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262) //! * [Merkle Mountain //! Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md) use crate::{ index::{Cursor, Unordered as Index}, journal::contiguous::{Mutable, Reader}, mmr::Location, qmdb::operation::Operation, }; use commonware_utils::NZUsize; use core::num::NonZeroUsize; use futures::{pin_mut, StreamExt as _}; use thiserror::Error; pub mod any; pub mod current; pub mod immutable; pub mod keyless; pub mod operation; pub mod store; pub mod sync; pub mod verify; pub use verify::{ create_multi_proof, create_proof_store, create_proof_store_from_digests, extract_pinned_nodes, verify_multi_proof, verify_proof, verify_proof_and_extract_digests, }; /// Errors that can occur when interacting with an authenticated database. #[derive(Error, Debug)] pub enum Error { #[error("data corrupted: {0}")] DataCorrupted(&'static str), #[error("mmr error: {0}")] Mmr(#[from] crate::mmr::Error), #[error("metadata error: {0}")] Metadata(#[from] crate::metadata::Error), #[error("journal error: {0}")] Journal(#[from] crate::journal::Error), #[error("runtime error: {0}")] Runtime(#[from] commonware_runtime::Error), #[error("operation pruned: {0}")] OperationPruned(Location), /// The requested key was not found in the snapshot. #[error("key not found")] KeyNotFound, /// The key exists in the db, so we cannot prove its exclusion. 
#[error("key exists")] KeyExists, #[error("unexpected data at location: {0}")] UnexpectedData(Location), #[error("location out of bounds: {0} >= {1}")] LocationOutOfBounds(Location, Location), #[error("prune location {0} beyond minimum required location {1}")] PruneBeyondMinRequired(Location, Location), /// The changeset was created from a different database state than the current one. #[error("stale changeset: batch expected db size {expected}, but db has {actual}")] StaleChangeset { expected: u64, actual: u64 }, } impl From for Error { fn from(e: crate::journal::authenticated::Error) -> Self { match e { crate::journal::authenticated::Error::Journal(j) => Self::Journal(j), crate::journal::authenticated::Error::Mmr(m) => Self::Mmr(m), } } } /// The size of the read buffer to use for replaying the operations log when rebuilding the /// snapshot. const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16); /// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes /// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed /// operation, indicating activity status updates. The first argument of the callback is the /// activity status of the operation, and the second argument is the location of the operation it /// inactivates (if any). Returns the number of active keys in the db. 
pub(super) async fn build_snapshot_from_log<C, I, F>(
    inactivity_floor_loc: Location,
    reader: &C,
    snapshot: &mut I,
    mut callback: F,
) -> Result<usize, Error>
where
    C: Reader<Item: Operation>,
    I: Index<Value = Location>,
    F: FnMut(bool, Option<Location>),
{
    let bounds = reader.bounds();
    let stream = reader
        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
        .await?;
    pin_mut!(stream);

    // Location of the final operation in the log; a floor-carrying operation is only reported
    // as active when it sits exactly here.
    let last_commit_loc = bounds.end.saturating_sub(1);

    // NOTE(review): the decrement below relies on every removed entry having been counted by an
    // earlier update, i.e. `snapshot` is presumably empty on entry -- confirm against callers.
    let mut active_keys: usize = 0;
    while let Some(result) = stream.next().await {
        let (loc, op) = result?;
        if let Some(key) = op.key() {
            if op.is_delete() {
                // A delete is never active itself; it may inactivate a previous update.
                let old_loc = delete_key(snapshot, reader, key).await?;
                callback(false, old_loc);
                if old_loc.is_some() {
                    active_keys -= 1;
                }
            } else if op.is_update() {
                // An update is active at replay time; it either supersedes a previous update
                // for the same key or introduces a new active key.
                let new_loc = Location::new(loc);
                let old_loc = update_key(snapshot, reader, key, new_loc).await?;
                callback(true, old_loc);
                if old_loc.is_none() {
                    active_keys += 1;
                }
            }
        } else if op.has_floor().is_some() {
            callback(loc == last_commit_loc, None);
        }
    }

    Ok(active_keys)
}

/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
/// previously associated location.
///
/// Returns `Ok(None)` if the key has no entry in the snapshot.
async fn delete_key<I, R>(
    snapshot: &mut I,
    reader: &R,
    key: &<R::Item as Operation>::Key,
) -> Result<Option<Location>, Error>
where
    I: Index<Value = Location>,
    R: Reader,
    R::Item: Operation,
{
    // If the translated key is in the snapshot, get a cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut(key) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then delete it.
    let Some(loc) = find_update_op(reader, &mut cursor, key).await? else {
        return Ok(None);
    };
    cursor.delete();

    Ok(Some(loc))
}

/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
///
/// # Panics
///
/// Panics if the key is already in the snapshot at a location that is not strictly less than
/// `new_loc`.
async fn update_key<I, R>(
    snapshot: &mut I,
    reader: &R,
    key: &<R::Item as Operation>::Key,
    new_loc: Location,
) -> Result<Option<Location>, Error>
where
    I: Index<Value = Location>,
    R: Reader,
    R::Item: Operation,
{
    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
    // cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then update its location.
    if let Some(loc) = find_update_op(reader, &mut cursor, key).await? {
        assert!(new_loc > loc);
        cursor.update(new_loc);
        return Ok(Some(loc));
    }

    // The key wasn't in the snapshot, so add it to the cursor.
    cursor.insert(new_loc);

    Ok(None)
}

/// Find and return the location of the update operation for `key`, if it exists. On a match the
/// cursor is positioned at the matching location, and can be used to update or delete the key.
/// Returns `Ok(None)` if the cursor is exhausted without finding an operation for `key`.
///
/// # Errors
///
/// Returns an error if reading an operation from the log fails.
///
/// # Panics
///
/// Panics if an operation referenced by the cursor does not have a key.
async fn find_update_op<R>(
    reader: &R,
    cursor: &mut impl Cursor<Value = Location>,
    key: &<R::Item as Operation>::Key,
) -> Result<Option<Location>, Error>
where
    R: Reader,
    R::Item: Operation,
{
    // Several keys can share a cursor, so each candidate location must be resolved against the
    // log to compare the full key.
    while let Some(&loc) = cursor.next() {
        let op = reader.read(*loc).await?;
        let k = op.key().expect("operation without key");
        if *k == *key {
            return Ok(Some(loc));
        }
    }

    Ok(None)
}

/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
/// its location to `new_loc`.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
fn update_known_loc<I: Index<Value = Location>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location,
    new_loc: Location,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
    assert!(
        cursor.find(|&loc| loc == old_loc),
        "known key with given old_loc should have been found"
    );
    cursor.update(new_loc);
}

/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
/// it from the snapshot.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
fn delete_known_loc>(snapshot: &mut I, key: &[u8], old_loc: Location) { let mut cursor = snapshot.get_mut(key).expect("key should be known to exist"); assert!( cursor.find(|&loc| *loc == old_loc), "known key with given old_loc should have been found" ); cursor.delete(); } /// A wrapper of DB state required for implementing inactivity floor management. pub(crate) struct FloorHelper<'a, I: Index, C: Mutable> { pub snapshot: &'a mut I, pub log: &'a mut C, } impl FloorHelper<'_, I, C> where I: Index, C: Mutable, { /// Moves the given operation to the tip of the log if it is active, rendering its old location /// inactive. If the operation was not active, then this is a no-op. Returns whether the /// operation was moved. async fn move_op_if_active(&mut self, op: C::Item, old_loc: Location) -> Result { let Some(key) = op.key() else { return Ok(false); // operations without keys cannot be active }; // If we find a snapshot entry corresponding to the operation, we know it's active. { let Some(mut cursor) = self.snapshot.get_mut(key) else { return Ok(false); }; if !cursor.find(|&loc| loc == old_loc) { return Ok(false); } // Update the operation's snapshot location to point to tip. cursor.update(Location::new(self.log.size().await)); } // Apply the operation at tip. self.log.append(&op).await?; Ok(true) } /// Raise the inactivity floor by taking one _step_, which involves searching for the first /// active operation above the inactivity floor, moving it to tip, and then setting the /// inactivity floor to the location following the moved operation. This method is therefore /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location. /// /// # Panics /// /// Expects there is at least one active operation above the inactivity floor, and panics /// otherwise. 
async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result where I: Index, { let tip_loc = Location::new(self.log.size().await); loop { assert!( *inactivity_floor_loc < tip_loc, "no active operations above the inactivity floor" ); let old_loc = inactivity_floor_loc; inactivity_floor_loc += 1; let op = { let reader = self.log.reader().await; reader.read(*old_loc).await? }; if self.move_op_if_active(op, old_loc).await? { return Ok(inactivity_floor_loc); } } } }