//! An authenticated database that only supports adding new keyed values (no updates or
//! deletions), where values can have varying sizes.

use crate::{
    index::{unordered::Index, Unordered as _},
    journal::{
        authenticated,
        contiguous::variable::{self, Config as JournalConfig},
    },
    mmr::{
        journaled::{Config as MmrConfig, Mmr},
        mem::{Clean, Dirty, State},
        Location, Position, Proof, StandardHasher as Standard,
    },
    qmdb::{any::VariableValue, build_snapshot_from_log, Error},
    translator::Translator,
};
use commonware_codec::Read;
use commonware_cryptography::{DigestOf, Hasher as CHasher};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::Array;
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::warn;

mod operation;
pub use operation::Operation;

type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;

pub mod sync;

/// Configuration for an [Immutable] authenticated db.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An authenticated database that only supports adding new keyed values (no updates or
/// deletions), where values can have varying sizes.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    S: State<DigestOf<H>> = Clean<DigestOf<H>>,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, K, V, H, S>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    snapshot: Index<T, Location>,

    /// The location of the last commit operation.
    last_commit_loc: Location,
}

impl<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    S: State<DigestOf<H>>,
> Immutable<E, K, V, H, T, S>
{
    /// Return the oldest location that remains retrievable.
    pub fn oldest_retained_loc(&self) -> Location {
        self.journal
            .oldest_retained_loc()
            .expect("at least one operation should exist")
    }

    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
    /// has been pruned.
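    ///
    /// A minimal lookup sketch (not compiled as a doc-test), mirroring the unit tests below,
    /// which use `V = Vec<u8>` and SHA-256 keys:
    ///
    /// ```rust,ignore
    /// let key = Sha256::fill(1u8);
    /// assert!(db.get(&key).await?.is_none()); // never set
    /// db.set(key, vec![1, 2, 3]).await?;
    /// assert_eq!(db.get(&key).await?, Some(vec![1, 2, 3])); // visible even before commit
    /// ```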
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let oldest = self.oldest_retained_loc();
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if loc < oldest {
                continue;
            }
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
        if loc < self.oldest_retained_loc() {
            return Err(Error::OperationPruned(loc));
        }
        let Operation::Set(k, v) = self.journal.read(loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };
        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the number of operations that have been applied to this db, including those that are
    /// not yet committed.
    pub fn op_count(&self) -> Location {
        self.journal.size()
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let last_commit_loc = self.last_commit_loc;
        let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }

    /// Update the operations MMR with the given operation, and append the operation to the log.
    /// The `commit` method must be called to make any applied operation persistent & recoverable.
    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
        self.journal.append(op).await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Clean<H::Digest>>
{
    /// Returns an [Immutable] qmdb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
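    ///
    /// A minimal initialization sketch (not compiled as a doc-test), following the
    /// `db_config`/`open_db` helpers in the test module below; the partition names and sizing
    /// values are illustrative only:
    ///
    /// ```rust,ignore
    /// let cfg = Config {
    ///     mmr_journal_partition: "mmr_journal".into(),
    ///     mmr_metadata_partition: "mmr_metadata".into(),
    ///     mmr_items_per_blob: NZU64!(11),
    ///     mmr_write_buffer: NZUsize!(1024),
    ///     log_partition: "log".into(),
    ///     log_items_per_section: NZU64!(5),
    ///     log_compression: None,
    ///     log_codec_config: ((0..=10000).into(), ()),
    ///     log_write_buffer: NZUsize!(1024),
    ///     translator: TwoCap,
    ///     thread_pool: None,
    ///     buffer_pool: PoolRef::new(NZUsize!(77), NZUsize!(9)),
    /// };
    /// let mut db = Immutable::init(context, cfg).await?;
    /// ```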
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            buffer_pool: cfg.buffer_pool.clone(),
        };
        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            buffer_pool: cfg.buffer_pool.clone(),
            write_buffer: cfg.log_write_buffer,
        };
        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;
        if journal.size() == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        // Get the start of the log.
        let start_loc = journal.pruning_boundary();

        // Build snapshot from the log.
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
        })
    }

    /// The number of operations to apply to the MMR in a single batch.
    const APPLY_BATCH_SIZE: u64 = 1 << 16;

    /// Returns an [Immutable] built from the config and sync data in `cfg`.
    #[allow(clippy::type_complexity)]
    pub async fn init_synced(
        context: E,
        cfg: sync::Config<E, K, V, H, T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut hasher = Standard::new();

        // Initialize MMR for sync
        let mmr = Mmr::init_sync(
            context.with_label("mmr"),
            crate::mmr::journaled::SyncConfig {
                config: MmrConfig {
                    journal_partition: cfg.db_config.mmr_journal_partition,
                    metadata_partition: cfg.db_config.mmr_metadata_partition,
                    items_per_blob: cfg.db_config.mmr_items_per_blob,
                    write_buffer: cfg.db_config.mmr_write_buffer,
                    thread_pool: cfg.db_config.thread_pool.clone(),
                    buffer_pool: cfg.db_config.buffer_pool.clone(),
                },
                range: Position::try_from(cfg.range.start)?
                    ..Position::try_from(cfg.range.end.saturating_add(1))?,
                pinned_nodes: cfg.pinned_nodes,
            },
            &mut hasher,
        )
        .await?;

        let journal = Journal::<_, _, _, _, Clean<H::Digest>>::from_components(
            mmr,
            cfg.log,
            hasher,
            Self::APPLY_BATCH_SIZE,
        )
        .await?;

        let mut snapshot: Index<T, Location> = Index::new(
            context.with_label("snapshot"),
            cfg.db_config.translator.clone(),
        );

        // Get the start of the log.
        let start_loc = journal.pruning_boundary();

        // Build snapshot from the log.
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        let mut db = Self {
            journal,
            snapshot,
            last_commit_loc,
        };
        db.sync().await?;

        Ok(db)
    }

    /// Prune historical operations prior to `loc`. This does not affect the db's root or
    /// current snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `loc` is greater than the location of the
    ///   last commit.
    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION].
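    ///
    /// Pruning removes whole journal sections only, so the retained boundary may end up below
    /// the requested location. A sketch (not compiled as a doc-test), assuming
    /// `log_items_per_section = 5` and at least 1010 committed operations, as in
    /// `test_immutable_db_pruning` below:
    ///
    /// ```rust,ignore
    /// db.prune(Location::new_unchecked(1009)).await?;
    /// // The boundary is floored to the start of the enclosing section.
    /// assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(1005));
    /// ```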
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Sets `key` to have value `value`, assuming `key` hasn't already been assigned. The
    /// operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`. Attempting to set an already-set key results in undefined behavior.
    ///
    /// Any keys that have been pruned and map to the same translated key will be dropped
    /// during this call.
    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
        let op_count = self.op_count();
        let oldest = self.oldest_retained_loc();
        self.snapshot
            .insert_and_prune(&key, op_count, |v| *v < oldest);

        let op = Operation::Set(key, value);
        self.apply_op(op).await
    }

    /// Return the root of the db.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generate and return:
    /// 1. a proof of all operations applied to the db in the range starting at (and including)
    ///    location `start_index`, and ending at the first of either:
    ///    - the last operation performed, or
    ///    - the operation `max_ops` from the start.
    /// 2. the operations corresponding to the leaves in this range.
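    ///
    /// A verification sketch (not compiled as a doc-test), mirroring
    /// `test_immutable_db_build_and_authenticate` below; `hasher` is a [Standard] hasher over the
    /// db's hash function and `verify_proof` is [crate::qmdb::verify_proof]:
    ///
    /// ```rust,ignore
    /// let mut hasher = Standard::new();
    /// let root = db.root();
    /// let (proof, ops) = db.proof(Location::new_unchecked(0), NZU64!(5)).await?;
    /// assert!(verify_proof(
    ///     &mut hasher,
    ///     &proof,
    ///     Location::new_unchecked(0),
    ///     &ops,
    ///     &root
    /// ));
    /// ```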
    pub async fn proof(
        &self,
        start_index: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let op_count = self.op_count();
        self.historical_proof(op_count, start_index, max_ops).await
    }

    /// Analogous to proof but with respect to the state of the database when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
    /// [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `op_count` > number of operations, or
    /// if `start_loc` >= `op_count`.
    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Caller can associate an arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
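    ///
    /// A sketch of committing with metadata (not compiled as a doc-test), mirroring
    /// `test_immutable_db_build_basic` below:
    ///
    /// ```rust,ignore
    /// db.set(key, value).await?;
    /// db.commit(Some(vec![99, 100])).await?; // durable from this point on
    /// assert_eq!(db.get_metadata().await?, Some(vec![99, 100]));
    /// ```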
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        let loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        self.last_commit_loc = loc;

        Ok(())
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub(super) async fn sync(&mut self) -> Result<(), Error> {
        Ok(self.journal.sync().await?)
    }

    /// Close the db. Operations that have not been committed will be lost.
    pub async fn close(self) -> Result<(), Error> {
        Ok(self.journal.close().await?)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Convert this database into its dirty counterpart for batched updates.
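    ///
    /// A sketch of the batched-update flow (not compiled as a doc-test); the dirty state defers
    /// root recomputation until `merkleize` is called:
    ///
    /// ```rust,ignore
    /// let dirty = db.into_dirty();
    /// // ... stage additional operations (the root is not recomputed in the dirty state) ...
    /// let db = dirty.merkleize();
    /// let root = db.root();
    /// ```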
    pub fn into_dirty(self) -> Immutable<E, K, V, H, T, Dirty> {
        Immutable {
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
        }
    }

    /// Simulate a failed commit that successfully writes the log to the commit point, but without
    /// fully committing the MMR's cached elements to trigger MMR node recovery on reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Operation::Commit(None)).await?;
        self.journal.journal.close().await?;
        self.journal.mmr.simulate_partial_sync(write_limit).await?;

        Ok(())
    }

    /// Simulate a failed commit that successfully writes the MMR to the commit point, but without
    /// fully committing the log, requiring rollback of the MMR and log upon reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Operation::Commit(None)).await?;
        let log_size = self.journal.journal.size();
        self.journal.mmr.close().await?;

        // Rewind the operation log over the commit op to force rollback to the previous commit.
        if log_size > 0 {
            self.journal.journal.rewind(log_size - 1).await?;
        }
        self.journal.journal.close().await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Dirty>
{
    /// Merkleize the database and compute the root digest.
    pub fn merkleize(self) -> Immutable<E, K, V, H, T, Clean<H::Digest>> {
        Immutable {
            journal: self.journal.merkleize(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
        }
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::store::Store for Immutable<E, K, V, H, T, Clean<H::Digest>>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    S: State<DigestOf<H>>,
> crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, S>
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    // All unpruned operations are active in an immutable store.
    fn inactivity_floor_loc(&self) -> Location {
        self.journal.pruning_boundary()
    }

    fn is_empty(&self) -> bool {
        self.op_count() == 0
    }

    async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
        self.get_metadata().await
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::CleanStore for Immutable<E, K, V, H, T, Clean<H::Digest>>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;
    type Dirty = Immutable<E, K, V, H, T, Dirty>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.proof(start_loc, max_ops).await
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }

    fn into_dirty(self) -> Self::Dirty {
        self.into_dirty()
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::DirtyStore for Immutable<E, K, V, H, T, Dirty>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;
    type Clean = Immutable<E, K, V, H, T, Clean<H::Digest>>;

    async fn merkleize(self) -> Result<Self::Clean, Error> {
        Ok(self.merkleize())
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{qmdb::verify_proof, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;
    const ITEMS_PER_SECTION: u64 = 5;

    pub(crate) fn db_config(
        suffix: &str,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// A type alias for the concrete [Immutable] type used in these unit tests.
    type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Return an [Immutable] database initialized with a fixed config.
    async fn open_db(context: deterministic::Context) -> ImmutableTest {
        ImmutableTest::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an
            // uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            db.set(k1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 2); // commit op added
            let root = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let mut db = open_db(context.clone()).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set the first key.
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 2);

            // Commit the first key.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());

            // Set the second key.
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.op_count(), 4);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), metadata);

            // Commit the second key.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Capture state.
            let root = db.root();

            // Add an uncommitted op then close the db.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.op_count(), 6);
            assert_ne!(db.root(), root);

            // Close & reopen, make sure state is restored to last commit point.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at
            // the end.
            let max_ops = NZU64!(5);
            for i in 0..*db.op_count() {
                let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new_unchecked(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }
            assert_eq!(db.op_count(), ELEMENTS + 1);
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // We partially write only 101 of the cached MMR nodes to simulate a failure.
            db.simulate_failed_commit_mmr(101).await.unwrap();

            // Recovery should replay the log to regenerate the mmr.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2002);
            let root = db.root();
            assert_ne!(root, halfway_root);

            // Close & reopen should preserve the final commit.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2002);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Insert a single key and then commit to create a first commit point.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            db.set(k1, v1).await.unwrap();
            db.commit(None).await.unwrap();
            let first_commit_root = db.root();

            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }
            assert_eq!(db.op_count(), ELEMENTS + 3);
            db.sync().await.unwrap();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Simulate a failure to fully write the log past the commit point.
            db.simulate_failed_commit_log().await.unwrap();

            // Recovery should back up to previous commit point.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            let root = db.root();
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            for i in 1u64..ELEMENTS + 1 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Prune the db to the first half of the operations.
            db.prune(Location::new_unchecked((ELEMENTS + 2) / 2))
                .await
                .unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // items_per_section is 5, so the requested location ((ELEMENTS + 2) / 2 = 1001) is
            // floored to the section boundary at ELEMENTS / 2 = 1000.
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Prune to a non-blob boundary.
            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();

            // Actual boundary should be a multiple of 5.
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Confirm boundary persists across restart.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm behavior of trying to create a proof of pruned items is as expected.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new_unchecked(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(
                proof_result,
                Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos
            ));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Test pruning a freshly initialized db (nothing beyond the initial commit at
            // location 0).
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(matches!(
                result,
                Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1)
                        && commit_loc == Location::new_unchecked(0)
            ));

            // Add key-value pairs and commit
            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];
            let v3 = vec![3u8; 16];

            db.set(k1, v1.clone()).await.unwrap();
            db.set(k2, v2.clone()).await.unwrap();
            db.commit(None).await.unwrap();
            db.set(k3, v3.clone()).await.unwrap();

            // op_count is 5 (initial commit, k1, k2, commit, k3); the last commit is at
            // location 3.
            assert_eq!(*db.last_commit_loc, 3);

            // Test valid prune (at last commit)
            assert!(db.prune(db.last_commit_loc).await.is_ok());

            // Add more and commit again
            db.commit(None).await.unwrap();
            let new_last_commit = db.last_commit_loc;

            // Test pruning beyond last commit
            let beyond = new_last_commit + 1;
            let result = db.prune(beyond).await;
            assert!(matches!(
                result,
                Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit
            ));

            db.destroy().await.unwrap();
        });
    }
}