//! Journaled [`ManagedDb`] implementation for QMDB
//! [`keyless`](commonware_storage::qmdb::keyless) databases.
//!
//! Keyless databases are append-only. Operations are addressed by
//! [`Location`] rather than by key.
//!
//! The wrapper types here hold a shared `Arc<AsyncRwLock<_>>` handle to the
//! parent database so the batch API can read through to committed state.

// NOTE(review): many generic parameter/argument lists in this file appear to
// have been stripped by an extraction step. Examples: `Arc>>` in the
// `KeylessDbHandle` alias, `Location::::new(..)`, `new_batch::()`,
// `>::finalize(..)`, and `impl ... for ...` headers with no `impl<..>` list.
// The code tokens below are reproduced exactly as found; the missing lists
// must be restored from the upstream file before this will compile.

use crate::stateful::db::{
    ManagedDb, Merkleized as MerkleizedTrait, StateSyncDb, SyncEngineConfig,
    Unmerkleized as UnmerkleizedTrait,
};
use commonware_codec::{EncodeShared, Read as CodecRead};
use commonware_cryptography::Hasher;
use commonware_parallel::Strategy;
use commonware_runtime::{Clock, Metrics, Storage};
use commonware_storage::{
    journal::{
        contiguous::{
            fixed::Journal as FixedJournal, variable::Journal as VariableJournal, Mutable,
        },
        Error as JournalError,
    },
    merkle::{Family, Location},
    qmdb::{
        any::value::{FixedEncoding, FixedValue, ValueEncoding, VariableEncoding, VariableValue},
        keyless::{
            batch::{MerkleizedBatch, UnmerkleizedBatch},
            fixed, variable, Keyless, Operation,
        },
        sync::{self, resolver::Resolver, Target as AnySyncTarget},
        Error,
    },
    Persistable,
};
use commonware_utils::{channel::mpsc, non_empty_range, sync::AsyncRwLock};
use std::{ops::Deref, sync::Arc};

/// Shared handle to the parent keyless database, stored by both batch wrappers
/// below in their `db` field.
// NOTE(review): the alias's parameter list and the nested type arguments are
// missing here; presumably `Arc<AsyncRwLock<Keyless<..>>>` — TODO confirm.
type KeylessDbHandle = Arc>>;

/// Wraps a keyless [`UnmerkleizedBatch`] with a reference to the parent
/// database, implementing the [`Unmerkleized`](crate::stateful::db::Unmerkleized) trait.
///
/// Besides the batch itself, the wrapper carries the optional commit metadata
/// and inactivity floor that are handed to the batch when it is merkleized.
pub struct KeylessUnmerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    /// The speculative batch being built; also exposed through [`Deref`].
    batch: UnmerkleizedBatch,
    /// Handle to the parent database, read-locked by lookups and by `merkleize`.
    db: KeylessDbHandle,
    /// Commit metadata for the next `merkleize`; `None` until
    /// [`with_metadata`](Self::with_metadata) is called.
    metadata: Option,
    /// Inactivity floor for the next `merkleize`; `None` until
    /// [`with_inactivity_floor`](Self::with_inactivity_floor) is called.
    inactivity_floor: Option>,
}

/// Lets callers use the wrapped [`UnmerkleizedBatch`]'s methods directly on the wrapper.
impl Deref for KeylessUnmerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    type Target = UnmerkleizedBatch;

    fn deref(&self) -> &Self::Target {
        &self.batch
    }
}

impl KeylessUnmerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    /// Set commit metadata included in the next
    /// [`merkleize`](UnmerkleizedTrait::merkleize) call.
    ///
    /// Consumes and returns `self` so calls can be chained; a later call
    /// overwrites any metadata set earlier.
    pub fn with_metadata(mut self, metadata: V::Value) -> Self {
        self.metadata = Some(metadata);
        self
    }

    /// Set the inactivity floor to include within the next [`merkleize`](UnmerkleizedTrait::merkleize) call.
    ///
    /// If unset, [`merkleize`](UnmerkleizedTrait::merkleize) will use the [`Default`] of [`Location`].
    pub const fn with_inactivity_floor(mut self, floor: Location) -> Self {
        self.inactivity_floor = Some(floor);
        self
    }

    /// Read a value by location, falling back to committed state.
    ///
    /// Holds a read guard on the parent database for the duration of the lookup.
    pub async fn get(&self, location: Location) -> Result, Error> {
        let db = self.db.read().await;
        self.batch.get(location, &*db).await
    }

    /// Read multiple values by location, falling back to committed state.
    ///
    /// Locations must be sorted in ascending order. Returns results in the same
    /// order as the input locations.
    ///
    /// Holds a read guard on the parent database for the duration of the lookup.
    pub async fn get_many(
        &self,
        locations: &[Location],
    ) -> Result>, Error> {
        let db = self.db.read().await;
        self.batch.get_many(locations, &*db).await
    }

    /// Append a value to the speculative batch.
    ///
    /// Consumes and returns `self` so calls can be chained.
    pub fn append(mut self, value: V::Value) -> Self {
        self.batch = self.batch.append(value);
        self
    }
}

