//! An authenticated database that provides succinct proofs of _any_ value ever associated
//! with a key, where values can have varying sizes.
//!
//! _If the values you wish to store all have the same size, use [crate::qmdb::any::unordered::fixed]
//! instead for better performance._

use crate::{
    index::unordered::Index,
    journal::contiguous::variable::Journal,
    mmr::Location,
    qmdb::{
        any::{init_variable, unordered, value::VariableEncoding, VariableConfig, VariableValue},
        operation::Key,
        Error,
    },
    translator::Translator,
};
use commonware_codec::{Codec, Read};
use commonware_cryptography::Hasher;
use commonware_runtime::{Clock, Metrics, Storage};

/// Update payload used by this variable-value database (an [unordered::Update] instantiation).
pub type Update = unordered::Update>;
/// Operation type used by this variable-value database (an [unordered::Operation]
/// instantiation). The `init` constructors below require it to implement [Codec].
pub type Operation = unordered::Operation>;

/// A key-value QMDB based on an authenticated log of operations, supporting authentication of any
/// value ever associated with a key.
pub type Db = super::Db>, Index, H, Update>;

impl Db
where
    Operation: Codec,
{
    /// Returns a [Db] QMDB initialized from `cfg`. Uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    ///
    /// Equivalent to [Self::init_with_callback] with no known inactivity floor and a no-op
    /// callback.
    pub async fn init(
        context: E,
        cfg: VariableConfig as Read>::Cfg>,
    ) -> Result {
        Self::init_with_callback(context, cfg, None, |_, _| {}).await
    }

    /// Initialize the DB, invoking `callback` for each operation processed during recovery.
    ///
    /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
    /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
    /// snapshot is built from the log, `callback` is invoked for each operation with its activity
    /// status and previous location (if any).
    pub(crate) async fn init_with_callback(
        context: E,
        cfg: VariableConfig as Read>::Cfg>,
        known_inactivity_floor: Option,
        callback: impl FnMut(bool, Option),
    ) -> Result {
        // The closure hands `init_variable` the (non-partitioned) index constructor; this is the
        // only difference from the partitioned variant's initializer below.
        init_variable(context, cfg, known_inactivity_floor, callback, |ctx, t| {
            Index::new(ctx, t)
        })
        .await
    }
}

/// Partitioned index variants that divide the key space into `2^(P*8)` partitions.
///
/// See [partitioned::Db] for the generic type, or use the convenience aliases:
/// - [partitioned::p256::Db] for 256 partitions (P=1)
/// - [partitioned::p64k::Db] for 65,536 partitions (P=2)
pub mod partitioned {
    pub use super::{Operation, Update};
    use crate::{
        index::partitioned::unordered::Index,
        journal::contiguous::variable::Journal,
        mmr::Location,
        qmdb::{
            any::{init_variable, VariableConfig, VariableValue},
            operation::Key,
            Error,
        },
        translator::Translator,
    };
    use commonware_codec::{Codec, Read};
    use commonware_cryptography::Hasher;
    use commonware_runtime::{Clock, Metrics, Storage};

    /// A key-value QMDB with a partitioned snapshot index and variable-size values.
    ///
    /// This is the partitioned variant of [super::Db]. The const generic `P` specifies
    /// the number of prefix bytes used for partitioning:
    /// - `P = 1`: 256 partitions
    /// - `P = 2`: 65,536 partitions
    ///
    /// Use partitioned indices when you have a large number of keys (>> 2^(P*8)) and memory
    /// efficiency is important. Keys should be uniformly distributed across the prefix space.
    pub type Db = crate::qmdb::any::unordered::Db<
        E,
        Journal>,
        Index,
        H,
        Update,
    >;

