//! A shared, generic implementation of the _Any_ QMDB. //! //! The impl blocks in this file define shared functionality across all Any QMDB variants. use super::operation::{update::Update, Operation}; use crate::{ index::Unordered as UnorderedIndex, journal::{ authenticated, contiguous::{Contiguous, Mutable, Reader}, Error as JournalError, }, merkle::{Family, Location, Proof}, qmdb::{ build_snapshot_from_log, delete_known_loc, operation::Operation as OperationTrait, update_known_loc, Error, }, Context, Persistable, }; use commonware_codec::{Codec, CodecShared}; use commonware_cryptography::Hasher; use core::num::NonZeroU64; use std::collections::HashMap; /// Type alias for the authenticated journal used by [Db]. pub(crate) type AuthenticatedLog = authenticated::Journal; /// Snapshot mutation needed to undo one operation while rewinding. enum SnapshotUndo { Replace { key: K, old_loc: Location, new_loc: Location, }, Remove { key: K, old_loc: Location, }, Insert { key: K, new_loc: Location, }, } /// An "Any" QMDB implementation generic over ordered/unordered keys and variable/fixed values. /// Consider using one of the following specialized variants instead, which may be more ergonomic: /// - [crate::qmdb::any::ordered::fixed::Db] /// - [crate::qmdb::any::ordered::variable::Db] /// - [crate::qmdb::any::unordered::fixed::Db] /// - [crate::qmdb::any::unordered::variable::Db] pub struct Db< F: Family, E: Context, C: Contiguous, I: UnorderedIndex>, H: Hasher, U: Send + Sync, > { /// A (pruned) log of all operations in order of their application. The index of each /// operation in the log is called its _location_, which is a stable identifier. /// /// # Invariants /// /// - The log is never pruned beyond the inactivity floor. /// - There is always at least one commit operation in the log. 
    pub(crate) log: AuthenticatedLog,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    pub(crate) inactivity_floor_loc: Location,

    /// The location of the last commit operation.
    pub(crate) last_commit_loc: Location,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location in the log containing its most recent update.
    ///
    /// # Invariant
    ///
    /// - Only references `Operation::Update`s.
    pub(crate) snapshot: I,

    /// The number of active keys in the snapshot.
    pub(crate) active_keys: usize,

    /// Marker for the update type parameter.
    pub(crate) _update: core::marker::PhantomData,
}

// Shared read-only functionality.
impl Db
where
    F: Family,
    E: Context,
    U: Update,
    C: Contiguous>,
    I: UnorderedIndex>,
    H: Hasher,
    Operation: Codec,
{
    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }

    /// Whether the snapshot currently has no active keys.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }

    /// Get the metadata associated with the last commit.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if `last_commit_loc` does not reference a `CommitFloor`
    /// operation, which would violate this struct's invariants.
    pub async fn get_metadata(&self) -> Result, crate::qmdb::Error> {
        match self.log.reader().await.read(*self.last_commit_loc).await? {
            Operation::CommitFloor(metadata, _) => Ok(metadata),
            _ => unreachable!("last commit is not a CommitFloor operation"),
        }
    }

    /// Return the root digest of the authenticated operation log.
    pub fn root(&self) -> H::Digest {
        self.log.root()
    }

    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &U::Key) -> Result, crate::qmdb::Error> {
        // Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
        // The index may yield multiple candidate locations for this key; each is read and
        // confirmed by comparing keys below.
        let locs: Vec> = self.snapshot.get(key).copied().collect();
        let reader = self.log.reader().await;
        for loc in locs {
            let op = reader.read(*loc).await?;
            // Snapshot invariant: every indexed location references an `Operation::Update`.
            let Operation::Update(data) = op else {
                panic!("location does not reference update operation. loc={loc}");
            };
            if data.key() == key {
                return Ok(Some(data.value().clone()));
            }
        }
        Ok(None)
    }

    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
    /// retained operations respectively.
    pub async fn bounds(&self) -> std::ops::Range> {
        let bounds = self.log.reader().await.bounds();
        Location::new(bounds.start)..Location::new(bounds.end)
    }

    /// Return the pinned Merkle nodes for a lower operation boundary of `loc`.
    ///
    /// # Errors
    ///
    /// - Returns [`crate::merkle::Error::LocationOverflow`] if `loc` is not a valid location.
    /// - Returns [`crate::merkle::Error::ElementPruned`] if any required node is missing from
    ///   the log's merkle store.
    pub async fn pinned_nodes_at(
        &self,
        loc: Location,
    ) -> Result, crate::qmdb::Error> {
        if !loc.is_valid() {
            return Err(crate::merkle::Error::LocationOverflow(loc).into());
        }
        // Fetch all required nodes concurrently; a single missing node fails the whole call.
        let futs: Vec<_> = F::nodes_to_pin(loc)
            .map(|p| async move {
                self.log
                    .merkle
                    .get_node(p)
                    .await?
                    .ok_or(crate::merkle::Error::ElementPruned(p).into())
            })
            .collect();
        futures::future::try_join_all(futs).await
    }
}