/// Wraps a keyless [`MerkleizedBatch`] with a reference to the parent
/// database, implementing the [`Merkleized`](crate::stateful::db::Merkleized) trait.
pub struct KeylessMerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    /// The merkleized batch; handed to `apply_batch` by `ManagedDb::finalize`.
    inner: Arc>,
    /// Handle to the parent database, read-locked by lookups.
    db: KeylessDbHandle,
}

/// Lets callers use the wrapped [`MerkleizedBatch`]'s methods directly on the wrapper.
impl Deref for KeylessMerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    type Target = MerkleizedBatch;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl KeylessMerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    /// Read a value by location, falling back to committed state.
    ///
    /// Holds a read guard on the parent database for the duration of the lookup.
    pub async fn get(&self, location: Location) -> Result, Error> {
        let db = self.db.read().await;
        self.inner.get(location, &*db).await
    }

    /// Read multiple values by location, falling back to committed state.
    ///
    /// Locations must be sorted in ascending order. Returns results in the same
    /// order as the input locations.
    ///
    /// Holds a read guard on the parent database for the duration of the lookup.
    pub async fn get_many(
        &self,
        locations: &[Location],
    ) -> Result>, Error> {
        let db = self.db.read().await;
        self.inner.get_many(locations, &*db).await
    }
}

impl UnmerkleizedTrait for KeylessUnmerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    type Merkleized = KeylessMerkleized;
    type Error = Error;

    /// Merkleize the batch against the parent database.
    ///
    /// Passes along the metadata set via `with_metadata` (if any) and the
    /// inactivity floor set via `with_inactivity_floor`, or the default
    /// [`Location`] when no floor was set. The result is wrapped together with
    /// a clone of the database handle.
    ///
    /// The body shown here contains no fallible step, so it always returns `Ok`.
    async fn merkleize(self) -> Result> {
        // The read guard is held while the batch is merkleized.
        let db = self.db.read().await;
        let merkleized = self.batch.merkleize(
            &*db,
            self.metadata,
            self.inactivity_floor.unwrap_or_default(),
        );
        Ok(KeylessMerkleized {
            inner: merkleized,
            db: self.db.clone(),
        })
    }
}

impl MerkleizedTrait for KeylessMerkleized
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: ValueEncoding,
    C: Mutable> + Persistable,
    H: Hasher,
    S: Strategy,
    Operation: EncodeShared,
{
    type Digest = H::Digest;
    type Unmerkleized = KeylessUnmerkleized;

    /// Root digest reported by the wrapped merkleized batch.
    fn root(&self) -> H::Digest {
        self.inner.root()
    }

    /// Create a new unmerkleized batch from this merkleized batch.
    ///
    /// The new wrapper shares the same database handle and starts with no
    /// metadata and no inactivity floor.
    fn new_batch(&self) -> Self::Unmerkleized {
        KeylessUnmerkleized {
            // NOTE(review): the turbofish argument is missing here — TODO restore.
            batch: self.inner.new_batch::(),
            db: self.db.clone(),
            metadata: None,
            inactivity_floor: None,
        }
    }
}