    impl<
            E: Storage + Clock + Metrics,
            K: Key,
            V: VariableValue,
            H: Hasher,
            T: Translator,
            const P: usize,
        > Db
    where
        Operation: Codec,
    {
        /// Returns a [Db] QMDB initialized from `cfg`. Uncommitted log operations will be
        /// discarded and the state of the db will be as of the last committed operation.
        ///
        /// Equivalent to [Self::init_with_callback] with no known inactivity floor and a no-op
        /// callback.
        pub async fn init(
            context: E,
            cfg: VariableConfig as Read>::Cfg>,
        ) -> Result {
            Self::init_with_callback(context, cfg, None, |_, _| {}).await
        }

        /// Initialize the DB, invoking `callback` for each operation processed during recovery.
        ///
        /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
        /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
        /// snapshot is built from the log, `callback` is invoked for each operation with its activity
        /// status and previous location (if any).
        pub(crate) async fn init_with_callback(
            context: E,
            cfg: VariableConfig as Read>::Cfg>,
            known_inactivity_floor: Option,
            callback: impl FnMut(bool, Option),
        ) -> Result {
            // Same as the non-partitioned initializer, except that `Index` here is the
            // partitioned index imported at the top of this module.
            init_variable(context, cfg, known_inactivity_floor, callback, |ctx, t| {
                Index::new(ctx, t)
            })
            .await
        }
    }

    /// Convenience type aliases for 256 partitions (P=1).
    pub mod p256 {
        /// Variable-value DB with 256 partitions.
        pub type Db = super::Db;
    }

    /// Convenience type aliases for 65,536 partitions (P=2).
    pub mod p64k {
        /// Variable-value DB with 65,536 partitions.
        pub type Db = super::Db;
    }
}

