//! Compact [`ManagedDb`] implementation for QMDB //! [`immutable`](commonware_storage::qmdb::immutable) databases. //! //! These compact databases retain only the current Merkle peaks, so the glue //! adapters expose set and merkleization operations but no historical reads. use crate::stateful::db::{ ManagedDb, Merkleized as MerkleizedTrait, StateSyncDb, SyncEngineConfig, Unmerkleized as UnmerkleizedTrait, MAX_CHANNEL_DRAIN_PER_TICK, }; use commonware_codec::{EncodeShared, Read as CodecRead}; use commonware_cryptography::Hasher; use commonware_macros::select; use commonware_parallel::Strategy; use commonware_runtime::{reschedule, Clock, Metrics, Storage}; use commonware_storage::{ merkle::{Family, Location}, qmdb::{ any::value::{FixedEncoding, FixedValue, ValueEncoding, VariableEncoding, VariableValue}, immutable::{ fixed, variable, CompactDb, CompactMerkleizedBatch, CompactUnmerkleizedBatch, Operation, }, operation::Key, sync::{self}, Error, }, }; use commonware_utils::{channel::mpsc, sync::AsyncRwLock, Array}; use futures::future::{pending, Either}; use std::{ops::Deref, sync::Arc}; type ImmutableUnjournaledDbHandle = Arc>>; async fn drain_latest_target(tip_updates: &mut mpsc::Receiver) -> Option { let mut latest = None; let mut drained = 0usize; loop { match tip_updates.try_recv() { Ok(update) => { latest = Some(update); drained += 1; if drained.is_multiple_of(MAX_CHANNEL_DRAIN_PER_TICK) { reschedule().await; } } Err(mpsc::error::TryRecvError::Empty | mpsc::error::TryRecvError::Disconnected) => { return latest; } } } } /// Wraps an unjournaled immutable batch before merkleization. pub struct ImmutableUnjournaledUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { batch: CompactUnmerkleizedBatch, db: ImmutableUnjournaledDbHandle, metadata: Option, inactivity_floor: Option>, } impl Deref for ImmutableUnjournaledUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { type Target = CompactUnmerkleizedBatch; fn deref(&self) -> &Self::Target { &self.batch } } impl ImmutableUnjournaledUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { /// Set commit metadata included in the next merkleization. pub fn with_metadata(mut self, metadata: V::Value) -> Self { self.metadata = Some(metadata); self } /// Set the inactivity floor included in the next merkleization. pub const fn with_inactivity_floor(mut self, floor: Location) -> Self { self.inactivity_floor = Some(floor); self } /// Set `key` to `value` in the speculative batch. pub fn set(mut self, key: K, value: V::Value) -> Self { self.batch = self.batch.set(key, value); self } } /// Wraps an unjournaled immutable batch after merkleization. pub struct ImmutableUnjournaledMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { inner: Arc>, db: ImmutableUnjournaledDbHandle, } impl Deref for ImmutableUnjournaledMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { type Target = CompactMerkleizedBatch; fn deref(&self) -> &Self::Target { &self.inner } } impl UnmerkleizedTrait for ImmutableUnjournaledUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { type Merkleized = ImmutableUnjournaledMerkleized; type Error = Error; async fn merkleize(self) -> Result> { let db = self.db.read().await; let merkleized = self.batch.merkleize( &*db, self.metadata, self.inactivity_floor.unwrap_or_default(), ); Ok(ImmutableUnjournaledMerkleized { inner: merkleized, db: self.db.clone(), }) } } impl MerkleizedTrait for ImmutableUnjournaledMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, H: Hasher, Operation: EncodeShared, Operation: CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { type Digest = H::Digest; type Unmerkleized = ImmutableUnjournaledUnmerkleized; fn root(&self) -> H::Digest { self.inner.root() } fn new_batch(&self) -> Self::Unmerkleized { ImmutableUnjournaledUnmerkleized { batch: self.inner.new_batch::(), db: self.db.clone(), metadata: None, inactivity_floor: None, } } } impl ManagedDb for fixed::CompactDb where F: Family, E: Storage + Clock + Metrics, K: Array, V: FixedValue + 'static, H: Hasher + 'static, S: Strategy, Operation>: EncodeShared + CodecRead, { type Unmerkleized = ImmutableUnjournaledUnmerkleized, H, S, ()>; type Merkleized = ImmutableUnjournaledMerkleized, H, S, ()>; type Error = Error; type Config = fixed::CompactConfig; type SyncTarget = sync::compact::Target; async fn init(context: E, config: Self::Config) -> Result> { ::init(context, config).await } async fn new_batch(db: &Arc>) -> Self::Unmerkleized { let inner = db.read().await; ImmutableUnjournaledUnmerkleized { batch: inner.new_batch(), db: db.clone(), metadata: None, inactivity_floor: None, } } fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool { batch.root() == target.root && target.leaf_count == Location::new(batch.bounds().total_size) } async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> { self.apply_batch(batch.inner)?; self.sync().await?; Ok(()) } async fn sync_target(&self) -> Self::SyncTarget { self.current_target() } async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> { // Compact storage only retains the previous logical commit range. self.rewind().await?; let rewound_target = self.sync_target().await; assert_eq!( rewound_target, target, "rewound database target mismatch after one-step rewind", ); Ok(()) } fn max_rewind_depth() -> Option { Some(1) } } impl ManagedDb for variable::CompactDb where F: Family, E: Storage + Clock + Metrics, K: Key, V: VariableValue + 'static, H: Hasher + 'static, Operation>: EncodeShared + CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, { type Unmerkleized = ImmutableUnjournaledUnmerkleized, H, S, C>; type Merkleized = ImmutableUnjournaledMerkleized, H, S, C>; type Error = Error; type Config = variable::CompactConfig; type SyncTarget = sync::compact::Target; async fn init(context: E, config: Self::Config) -> Result> { ::init(context, config).await } async fn new_batch(db: &Arc>) -> Self::Unmerkleized { let inner = db.read().await; ImmutableUnjournaledUnmerkleized { batch: inner.new_batch(), db: db.clone(), metadata: None, inactivity_floor: None, } } fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool { batch.root() == target.root && target.leaf_count == Location::new(batch.bounds().total_size) } async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> { self.apply_batch(batch.inner)?; self.sync().await?; Ok(()) } async fn sync_target(&self) -> Self::SyncTarget { self.current_target() } async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> { // Compact storage only retains the previous logical commit range. self.rewind().await?; let rewound_target = self.sync_target().await; assert_eq!( rewound_target, target, "rewound database target mismatch after one-step rewind", ); Ok(()) } fn max_rewind_depth() -> Option { Some(1) } } impl StateSyncDb for fixed::CompactDb where F: Family, E: Storage + Clock + Metrics, K: Array, V: FixedValue + 'static, H: Hasher + 'static, S: Strategy, Operation>: EncodeShared + CodecRead, R: sync::compact::Resolver< Family = F, Op = Operation>, Digest = H::Digest, >, { type SyncError = sync::Error; async fn sync_db( context: E, config: Self::Config, resolver: R, mut target: Self::SyncTarget, tip_updates: mpsc::Receiver, mut finish: Option>, reached_target: Option>, _sync_config: SyncEngineConfig, ) -> Result { let mut attempt = 0u64; let mut tip_updates = Some(tip_updates); loop { if let Some(tip_updates) = tip_updates.as_mut() { if let Some(update) = drain_latest_target(tip_updates).await { target = update; } } let context = context.child("sync").with_attribute("attempt", attempt); attempt += 1; let update_future = tip_updates.as_mut().map_or_else( || Either::Right(pending()), |updates| Either::Left(updates.recv()), ); let db = select! { update = update_future => { let Some(update) = update else { tip_updates = None; continue; }; target = update; continue; }, db = sync::compact::sync(sync::compact::Config:: { context, resolver: resolver.clone(), target: target.clone(), db_config: config.clone(), }) => db?, }; if let Some(tip_updates) = tip_updates.as_mut() { if let Some(update) = drain_latest_target(tip_updates).await { target = update; continue; } } if let Some(reached_target) = reached_target.as_ref() { if reached_target.send(target.clone()).await.is_err() { return Ok(db); } } let Some(finish) = finish.as_mut() else { return Ok(db); }; let Some(tip_updates) = tip_updates.as_mut() else { return Ok(db); }; select! { _ = finish.recv() => return Ok(db), update = tip_updates.recv() => { let Some(update) = update else { return Ok(db); }; target = update; }, } } } } impl StateSyncDb for variable::CompactDb where F: Family, E: Storage + Clock + Metrics, K: Key, V: VariableValue + 'static, H: Hasher + 'static, Operation>: EncodeShared + CodecRead, C: Clone + Send + Sync + 'static, S: Strategy, R: sync::compact::Resolver< Family = F, Op = Operation>, Digest = H::Digest, >, { type SyncError = sync::Error; async fn sync_db( context: E, config: Self::Config, resolver: R, mut target: Self::SyncTarget, tip_updates: mpsc::Receiver, mut finish: Option>, reached_target: Option>, _sync_config: SyncEngineConfig, ) -> Result { let mut attempt = 0u64; let mut tip_updates = Some(tip_updates); loop { if let Some(tip_updates) = tip_updates.as_mut() { if let Some(update) = drain_latest_target(tip_updates).await { target = update; } } let context = context.child("sync").with_attribute("attempt", attempt); attempt += 1; let update_future = tip_updates.as_mut().map_or_else( || Either::Right(pending()), |updates| Either::Left(updates.recv()), ); let db = select! { update = update_future => { let Some(update) = update else { tip_updates = None; continue; }; target = update; continue; }, db = sync::compact::sync(sync::compact::Config:: { context, resolver: resolver.clone(), target: target.clone(), db_config: config.clone(), }) => db?, }; if let Some(tip_updates) = tip_updates.as_mut() { if let Some(update) = drain_latest_target(tip_updates).await { target = update; continue; } } if let Some(reached_target) = reached_target.as_ref() { if reached_target.send(target.clone()).await.is_err() { return Ok(db); } } let Some(finish) = finish.as_mut() else { return Ok(db); }; let Some(tip_updates) = tip_updates.as_mut() else { return Ok(db); }; select! { _ = finish.recv() => return Ok(db), update = tip_updates.recv() => { let Some(update) = update else { return Ok(db); }; target = update; }, } } } } #[cfg(test)] mod tests { use super::*; use commonware_cryptography::{sha256::Digest, Sha256}; use commonware_parallel::Sequential; use commonware_runtime::{ buffer::paged::CacheRef, deterministic, BufferPooler, Runner as _, Spawner as _, Supervisor as _, }; use commonware_storage::{ journal::contiguous::fixed::Config as FixedJournalConfig, merkle::{compact::Config as MerkleConfig, full::Config as FullMerkleConfig, mmr}, translator::TwoCap, }; use commonware_utils::{NZUsize, NZU16, NZU64}; use std::time::Duration; type FixedDb = fixed::CompactDb; type FullFixedDb = fixed::Db; type VariableDb = variable::CompactDb< mmr::Family, deterministic::Context, Digest, Vec, Sha256, ((), (commonware_codec::RangeCfg, ())), Sequential, >; fn fixed_config(suffix: &str) -> fixed::CompactConfig { fixed::CompactConfig { merkle: MerkleConfig { partition: format!("stateful-immutable-unjournaled-{suffix}"), strategy: Sequential, }, commit_codec_config: (), } } fn full_fixed_config( suffix: &str, pooler: &impl BufferPooler, ) -> fixed::Config { let page_cache = CacheRef::from_pooler(pooler, NZU16!(101), NZUsize!(11)); fixed::Config { merkle_config: FullMerkleConfig { journal_partition: format!("stateful-immutable-full-journal-{suffix}"), metadata_partition: format!("stateful-immutable-full-metadata-{suffix}"), items_per_blob: NZU64!(11), write_buffer: NZUsize!(1024), strategy: Sequential, page_cache: page_cache.clone(), }, log: FixedJournalConfig { partition: format!("stateful-immutable-full-log-{suffix}"), items_per_blob: NZU64!(7), page_cache, write_buffer: NZUsize!(1024), }, translator: TwoCap, } } const fn sync_config() -> SyncEngineConfig { SyncEngineConfig { fetch_batch_size: NZU64!(1), apply_batch_size: 1, max_outstanding_requests: 1, update_channel_size: NZUsize!(1), max_retained_roots: 0, } } fn assert_managed_db>() {} fn assert_state_sync_db() where T: StateSyncDb, { } #[derive(Clone)] struct SupersedingCompactResolver { source: Arc, stale_target: sync::compact::Target, stale_request_tx: mpsc::Sender<()>, } impl sync::compact::Resolver for SupersedingCompactResolver { type Family = mmr::Family; type Digest = Digest; type Op = fixed::Operation; type Error = sync::compact::ServeError; async fn get_compact_state( &self, target: sync::compact::Target, ) -> Result, Self::Error> { if target == self.stale_target { let _ = self.stale_request_tx.send(()).await; return futures::future::pending().await; } sync::compact::Resolver::get_compact_state(&self.source, target).await } } #[test] fn immutable_unjournaled_trait_impls_compile() { assert_managed_db::(); assert_managed_db::(); assert_state_sync_db::>(); assert_state_sync_db::>(); } #[test] fn managed_db_finalize_commits_fixed_immutable_unjournaled_batches() { deterministic::Runner::default().start(|context| async move { let config = fixed_config("managed-db"); let db = FixedDb::init(context.child("db"), config).await.unwrap(); let db = Arc::new(AsyncRwLock::new(db)); let key = Sha256::hash(&[1]); let value = Sha256::hash(&[2]); let metadata = Sha256::hash(&[3]); let batch = >::new_batch(&db) .await .set(key, value) .with_inactivity_floor(mmr::Location::new(1)) .with_metadata(metadata); let merkleized = crate::stateful::db::Unmerkleized::merkleize(batch) .await .unwrap(); let expected_root = merkleized.root(); { let mut guard = db.write().await; >::finalize(&mut *guard, merkleized) .await .unwrap(); } let guard = db.read().await; assert_eq!(guard.root(), expected_root); assert_eq!(guard.get_metadata(), Some(metadata)); let target = >::sync_target(&*guard).await; assert_eq!(target.root, guard.root()); assert_eq!(target.leaf_count, mmr::Location::new(3)); }); } #[test] fn state_sync_fetches_fixed_immutable_compact_state() { deterministic::Runner::default().start(|context| async move { let mut source = FixedDb::init(context.child("source"), fixed_config("source")) .await .unwrap(); let metadata = Sha256::hash(&[3]); let floor = source.inactivity_floor_loc(); let batch = source .new_batch() .set(Sha256::hash(&[1]), Sha256::hash(&[2])) .merkleize(&source, Some(metadata), floor); source.apply_batch(batch).unwrap(); source.sync().await.unwrap(); let target = source.current_target(); let (_update_tx, update_rx) = mpsc::channel(1); let synced = >>::sync_db( context.child("target"), fixed_config("target"), Arc::new(source), target.clone(), update_rx, None, None, sync_config(), ) .await .unwrap(); assert_eq!(synced.current_target(), target); assert_eq!(synced.get_metadata(), Some(metadata)); }); } #[test] fn state_sync_supersedes_in_flight_stale_compact_target() { deterministic::Runner::default().start(|context| async move { let mut source = FullFixedDb::init( context.child("source"), full_fixed_config("source", &context), ) .await .unwrap(); let floor = source.inactivity_floor_loc(); let batch = source .new_batch() .set(Sha256::hash(&[1]), Sha256::hash(&[2])) .merkleize(&source, Some(Sha256::hash(&[9])), floor); source.apply_batch(batch).await.unwrap(); source.sync().await.unwrap(); let stale_target = sync::compact::Target { root: source.root(), leaf_count: source.bounds().await.end, }; let floor = source.inactivity_floor_loc(); let batch = source .new_batch() .set(Sha256::hash(&[3]), Sha256::hash(&[4])) .merkleize(&source, Some(Sha256::hash(&[10])), floor); source.apply_batch(batch).await.unwrap(); source.sync().await.unwrap(); let latest_target = sync::compact::Target { root: source.root(), leaf_count: source.bounds().await.end, }; let (stale_request_tx, mut stale_request_rx) = mpsc::channel(1); let resolver = SupersedingCompactResolver { source: Arc::new(source), stale_target: stale_target.clone(), stale_request_tx, }; let (update_tx, update_rx) = mpsc::channel(1); let sync_handle = context.child("sync").spawn(move |context| async move { >::sync_db( context.child("target"), fixed_config("supersede-target"), resolver, stale_target, update_rx, None, None, sync_config(), ) .await }); context .timeout(Duration::from_secs(1), async move { stale_request_rx.recv().await.unwrap(); }) .await .expect("sync should request the stale target first"); update_tx.send(latest_target.clone()).await.unwrap(); let synced = context .timeout(Duration::from_secs(1), sync_handle) .await .expect("sync should switch to the latest target") .expect("spawned sync task should complete") .unwrap(); assert_eq!(synced.current_target(), latest_target); assert_eq!(synced.get_metadata(), Some(Sha256::hash(&[10]))); }); } #[test] fn managed_db_rewinds_fixed_immutable_unjournaled_one_commit_range() { deterministic::Runner::default().start(|context| async move { let config = fixed_config("rewind"); let mut db = FixedDb::init(context.child("db"), config).await.unwrap(); let floor = db.inactivity_floor_loc(); let batch = db .new_batch() .set(Sha256::hash(&[1]), Sha256::hash(&[2])) .merkleize(&db, Some(Sha256::hash(&[11])), floor); db.apply_batch(batch).unwrap(); db.sync().await.unwrap(); let first_target = >::sync_target(&db).await; let floor = db.inactivity_floor_loc(); let batch = db .new_batch() .set(Sha256::hash(&[3]), Sha256::hash(&[4])) .merkleize(&db, Some(Sha256::hash(&[22])), floor); db.apply_batch(batch).unwrap(); db.sync().await.unwrap(); let second_target = >::sync_target(&db).await; assert_ne!(second_target, first_target); >::rewind_to_target(&mut db, first_target.clone()) .await .unwrap(); let rewound_target = >::sync_target(&db).await; assert_eq!(rewound_target, first_target); assert_eq!(db.get_metadata(), Some(Sha256::hash(&[11]))); }); } }