// Functionality requiring Mutable journal.
impl Db
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable>,
    I: UnorderedIndex>,
    H: Hasher,
    Operation: Codec,
{
    /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or
    /// snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [crate::qmdb::Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
    /// - Returns [`crate::merkle::Error::LocationOverflow`] if `prune_loc` > [`crate::merkle::Family::MAX_LEAVES`].
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), crate::qmdb::Error> {
        // Pruning past the inactivity floor would discard operations that may still be active.
        if prune_loc > self.inactivity_floor_loc {
            return Err(crate::qmdb::Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }
        self.log.prune(prune_loc).await?;
        Ok(())
    }

    /// Return a [Proof] against the root of a (possibly historical) log of `historical_size`
    /// operations, alongside the operations it covers: up to `max_ops` operations starting at
    /// `start_loc`. Delegates to the underlying authenticated log.
    pub async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof, Vec>), crate::qmdb::Error> {
        self.log
            .historical_proof(historical_size, start_loc, max_ops)
            .await
            .map_err(Into::into)
    }

    /// Return a [Proof] against the current root, alongside the operations it covers: up to
    /// `max_ops` operations starting at `loc`.
    pub async fn proof(
        &self,
        loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof, Vec>), crate::qmdb::Error> {
        self.historical_proof(self.log.size().await, loc, max_ops)
            .await
    }

    /// Rewind the database to `size` operations, where `size` is the location of the next append.
    ///
    /// This rewinds both the authenticated log and the in-memory snapshot, then restores metadata
    /// (`last_commit_loc`, `inactivity_floor_loc`, `active_keys`) for the new tip commit.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `size` is not a valid rewind target
    /// - the target's required logical range is not fully retained (for example, the target
    ///   inactivity floor is pruned)
    /// - `size - 1` is not a commit operation
    ///
    /// Any error from this method is fatal for this handle. Rewind may mutate journal state before
    /// all in-memory structures are rebuilt. Callers must drop this database handle after any `Err`
    /// from `rewind` and reopen from storage.
    ///
    /// Returns the list of locations restored to active state in the snapshot.
    ///
    /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or
    /// [`Db::sync`].
    pub async fn rewind(&mut self, size: Location) -> Result>, Error> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;
        // Rewinding to the current tip is a no-op.
        if rewind_size == current_size {
            return Ok(Vec::new());
        }
        // Cannot rewind to an empty log, nor "rewind" forward past the last commit.
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }

        // Read everything needed for rewind before mutating storage.
        let (rewind_floor, undos, active_keys_delta) = {
            let reader = self.log.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            // The operation that will become the new tip must still be retained.
            if rewind_size <= bounds.start {
                return Err(Error::::Journal(JournalError::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            // The new tip must carry an inactivity floor (i.e. be a commit operation).
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::UnexpectedData(rewind_last_loc));
            };
            // The target floor must also be retained so the replay below can start from it.
            if *rewind_floor < bounds.start {
                return Err(Error::::Journal(JournalError::ItemPruned(*rewind_floor)));
            }

            // One undo per operation being removed, at most.
            let mut undos = Vec::with_capacity((current_size - rewind_size) as usize);
            // Net change to `active_keys` once all undos are applied (may be negative).
            let mut active_keys_delta = 0isize;
            // Most recent state per key while scanning forward from the floor:
            // Some(loc) = key active via the update at `loc`; None = key was deleted.
            let mut prior_state_by_key: HashMap>> = HashMap::new();

            // Reconstruct key state once in a single pass from the rewind floor.
            for loc in *rewind_floor..current_size {
                let op = reader.read(loc).await?;
                let op_loc = Location::new(loc);
                match op {
                    // Commit operations never touch the snapshot; nothing to undo.
                    Operation::CommitFloor(_, _) => {}
                    Operation::Update(update) => {
                        let key = update.key().clone();
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();
                        // Only operations at/after the rewind point produce undos; earlier
                        // operations are scanned solely to establish each key's prior state.
                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // The key had an earlier active update: point back at it.
                                undos.push(SnapshotUndo::Replace {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                    new_loc: previous_loc,
                                });
                            } else {
                                // This update is what made the key active: remove it again.
                                active_keys_delta -= 1;
                                undos.push(SnapshotUndo::Remove {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                });
                            }
                        }
                        prior_state_by_key.insert(key, Some(op_loc));
                    }
                    Operation::Delete(key) => {
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();
                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // Undoing a delete restores the key's prior active update.
                                active_keys_delta += 1;
                                undos.push(SnapshotUndo::Insert {
                                    key: key.clone(),
                                    new_loc: previous_loc,
                                });
                            }
                        }
                        prior_state_by_key.insert(key, None);
                    }
                }
            }
            // Undo operations must run from newest to oldest removed operation.
            undos.reverse();

            (rewind_floor, undos, active_keys_delta)
        };

        // Journal rewind happens before in-memory undo application. If any later step fails, this
        // handle may be internally diverged and must be dropped by the caller. This step is not
        // restart-stable until a later commit/sync boundary.
        self.log.rewind(rewind_size).await?;

        // Apply the snapshot undos (newest first), recording which surviving locations
        // became active again.
        let mut restored_locs = Vec::new();
        for undo in undos {
            match undo {
                SnapshotUndo::Replace {
                    key,
                    old_loc,
                    new_loc,
                } => {
                    // Only report locations that survive the rewind.
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    update_known_loc(&mut self.snapshot, &key, old_loc, new_loc);
                }
                SnapshotUndo::Remove { key, old_loc } => {
                    delete_known_loc(&mut self.snapshot, &key, old_loc)
                }
                SnapshotUndo::Insert { key, new_loc } => {
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    self.snapshot.insert(&key, new_loc);
                }
            }
        }

        // Restore metadata for the new tip commit.
        self.active_keys = self
            .active_keys
            .checked_add_signed(active_keys_delta)
            .ok_or(Error::DataCorrupted(
                "active_keys underflow while rewinding",
            ))?;
        self.last_commit_loc = Location::new(rewind_size - 1);
        self.inactivity_floor_loc = rewind_floor;
        Ok(restored_locs)
    }
}