/// [`ManagedDb`] for the fixed-size-value keyless database.
///
/// Mirrors the `variable::Db` implementation below; the method bodies are the same.
impl ManagedDb for fixed::Db
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: FixedValue + 'static,
    H: Hasher + 'static,
    S: Strategy,
{
    type Unmerkleized = KeylessUnmerkleized, FixedJournal>, H, S>;
    type Merkleized = KeylessMerkleized, FixedJournal>, H, S>;
    type Error = Error;
    type Config = fixed::Config;
    type SyncTarget = AnySyncTarget;

    /// Initialize the database from `context` and `config`.
    async fn init(context: E, config: Self::Config) -> Result> {
        // NOTE(review): the path qualifier before `::init` is missing; presumably this
        // forwards to the database's own `init` constructor — TODO confirm.
        ::init(context, config).await
    }

    /// Start a fresh unmerkleized batch on `db`.
    ///
    /// Read-locks the database to create the batch, then stores a clone of the
    /// handle in the wrapper. Metadata and inactivity floor start unset.
    async fn new_batch(db: &Arc>) -> Self::Unmerkleized {
        let inner = db.read().await;
        KeylessUnmerkleized {
            batch: inner.new_batch(),
            db: db.clone(),
            metadata: None,
            inactivity_floor: None,
        }
    }

    /// Return `true` only if `batch` corresponds exactly to `target`.
    ///
    /// All three must hold: the roots are equal, the target range starts at the
    /// batch's inactivity floor, and the target range ends at the batch's total size.
    fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool {
        batch.root() == target.root
            && *target.range.start() == batch.bounds().inactivity_floor
            && *target.range.end() == Location::::new(batch.bounds().total_size)
    }

    /// Apply the merkleized batch to the database and commit.
    ///
    /// Errors from either step are propagated to the caller.
    async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> {
        self.apply_batch(batch.inner).await?;
        self.commit().await?;
        Ok(())
    }

    /// Build the sync target describing the database's current state.
    ///
    /// Uses the current root and the range produced by `non_empty_range!` from
    /// `self.sync_boundary()` and the end of `self.bounds()`.
    async fn sync_target(&self) -> Self::SyncTarget {
        let bounds = self.bounds().await;
        AnySyncTarget::new(
            self.root(),
            non_empty_range!(self.sync_boundary(), bounds.end),
        )
    }

    /// Rewind the database to the end of `target`'s range and commit.
    ///
    /// # Panics
    ///
    /// Panics if the sync target recomputed after the rewind differs from `target`.
    async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> {
        self.rewind(target.range.end()).await?;
        self.commit().await?;
        // Sanity check: the rewound state must reproduce the requested target.
        let rewound_target = self.sync_target().await;
        assert_eq!(
            rewound_target, target,
            "rewound database target mismatch after rewind",
        );
        Ok(())
    }
}

/// [`ManagedDb`] for the variable-size-value keyless database.
///
/// Mirrors the `fixed::Db` implementation above; the method bodies are the same.
impl ManagedDb for variable::Db
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: VariableValue + 'static,
    H: Hasher + 'static,
    S: Strategy,
{
    type Unmerkleized = KeylessUnmerkleized<
        F,
        E,
        VariableEncoding,
        VariableJournal>,
        H,
        S,
    >;
    type Merkleized = KeylessMerkleized<
        F,
        E,
        VariableEncoding,
        VariableJournal>,
        H,
        S,
    >;
    type Error = Error;
    // NOTE(review): the type before `as CodecRead` is missing — TODO restore.
    type Config = variable::Config< as CodecRead>::Cfg, S>;
    type SyncTarget = AnySyncTarget;

    /// Initialize the database from `context` and `config`.
    async fn init(context: E, config: Self::Config) -> Result> {
        // NOTE(review): the path qualifier before `::init` is missing; presumably this
        // forwards to the database's own `init` constructor — TODO confirm.
        ::init(context, config).await
    }

    /// Start a fresh unmerkleized batch on `db`.
    ///
    /// Read-locks the database to create the batch, then stores a clone of the
    /// handle in the wrapper. Metadata and inactivity floor start unset.
    async fn new_batch(db: &Arc>) -> Self::Unmerkleized {
        let inner = db.read().await;
        KeylessUnmerkleized {
            batch: inner.new_batch(),
            db: db.clone(),
            metadata: None,
            inactivity_floor: None,
        }
    }

    /// Return `true` only if `batch` corresponds exactly to `target`.
    ///
    /// All three must hold: the roots are equal, the target range starts at the
    /// batch's inactivity floor, and the target range ends at the batch's total size.
    fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool {
        batch.root() == target.root
            && *target.range.start() == batch.bounds().inactivity_floor
            && *target.range.end() == Location::::new(batch.bounds().total_size)
    }

    /// Apply the merkleized batch to the database and commit.
    ///
    /// Errors from either step are propagated to the caller.
    async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> {
        self.apply_batch(batch.inner).await?;
        self.commit().await?;
        Ok(())
    }

    /// Build the sync target describing the database's current state.
    ///
    /// Uses the current root and the range produced by `non_empty_range!` from
    /// `self.sync_boundary()` and the end of `self.bounds()`.
    async fn sync_target(&self) -> Self::SyncTarget {
        let bounds = self.bounds().await;
        AnySyncTarget::new(
            self.root(),
            non_empty_range!(self.sync_boundary(), bounds.end),
        )
    }

    /// Rewind the database to the end of `target`'s range and commit.
    ///
    /// # Panics
    ///
    /// Panics if the sync target recomputed after the rewind differs from `target`.
    async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> {
        self.rewind(target.range.end()).await?;
        self.commit().await?;
        // Sanity check: the rewound state must reproduce the requested target.
        let rewound_target = self.sync_target().await;
        assert_eq!(
            rewound_target, target,
            "rewound database target mismatch after rewind",
        );
        Ok(())
    }
}