#[cfg(test)]
pub(crate) mod test {
    use super::*;
    use crate::{
        index::Unordered as _,
        kv::tests::{assert_gettable, assert_send},
        qmdb::store::{
            tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
            LogStore,
        },
        translator::TwoCap,
    };
    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_math::algebra::Random;
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        BufferPooler, Runner as _,
    };
    use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
    use rand::RngCore;
    use std::num::{NonZeroU16, NonZeroUsize};

    /// Page size passed to the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    /// Page-cache capacity passed to the test page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    /// Builds a [VarConfig] whose partition names embed `seed`, so configs built from distinct
    /// seeds do not share storage partitions. The page cache is created from `pooler`.
    pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
        // NOTE(review): the small blob and page sizes look chosen to exercise multi-blob and
        // page-boundary code paths in tests — confirm.
        VariableConfig {
            mmr_journal_partition: format!("journal-{seed}"),
            mmr_metadata_partition: format!("metadata-{seed}"),
            mmr_items_per_blob: NZU64!(13),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log-journal-{seed}"),
            log_items_per_blob: NZU64!(7),
            log_write_buffer: NZUsize!(1024),
            log_compression: None,
            // NOTE(review): presumably bounds the decoded value length to 0..=10000 — confirm
            // against the operation codec's `Cfg`.
            log_codec_config: ((), ((0..=10000).into(), ())),
            translator: TwoCap,
            thread_pool: None,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Concrete [VariableConfig] type produced by [create_test_config].
    pub(crate) type VarConfig = VariableConfig, ()))>;

    /// A type alias for the concrete [Db] type used in these unit tests.
    pub(crate) type AnyTest = Db, Sha256, TwoCap>;

    /// Create a test database with unique partition names.
    ///
    /// The partition seed is drawn from `context`'s RNG, so each call uses fresh partitions.
    pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
        let seed = context.next_u64();
        let config = create_test_config(seed, &context);
        AnyTest::init(context, config).await.unwrap()
    }

    /// Deterministic byte vector generator for variable-value tests.
    ///
    /// Returns `(i % 13) + 7` bytes (so 7..=19), each equal to `i % 255`.
    fn to_bytes(i: u64) -> Vec {
        let len = ((i % 13) + 7) as usize;
        vec![(i % 255) as u8; len]
    }

    /// Create n random operations using the default seed (0). Some portion of
    /// the updates are deletes. create_test_ops(n) is a prefix of
    /// create_test_ops(n') for n < n'.
    pub(crate) fn create_test_ops(
        n: usize,
    ) -> Vec>>> {
        create_test_ops_seeded(n, 0)
    }

    /// Create n random operations using a specific seed. Use different seeds
    /// when you need non-overlapping keys in the same test.
    ///
    /// Every 10th operation (indices 10, 20, ...) deletes the most recently updated key; every
    /// other operation updates a freshly drawn random key with a [to_bytes] value.
    pub(crate) fn create_test_ops_seeded(
        n: usize,
        seed: u64,
    ) -> Vec>>> {
        let mut rng = test_rng_seeded(seed);
        let mut prev_key = Digest::random(&mut rng);
        let mut ops = Vec::new();
        for i in 0..n {
            // Drawn on every iteration, including deletes (where it is then unused).
            let key = Digest::random(&mut rng);
            if i % 10 == 0 && i > 0 {
                ops.push(unordered::Operation::Delete(prev_key));
            } else {
                let value = to_bytes(rng.next_u64());
                ops.push(unordered::Operation::Update(unordered::Update(key, value)));
                prev_key = key;
            }
        }
        ops
    }

    /// Applies the given operations to the database.
    ///
    /// All operations are written into a single batch, which is merkleized, finalized and
    /// applied.
    ///
    /// # Panics
    ///
    /// Panics if `ops` contains a `CommitFloor` operation, or if merkleizing or applying the
    /// batch fails.
    pub(crate) async fn apply_ops(
        db: &mut AnyTest,
        ops: Vec>>>,
    ) {
        let finalized = {
            let mut batch = db.new_batch();
            for op in ops {
                match op {
                    unordered::Operation::Update(unordered::Update(key, value)) => {
                        batch.write(key, Some(value));
                    }
                    unordered::Operation::Delete(key) => {
                        batch.write(key, None);
                    }
                    unordered::Operation::CommitFloor(_, _) => {
                        panic!("CommitFloor not supported in apply_ops");
                    }
                }
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
    }

    /// Return an `Any` database initialized with a fixed config.
    ///
    /// Always uses seed 0, so repeated calls reopen the same partitions; the recovery test below
    /// relies on this to simulate restarts.
    async fn open_db(context: deterministic::Context) -> AnyTest {
        let cfg = create_test_config(0, &context);
        AnyTest::init(context, cfg).await.unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_any_variable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            crate::qmdb::any::test::test_any_db_build_and_authenticate(
                context,
                db,
                |ctx| Box::pin(open_db(ctx)),
                to_bytes,
            )
            .await;
        });
    }

    #[test_traced("WARN")]
    pub fn test_any_variable_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let db = open_db(context.with_label("open1")).await;
            let root = db.root();

            // Build a batch but don't apply it (simulate failure before commit).
            {
                let mut batch = db.new_batch();
                for i in 0..ELEMENTS {
                    batch.write(
                        Sha256::hash(&i.to_be_bytes()),
                        // Same bytes as `to_bytes(i)`.
                        Some(vec![(i % 255) as u8; ((i % 13) + 7) as usize]),
                    );
                }
                let _ = batch.merkleize(None).await.unwrap().finalize();
            }

            // Simulate a failure and test that we rollback to the previous root.
            drop(db);
            let mut db = open_db(context.with_label("open2")).await;
            assert_eq!(root, db.root());

            // Re-apply the updates and commit them this time.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    // Same bytes as `to_bytes(i)`.
                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                    batch.write(k, Some(v));
                }
                batch.merkleize(None).await.unwrap().finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root = db.root();

            // Update every 3rd key but don't apply (simulate failure).
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    if i % 3 != 0 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    // Differs from the committed value in both byte and length.
                    let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                    batch.write(k, Some(v));
                }
                let _ = batch.merkleize(None).await.unwrap().finalize();
            }

            // Simulate a failure and test that we rollback to the previous root.
            drop(db);
            let mut db = open_db(context.with_label("open3")).await;
            assert_eq!(root, db.root());

            // Re-apply updates for every 3rd key and commit them this time.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    if i % 3 != 0 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                    batch.write(k, Some(v));
                }
                batch.merkleize(None).await.unwrap().finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root = db.root();

            // Delete every 7th key but don't apply (simulate failure).
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    if i % 7 != 1 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    batch.write(k, None);
                }
                let _ = batch.merkleize(None).await.unwrap().finalize();
            }

            // Simulate a failure and test that we rollback to the previous root.
            drop(db);
            let mut db = open_db(context.with_label("open4")).await;
            assert_eq!(root, db.root());

            // Re-delete every 7th key and commit this time.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    if i % 7 != 1 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    batch.write(k, None);
                }
                batch.merkleize(None).await.unwrap().finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            let root = db.root();
            let inactivity_floor = db.inactivity_floor_loc();

            // test pruning boundary after sync w/ prune: prune up to the inactivity floor and
            // record bounds/snapshot size so the reopen below can check they persist.
            db.sync().await.unwrap();
            db.prune(inactivity_floor).await.unwrap();
            let bounds = db.bounds().await;
            let snapshot_items = db.snapshot.items();
            db.sync().await.unwrap();
            drop(db);

            // Confirm state is preserved after reopen.
            let db = open_db(context.with_label("open5")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().await, bounds);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor);
            assert_eq!(db.snapshot.items(), snapshot_items);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_any_variable_db_prune_beyond_inactivity_floor() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = open_db(context.clone()).await;

            // Add some operations
            let key1 = Digest::random(&mut context);
            let key2 = Digest::random(&mut context);
            let key3 = Digest::random(&mut context);

            let finalized = {
                let mut batch = db.new_batch();
                batch.write(key1, Some(vec![10]));
                batch.write(key2, Some(vec![20]));
                batch.write(key3, Some(vec![30]));
                batch.merkleize(None).await.unwrap().finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // inactivity_floor should be at some location < op_count
            let inactivity_floor = db.inactivity_floor_loc();
            let beyond_floor = Location::new(*inactivity_floor + 1);

            // Try to prune beyond the inactivity floor; the error must report both the
            // requested location and the current floor.
            let result = db.prune(beyond_floor).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(loc, floor))
                    if loc == beyond_floor && floor == inactivity_floor)
            );

            db.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_stale_changeset_rejected() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let key1 = Sha256::hash(&[1]);
            let key2 = Sha256::hash(&[2]);

            // Create two batches from the same DB state.
            let changeset_a = {
                let mut batch = db.new_batch();
                batch.write(key1, Some(vec![10]));
                batch.merkleize(None).await.unwrap().finalize()
            };
            let changeset_b = {
                let mut batch = db.new_batch();
                batch.write(key2, Some(vec![20]));
                batch.merkleize(None).await.unwrap().finalize()
            };

            // Apply the first -- should succeed.
            db.apply_batch(changeset_a).await.unwrap();
            let expected_root = db.root();
            let expected_bounds = db.bounds().await;
            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
            assert_eq!(db.get(&key2).await.unwrap(), None);

            // Apply the second -- should fail because the DB was modified.
            let result = db.apply_batch(changeset_b).await;
            assert!(
                matches!(result, Err(Error::StaleChangeset { .. })),
                "expected StaleChangeset error, got {result:?}"
            );
            // The rejected changeset must leave root, bounds and contents untouched.
            assert_eq!(db.root(), expected_root);
            assert_eq!(db.bounds().await, expected_bounds);
            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
            assert_eq!(db.get(&key2).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_stale_changeset_chained() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let key1 = Sha256::hash(&[1]);
            let key2 = Sha256::hash(&[2]);
            let key3 = Sha256::hash(&[3]);

            // Commit initial state.
            let finalized = {
                let mut batch = db.new_batch();
                batch.write(key1, Some(vec![10]));
                batch.merkleize(None).await.unwrap().finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Create a parent batch, then fork two children.
            let parent = {
                let mut batch = db.new_batch();
                batch.write(key2, Some(vec![20]));
                batch.merkleize(None).await.unwrap()
            };
            let child_a = {
                let mut batch = parent.new_batch();
                batch.write(key3, Some(vec![30]));
                batch.merkleize(None).await.unwrap().finalize()
            };
            let child_b = {
                let mut batch = parent.new_batch();
                batch.write(key3, Some(vec![40]));
                batch.merkleize(None).await.unwrap().finalize()
            };

            // Apply child_a, then child_b should be stale.
            db.apply_batch(child_a).await.unwrap();
            let result = db.apply_batch(child_b).await;
            assert!(
                matches!(result, Err(Error::StaleChangeset { .. })),
                "expected StaleChangeset error for sibling, got {result:?}"
            );

            db.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_stale_changeset_parent_applied_before_child() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let key1 = Sha256::hash(&[1]);
            let key2 = Sha256::hash(&[2]);

            // Create parent, then child.
            let parent = {
                let mut batch = db.new_batch();
                batch.write(key1, Some(vec![10]));
                batch.merkleize(None).await.unwrap()
            };
            let child = {
                let mut batch = parent.new_batch();
                batch.write(key2, Some(vec![20]));
                batch.merkleize(None).await.unwrap().finalize()
            };
            // Finalize the parent only after the child has been built on top of it.
            let parent = parent.finalize();

            // Apply parent first -- child should now be stale.
            db.apply_batch(parent).await.unwrap();
            let result = db.apply_batch(child).await;
            assert!(
                matches!(result, Err(Error::StaleChangeset { .. })),
                "expected StaleChangeset for child after parent applied, got {result:?}"
            );

            db.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_stale_changeset_child_applied_before_parent() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let key1 = Sha256::hash(&[1]);
            let key2 = Sha256::hash(&[2]);

            // Create parent, then child.
            let parent = {
                let mut batch = db.new_batch();
                batch.write(key1, Some(vec![10]));
                batch.merkleize(None).await.unwrap()
            };
            let child = {
                let mut batch = parent.new_batch();
                batch.write(key2, Some(vec![20]));
                batch.merkleize(None).await.unwrap().finalize()
            };
            // Finalize the parent only after the child has been built on top of it.
            let parent = parent.finalize();

            // Apply child first -- parent should now be stale.
            db.apply_batch(child).await.unwrap();
            let result = db.apply_batch(parent).await;
            assert!(
                matches!(result, Err(Error::StaleChangeset { .. })),
                "expected StaleChangeset for parent after child applied, got {result:?}"
            );

            db.destroy().await.unwrap();
        });
    }

    // FromSyncTestable implementation for from_sync_result tests
    mod from_sync_testable {
        use super::*;
        use crate::{
            mmr::{iterator::nodes_to_pin, journaled::Mmr, Position},
            qmdb::any::sync::tests::FromSyncTestable,
        };
        use futures::future::join_all;

        type TestMmr = Mmr;

        impl FromSyncTestable for AnyTest {
            type Mmr = TestMmr;

            /// Splits the database into its log's MMR and journal.
            fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
                (self.log.mmr, self.log.journal)
            }

            /// Fetches, from the MMR, every node that `nodes_to_pin(pos)` yields, in that order.
            /// Both layers of each lookup result are unwrapped, so this panics on any failure.
            async fn pinned_nodes_at(&self, pos: Position) -> Vec {
                join_all(nodes_to_pin(pos).map(|p| self.log.mmr.get_node(p)))
                    .await
                    .into_iter()
                    .map(|n| n.unwrap().unwrap())
                    .collect()
            }

            /// Reads the same node set as [Self::pinned_nodes_at], but from the map returned by
            /// `get_pinned_nodes`. Panics if any expected node is missing from the map.
            fn pinned_nodes_from_map(&self, pos: Position) -> Vec {
                let map = self.log.mmr.get_pinned_nodes();
                nodes_to_pin(pos).map(|p| *map.get(&p).unwrap()).collect()
            }
        }
    }

    /// Regression test for https://github.com/commonwarexyz/monorepo/issues/2787
    ///
    /// Compile-only check: the explicit `+ Send` return type requires the future that awaits
    /// `get` to be `Send`. `clippy::manual_async_fn` is allowed because spelling out the
    /// `impl Future + Send` signature is the point of the check.
    #[allow(dead_code, clippy::manual_async_fn)]
    fn issue_2787_regression(
        db: &crate::qmdb::immutable::Immutable<
            deterministic::Context,
            Digest,
            Vec,
            Sha256,
            TwoCap,
        >,
        key: Digest,
    ) -> impl std::future::Future + Send + use<'_> {
        async move {
            let _ = db.get(&key).await;
        }
    }

    /// Compile-only check that the read, prune and sync APIs satisfy the `assert_*` trait and
    /// `Send` helpers. It is not called anywhere in this file (hence `dead_code`).
    #[allow(dead_code)]
    fn assert_read_futures_are_send(db: &mut AnyTest, key: Digest, loc: Location) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
    }

    /// Compile-only check that batch construction and `get_with_loc` produce `Send` futures.
    /// It is not called anywhere in this file (hence `dead_code`).
    #[allow(dead_code)]
    fn assert_batch_futures_are_send(db: &AnyTest, key: Digest, value: Vec) {
        assert_gettable(db, &key);
        assert_log_store(db);
        let mut batch = db.new_batch();
        batch.write(key, Some(value));
        assert_send(batch.merkleize(None));
        assert_send(db.get_with_loc(&key));
    }
}