// Functionality requiring Mutable + Persistable journal.
impl Db
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable> + Persistable,
    I: UnorderedIndex>,
    H: Hasher,
    Operation: Codec,
{
    /// Returns a [Db] initialized from `log`, using `callback` to report snapshot
    /// building events.
    ///
    /// # Panics
    ///
    /// Panics if the log is empty or the last operation is not a commit floor operation.
    pub async fn init_from_log(
        mut index: I,
        log: AuthenticatedLog,
        known_inactivity_floor: Option>,
        mut callback: Cb,
    ) -> Result>
    where
        Cb: FnMut(bool, Option>),
    {
        // If the last-known inactivity floor is behind the current floor, then invoke the callback
        // appropriately to report the inactive bits.
        let (last_commit_loc, inactivity_floor_loc, active_keys) = {
            let reader = log.reader().await;
            // The log invariant guarantees at least one commit, so `end - 1` is valid.
            let last_commit_loc = reader
                .bounds()
                .end
                .checked_sub(1)
                .expect("commit should exist");
            let last_commit = reader.read(last_commit_loc).await?;
            let inactivity_floor_loc = last_commit.has_floor().expect("should be a commit");
            if let Some(known_inactivity_floor) = known_inactivity_floor {
                // Report every operation between the known and current floors as inactive.
                (*known_inactivity_floor..*inactivity_floor_loc)
                    .for_each(|_| callback(false, None));
            }
            // Replay the log from the floor to rebuild the in-memory snapshot.
            let active_keys =
                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut index, callback)
                    .await?;
            (
                Location::new(last_commit_loc),
                inactivity_floor_loc,
                active_keys,
            )
        };
        Ok(Self {
            log,
            inactivity_floor_loc,
            snapshot: index,
            last_commit_loc,
            active_keys,
            _update: core::marker::PhantomData,
        })
    }

    /// Sync all database state to disk.
    pub async fn sync(&self) -> Result<(), crate::qmdb::Error> {
        self.log.sync().await.map_err(Into::into)
    }

    /// Durably commit the journal state published by prior [`Db::apply_batch`]
    /// calls.
    pub async fn commit(&self) -> Result<(), crate::qmdb::Error> {
        self.log.commit().await.map_err(Into::into)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), crate::qmdb::Error> {
        self.log.destroy().await.map_err(Into::into)
    }
}

// [Persistable] implementation that delegates to the inherent commit/sync/destroy methods above.
impl Persistable for Db
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable> + Persistable,
    I: UnorderedIndex>,
    H: Hasher,
    Operation: Codec,
{
    type Error = crate::qmdb::Error;

    async fn commit(&self) -> Result<(), crate::qmdb::Error> {
        Self::commit(self).await
    }

    async fn sync(&self) -> Result<(), crate::qmdb::Error> {
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), crate::qmdb::Error> {
        self.destroy().await
    }
}