/// [`StateSyncDb`] for the fixed-size-value keyless database.
impl StateSyncDb for fixed::Db
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: FixedValue + 'static,
    H: Hasher + 'static,
    S: Strategy,
    R: Resolver, Digest = H::Digest>,
{
    type SyncError = sync::Error;

    /// Run the storage sync engine and return its result.
    ///
    /// This is a thin adapter: every argument is moved into a
    /// `sync::engine::Config` and `sync::sync` is awaited. `tip_updates` is
    /// always supplied (wrapped in `Some`), while `finish` and `reached_target`
    /// are forwarded as given. The request and batch limits come from `sync_config`.
    async fn sync_db(
        context: E,
        config: Self::Config,
        resolver: R,
        target: Self::SyncTarget,
        tip_updates: mpsc::Receiver,
        finish: Option>,
        reached_target: Option>,
        sync_config: SyncEngineConfig,
    ) -> Result {
        sync::sync(sync::engine::Config {
            context,
            resolver,
            target,
            max_outstanding_requests: sync_config.max_outstanding_requests,
            fetch_batch_size: sync_config.fetch_batch_size,
            apply_batch_size: sync_config.apply_batch_size,
            db_config: config,
            update_rx: Some(tip_updates),
            finish_rx: finish,
            reached_target_tx: reached_target,
            max_retained_roots: sync_config.max_retained_roots,
        })
        .await
    }
}

/// [`StateSyncDb`] for the variable-size-value keyless database.
impl StateSyncDb for variable::Db
where
    F: Family,
    E: Storage + Clock + Metrics,
    V: VariableValue + 'static,
    H: Hasher + 'static,
    S: Strategy,
    R: Resolver, Digest = H::Digest>,
{
    type SyncError = sync::Error;

    /// Run the storage sync engine and return its result.
    ///
    /// This is a thin adapter: every argument is moved into a
    /// `sync::engine::Config` and `sync::sync` is awaited. `tip_updates` is
    /// always supplied (wrapped in `Some`), while `finish` and `reached_target`
    /// are forwarded as given. The request and batch limits come from `sync_config`.
    async fn sync_db(
        context: E,
        config: Self::Config,
        resolver: R,
        target: Self::SyncTarget,
        tip_updates: mpsc::Receiver,
        finish: Option>,
        reached_target: Option>,
        sync_config: SyncEngineConfig,
    ) -> Result {
        sync::sync(sync::engine::Config {
            context,
            resolver,
            target,
            max_outstanding_requests: sync_config.max_outstanding_requests,
            fetch_batch_size: sync_config.fetch_batch_size,
            apply_batch_size: sync_config.apply_batch_size,
            db_config: config,
            update_rx: Some(tip_updates),
            finish_rx: finish,
            reached_target_tx: reached_target,
            max_retained_roots: sync_config.max_retained_roots,
        })
        .await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::Sha256;
    use commonware_parallel::Sequential;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Runner as _, Supervisor as _,
    };
    use commonware_storage::{
        journal::contiguous::fixed::Config as FixedJournalConfig,
        merkle::full::Config as MerkleConfig, mmr, qmdb::keyless as storage_keyless,
    };
    use commonware_utils::{non_empty_range, sequence::U64, NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    // Concrete database types exercised by the tests.
    // NOTE(review): the type arguments of both aliases are missing — TODO restore.
    type FixedDb = fixed::Db;
    type VariableDb = variable::Db, Sha256, Sequential>;

    // Page size and page-cache capacity passed to `CacheRef::from_pooler` in `fixed_config`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);

    /// Build a fixed keyless database config for a test.
    ///
    /// `suffix` is appended to the journal, metadata and log partition names.
    /// One page cache is created from `pooler` and shared by the merkle and
    /// log configs.
    fn fixed_config(suffix: &str, pooler: &impl BufferPooler) -> fixed::Config {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        storage_keyless::Config {
            merkle: MerkleConfig {
                journal_partition: format!("journal-{suffix}"),
                metadata_partition: format!("metadata-{suffix}"),
                items_per_blob: NZU64!(11),
                write_buffer: NZUsize!(1024),
                strategy: Sequential,
                page_cache: page_cache.clone(),
            },
            log: FixedJournalConfig {
                partition: format!("log-{suffix}"),
                items_per_blob: NZU64!(7),
                page_cache,
                write_buffer: NZUsize!(1024),
            },
        }
    }

    // Compile-time helpers: the bodies are empty, so instantiating them only checks
    // that the type arguments satisfy the trait bounds.
    // NOTE(review): the generic parameter lists and bound arguments are missing here.
    fn assert_managed_db>() {}

    fn assert_state_sync_db()
    where
        T: StateSyncDb,
    {
    }

    /// Checks that the keyless database types satisfy the `ManagedDb` and
    /// `StateSyncDb` bounds required by the helpers above.
    #[test]
    fn keyless_trait_impls_compile() {
        assert_managed_db::();
        assert_managed_db::();
        assert_state_sync_db::>();
        assert_state_sync_db::>();
    }

    /// Builds a one-value batch with inactivity floor 1 and metadata 9, finalizes
    /// it, and checks the committed value, the metadata and the resulting sync target.
    #[test]
    fn managed_db_finalize_commits_fixed_keyless_batches() {
        deterministic::Runner::default().start(|context| async move {
            let config = fixed_config("stateful-keyless-managed-db", &context);
            let db = FixedDb::init(context.child("db"), config).await.unwrap();
            let db = Arc::new(AsyncRwLock::new(db));

            // NOTE(review): the qualified-path prefix before `>::new_batch` is missing.
            let batch = >::new_batch(&db)
                .await
                .append(U64::new(7))
                .with_inactivity_floor(mmr::Location::new(1))
                .with_metadata(U64::new(9));
            let merkleized = crate::stateful::db::Unmerkleized::merkleize(batch)
                .await
                .unwrap();
            {
                // Scope the write guard so it is released before the read below.
                let mut guard = db.write().await;
                >::finalize(&mut *guard, merkleized)
                    .await
                    .unwrap();
            }

            let guard = db.read().await;
            // The appended value is expected at location 1.
            assert_eq!(
                guard.get(mmr::Location::new(1)).await.unwrap(),
                Some(U64::new(7))
            );
            assert_eq!(guard.get_metadata().await.unwrap(), Some(U64::new(9)));

            // The sync target must carry the current root and the range 1..3.
            // Presumably locations 0 and 2 hold commit operations — TODO confirm.
            let target = >::sync_target(&*guard).await;
            assert_eq!(target.root, guard.root());
            assert_eq!(target.range.start(), mmr::Location::new(1));
            assert_eq!(target.range.end(), mmr::Location::new(3));
        });
    }

    /// Checks that `matches_sync_target` accepts the exact target for a batch and
    /// rejects targets whose range start or range end is off.
    #[test]
    fn managed_db_matches_sync_target_rejects_wrong_replay_range() {
        deterministic::Runner::default().start(|context| async move {
            let config = fixed_config("stateful-keyless-matches-sync-target", &context);
            let db = FixedDb::init(context.child("db"), config).await.unwrap();
            let db = Arc::new(AsyncRwLock::new(db));

            let batch = >::new_batch(&db)
                .await
                .append(U64::new(7))
                .with_inactivity_floor(mmr::Location::new(1))
                .with_metadata(U64::new(9));
            let merkleized = crate::stateful::db::Unmerkleized::merkleize(batch)
                .await
                .unwrap();

            // Target built from the batch's own root and bounds: must match.
            let valid_target = AnySyncTarget::new(
                merkleized.root(),
                non_empty_range!(
                    merkleized.bounds().inactivity_floor,
                    mmr::Location::new(merkleized.bounds().total_size)
                ),
            );
            assert!(>::matches_sync_target(
                &merkleized,
                &valid_target,
            ));

            // Same root and end, but the range starts at 0 instead of the floor.
            let wrong_start = AnySyncTarget::new(
                merkleized.root(),
                non_empty_range!(
                    mmr::Location::new(0),
                    mmr::Location::new(merkleized.bounds().total_size)
                ),
            );
            assert!(!>::matches_sync_target(
                &merkleized,
                &wrong_start,
            ));

            // Same root and start, but the range ends one short of the total size.
            let wrong_end = AnySyncTarget::new(
                merkleized.root(),
                non_empty_range!(
                    merkleized.bounds().inactivity_floor,
                    mmr::Location::new(merkleized.bounds().total_size - 1)
                ),
            );
            assert!(!>::matches_sync_target(
                &merkleized,
                &wrong_end,
            ));
        });
    }
}