//! [`ManagedDb`] implementation for QMDB [`current`](commonware_storage::qmdb::current) databases. //! //! The QMDB batch API passes `&db` to `get()` and `merkleize()` for //! read-through to committed state. This module provides wrapper types //! that capture `Arc>` alongside the raw batch so the //! [`Unmerkleized`](super::Unmerkleized) and [`Merkleized`](super::Merkleized) //! traits can be implemented without a DB parameter. use crate::stateful::db::{ ManagedDb, Merkleized as MerkleizedTrait, StateSyncDb, SyncEngineConfig, Unmerkleized as UnmerkleizedTrait, }; use commonware_codec::{Codec, Read as CodecRead}; use commonware_cryptography::Hasher; use commonware_parallel::Strategy; use commonware_runtime::{Clock, Metrics, Storage}; use commonware_storage::{ index::{ unordered::Index as UnorderedIdx, Ordered as OrderedIndex, Unordered as UnorderedIndex, }, journal::contiguous::{ fixed::Journal as FixedJournal, variable::Journal as VariableJournal, Contiguous, Mutable, }, merkle::{Graftable, Location}, qmdb::{ any::{ operation::{Operation, Update}, ordered, unordered, value::{self, FixedEncoding, ValueEncoding, VariableEncoding}, }, current::{ batch::{MerkleizedBatch, UnmerkleizedBatch}, db::Db, FixedConfig, VariableConfig, }, operation::Key, sync::{self, resolver::Resolver, Target as CurrentSyncTarget}, Error, }, translator::Translator, Persistable, }; use commonware_utils::{channel::mpsc, non_empty_range, sync::AsyncRwLock, Array}; use std::{ops::Deref, sync::Arc}; type CurrentDbHandle = Arc>>; /// Wraps a QMDB [`UnmerkleizedBatch`] with a reference to the parent /// database, implementing the [`Unmerkleized`](super::Unmerkleized) trait. pub struct CurrentUnmerkleized where F: Graftable, E: Storage + Clock + Metrics, U: Update, C: Contiguous>, I: UnorderedIndex>, H: Hasher, S: Strategy, Operation: Codec, { batch: UnmerkleizedBatch, db: CurrentDbHandle, metadata: Option, } /// Key-value operations for the `current` unordered update kind. 
// NOTE(review): the generic parameter lists on these items appear to have been
// stripped by an extraction step; code tokens are reproduced exactly as found.
impl CurrentUnmerkleized, N, S>
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding + 'static,
    C: Mutable>> + Persistable,
    I: UnorderedIndex> + 'static,
    H: Hasher,
    S: Strategy,
    Operation>: Codec,
{
    /// Set commit metadata included in the next
    /// [`merkleize`](UnmerkleizedTrait::merkleize) call.
    ///
    /// Builder-style: consumes `self` and returns it. Calling this again
    /// replaces any metadata set earlier.
    pub fn with_metadata(mut self, metadata: V::Value) -> Self {
        self.metadata = Some(metadata);
        self
    }

    /// Read a value by key, falling back to committed state.
    ///
    /// Takes a read guard on the parent database handle and holds it for the
    /// duration of the lookup.
    pub async fn get(&self, key: &K) -> Result, Error> {
        let db = self.db.read().await;
        self.batch.get(key, &*db).await
    }

    /// Read multiple values by key, falling back to committed state.
    ///
    /// Returns results in the same order as the input keys.
    pub async fn get_many(&self, keys: &[&K]) -> Result>, Error> {
        // One read guard covers the whole multi-key lookup.
        let db = self.db.read().await;
        self.batch.get_many(keys, &*db).await
    }

    /// Record a mutation. `Some(value)` for upsert, `None` for delete.
    ///
    /// Builder-style: consumes `self` and returns the wrapper holding the
    /// batch produced by the underlying `write`.
    pub fn write(mut self, key: K, value: Option) -> Self {
        self.batch = self.batch.write(key, value);
        self
    }
}

/// Wraps a QMDB [`MerkleizedBatch`] with a reference to the parent
/// database, implementing the [`Merkleized`](super::Merkleized) trait.
pub struct CurrentMerkleized
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    U: Update,
    C: Contiguous>,
    I: UnorderedIndex>,
    H: Hasher,
    S: Strategy,
    Operation: Codec,
{
    // The merkleized batch, held behind an `Arc`.
    inner: Arc>,
    // Handle to the parent database, used for read-through and for creating
    // child batches.
    db: CurrentDbHandle,
}

// Lets callers reach the wrapped unmerkleized batch's methods directly on the
// wrapper.
impl Deref for CurrentUnmerkleized
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    U: Update,
    C: Contiguous>,
    I: UnorderedIndex>,
    H: Hasher,
    S: Strategy,
    Operation: Codec,
{
    type Target = UnmerkleizedBatch;

    fn deref(&self) -> &Self::Target {
        &self.batch
    }
}

// Lets callers reach the wrapped merkleized batch's methods directly on the
// wrapper (the tests and `ManagedDb` impls call e.g. `ops_root()` this way).
impl Deref for CurrentMerkleized
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    U: Update,
    C: Contiguous>,
    I: UnorderedIndex>,
    H: Hasher,
    S: Strategy,
    Operation: Codec,
{
    type Target = MerkleizedBatch;

    fn deref(&self) -> &Self::Target {
        // `inner` is an `Arc`; the reference coerces to the batch it points to.
        &self.inner
    }
}

/// Key-value operations for the `current` ordered update kind.
// NOTE(review): the generic parameter lists on these items appear to have been
// stripped by an extraction step; code tokens are reproduced exactly as found.
// This impl mirrors the unordered one, differing in the `OrderedIndex` bound
// on `I`.
impl CurrentUnmerkleized, N, S>
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding + 'static,
    C: Mutable>> + Persistable,
    I: OrderedIndex> + 'static,
    H: Hasher,
    S: Strategy,
    Operation>: Codec,
{
    /// Set commit metadata included in the next
    /// [`merkleize`](UnmerkleizedTrait::merkleize) call.
    ///
    /// Builder-style: consumes `self` and returns it. Calling this again
    /// replaces any metadata set earlier.
    pub fn with_metadata(mut self, metadata: V::Value) -> Self {
        self.metadata = Some(metadata);
        self
    }

    /// Read a value by key, falling back to committed state.
    ///
    /// Takes a read guard on the parent database handle and holds it for the
    /// duration of the lookup.
    pub async fn get(&self, key: &K) -> Result, Error> {
        let db = self.db.read().await;
        self.batch.get(key, &*db).await
    }

    /// Read multiple values by key, falling back to committed state.
    ///
    /// Returns results in the same order as the input keys.
    pub async fn get_many(&self, keys: &[&K]) -> Result>, Error> {
        // One read guard covers the whole multi-key lookup.
        let db = self.db.read().await;
        self.batch.get_many(keys, &*db).await
    }

    /// Record a mutation. `Some(value)` for upsert, `None` for delete.
    ///
    /// Builder-style: consumes `self` and returns the wrapper holding the
    /// batch produced by the underlying `write`.
    pub fn write(mut self, key: K, value: Option) -> Self {
        self.batch = self.batch.write(key, value);
        self
    }
}

/// Read-through operations for the `current` merkleized batch.
impl CurrentMerkleized
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    U: Update,
    C: Contiguous>,
    I: UnorderedIndex> + 'static,
    H: Hasher,
    S: Strategy,
    Operation: Codec,
{
    /// Read a value by key, falling back to committed state.
    ///
    /// Takes a read guard on the parent database handle and delegates to the
    /// wrapped merkleized batch.
    pub async fn get(&self, key: &U::Key) -> Result, Error> {
        let db = self.db.read().await;
        self.inner.get(key, &*db).await
    }

    /// Read multiple values by key, falling back to committed state.
    ///
    /// Returns results in the same order as the input keys.
    pub async fn get_many(&self, keys: &[&U::Key]) -> Result>, Error> {
        // One read guard covers the whole multi-key lookup.
        let db = self.db.read().await;
        self.inner.get_many(keys, &*db).await
    }
}

/// Implement [`Unmerkleized`](UnmerkleizedTrait) for the `current` unordered update kind.
// NOTE(review): the generic parameter lists on these items appear to have been
// stripped by an extraction step; code tokens are reproduced exactly as found.
impl UnmerkleizedTrait for CurrentUnmerkleized, N, S>
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding + 'static,
    C: Mutable>> + Persistable,
    I: UnorderedIndex> + 'static,
    H: Hasher,
    S: Strategy,
    Operation>: Codec,
{
    type Merkleized = CurrentMerkleized, N, S>;
    type Error = Error;

    /// Merkleize the wrapped batch against the parent database.
    ///
    /// Takes a read guard on the database handle, passes it and any metadata
    /// set via `with_metadata` to the underlying `merkleize`, and wraps the
    /// result together with a clone of the same handle. Errors from the
    /// underlying call are propagated.
    async fn merkleize(self) -> Result> {
        let db = self.db.read().await;
        let merkleized = self.batch.merkleize(&*db, self.metadata).await?;
        Ok(CurrentMerkleized {
            inner: merkleized,
            // Cloned rather than moved: `db` above still borrows `self.db`.
            db: self.db.clone(),
        })
    }
}

/// Implement [`Unmerkleized`](UnmerkleizedTrait) for the `current` ordered update kind.
impl UnmerkleizedTrait for CurrentUnmerkleized, N, S>
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding + 'static,
    C: Mutable>> + Persistable,
    I: OrderedIndex> + 'static,
    H: Hasher,
    S: Strategy,
    Operation>: Codec,
{
    type Merkleized = CurrentMerkleized, N, S>;
    type Error = Error;

    /// Merkleize the wrapped batch against the parent database.
    ///
    /// Same steps as the unordered implementation: read guard, underlying
    /// `merkleize` with the stored metadata, then wrap with a cloned handle.
    async fn merkleize(self) -> Result> {
        let db = self.db.read().await;
        let merkleized = self.batch.merkleize(&*db, self.metadata).await?;
        Ok(CurrentMerkleized {
            inner: merkleized,
            // Cloned rather than moved: `db` above still borrows `self.db`.
            db: self.db.clone(),
        })
    }
}

/// Implement [`Merkleized`](MerkleizedTrait) for all supported `current` update kinds.
impl MerkleizedTrait for CurrentMerkleized
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    U: Update,
    C: Mutable> + Persistable,
    I: UnorderedIndex> + 'static,
    H: Hasher,
    S: Strategy,
    Operation: Codec,
    CurrentUnmerkleized: UnmerkleizedTrait,
{
    type Digest = H::Digest;
    type Unmerkleized = CurrentUnmerkleized;

    /// Return the root reported by the wrapped merkleized batch.
    fn root(&self) -> H::Digest {
        self.inner.root()
    }

    /// Start a new unmerkleized batch from this merkleized batch.
    ///
    /// The new wrapper shares the same database handle and starts with no
    /// commit metadata.
    fn new_batch(&self) -> Self::Unmerkleized {
        CurrentUnmerkleized {
            batch: self.inner.new_batch::(),
            db: self.db.clone(),
            metadata: None,
        }
    }
}

/// Implement [`ManagedDb`] for unordered current QMDB databases with fixed-size values.
// NOTE(review): the generic parameter lists on these items appear to have been
// stripped by an extraction step; code tokens are reproduced exactly as found.
impl ManagedDb for
    Db<
        F,
        E,
        FixedJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Array,
    V: value::FixedValue + 'static,
    H: Hasher + 'static,
    T: Translator,
    S: Strategy,
{
    type Unmerkleized = CurrentUnmerkleized<
        F,
        E,
        FixedJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >;
    type Merkleized = CurrentMerkleized<
        F,
        E,
        FixedJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >;
    type Error = Error;
    type Config = FixedConfig;
    type SyncTarget = CurrentSyncTarget;

    /// Open the database from `context` and `config`.
    // NOTE(review): the qualified path in front of `::init` was lost in
    // extraction. The variable-size impl routes the same call through
    // `open::variable`; confirm this one resolves to the inherent `init`.
    async fn init(context: E, config: Self::Config) -> Result> {
        ::init(context, config).await
    }

    /// Create a new unmerkleized batch bound to the shared database handle.
    ///
    /// Takes a read guard to create the raw batch, clones the handle into the
    /// wrapper, and starts with no commit metadata.
    async fn new_batch(db: &Arc>) -> Self::Unmerkleized {
        let inner = db.read().await;
        CurrentUnmerkleized {
            batch: inner.new_batch(),
            db: db.clone(),
            metadata: None,
        }
    }

    /// Report whether `batch` corresponds to `target`.
    ///
    /// True only when all three hold: the batch's ops root equals
    /// `target.root`, the target range start equals the batch's sync
    /// boundary, and the target range end equals the location built from the
    /// batch's `bounds().total_size`.
    fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool {
        batch.ops_root() == target.root
            && *target.range.start() == batch.sync_boundary()
            && *target.range.end() == Location::::new(batch.bounds().total_size)
    }

    /// Apply the merkleized batch to the database and then sync it.
    ///
    /// Errors from either step are propagated; `sync` is not attempted if
    /// applying the batch fails.
    async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> {
        self.apply_batch(batch.inner).await?;
        self.sync().await?;
        Ok(())
    }

    /// Build the sync target describing the database's current state.
    ///
    /// Combines `ops_root()` with a range from `sync_boundary()` to the `end`
    /// of `bounds()`.
    async fn sync_target(&self) -> Self::SyncTarget {
        let bounds = self.bounds().await;
        CurrentSyncTarget::new(
            self.ops_root(),
            non_empty_range!(self.sync_boundary(), bounds.end),
        )
    }

    /// Rewind the database to the end of `target`'s range and sync.
    ///
    /// # Panics
    ///
    /// Panics if the sync target recomputed after the rewind differs from
    /// `target`.
    async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> {
        self.rewind(target.range.end()).await?;
        self.sync().await?;
        // Recompute the target from the rewound state to verify the rewind
        // landed exactly where requested.
        let rewound_target = self.sync_target().await;
        assert_eq!(
            rewound_target, target,
            "rewound database target mismatch after rewind",
        );
        Ok(())
    }
}

// NOTE(review): the issue link that followed "Workaround for" and the path in
// front of `::init(...)` are missing from the comment below — restore them.
/// Workaround for .
///
/// Inside a `ManagedDb` trait impl, `::init(...)` in a non-async `fn`
/// resolves to the *trait* method (infinite recursion), while in an
/// `async fn` it resolves correctly to the inherent method but the compiler
/// cannot verify the RPITIT future is `Send`.
/// By placing the call in this
/// module -- which does not import `ManagedDb` -- the compiler
/// unambiguously picks the inherent `Db::init`.
// NOTE(review): the generic parameter lists on the items below appear to have
// been stripped by an extraction step; code tokens are reproduced exactly as
// found.
mod open {
    use commonware_codec::{Codec, Read};
    use commonware_cryptography::Hasher;
    use commonware_parallel::Strategy;
    use commonware_runtime::{Clock, Metrics, Storage};
    use commonware_storage::{
        merkle::Graftable,
        qmdb::{
            any::{
                operation::Operation,
                unordered,
                value::{VariableEncoding, VariableValue},
            },
            current::{unordered::variable::Db, VariableConfig},
            Error,
        },
    };
    use commonware_utils::Array;

    // Configuration type accepted by `variable`; mirrors the `Config`
    // associated type of the variable-size `ManagedDb` impl.
    type VConfig = VariableConfig<
        T,
        >> as Read>::Cfg,
        S,
    >;

    /// Open a variable-size `current` database by calling the inherent
    /// `Db::init` with the given context and configuration.
    pub(super) async fn variable(
        context: E,
        config: VConfig,
    ) -> Result, Error>
    where
        F: Graftable,
        E: Storage + Clock + Metrics,
        K: Array,
        V: VariableValue + 'static,
        H: Hasher,
        T: commonware_storage::translator::Translator,
        S: Strategy,
        Operation>>: Codec,
    {
        Db::init(context, config).await
    }
}

/// Implement [`ManagedDb`] for unordered current QMDB databases with variable-size values.
impl ManagedDb for
    Db<
        F,
        E,
        VariableJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Key + Array,
    V: value::VariableValue + 'static,
    H: Hasher,
    T: Translator,
    S: Strategy,
    Operation>>: Codec,
{
    type Unmerkleized = CurrentUnmerkleized<
        F,
        E,
        VariableJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >;
    type Merkleized = CurrentMerkleized<
        F,
        E,
        VariableJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >;
    type Error = Error;
    type Config = VariableConfig<
        T,
        >> as CodecRead>::Cfg,
        S,
    >;
    type SyncTarget = CurrentSyncTarget;

    /// Open the database from `context` and `config`.
    ///
    /// Delegates to `open::variable` so that the inherent `Db::init` is
    /// called (see the note on `mod open`).
    async fn init(context: E, config: Self::Config) -> Result> {
        open::variable(context, config).await
    }

    /// Create a new unmerkleized batch bound to the shared database handle.
    ///
    /// Takes a read guard to create the raw batch, clones the handle into the
    /// wrapper, and starts with no commit metadata.
    async fn new_batch(db: &Arc>) -> Self::Unmerkleized {
        let inner = db.read().await;
        CurrentUnmerkleized {
            batch: inner.new_batch(),
            db: db.clone(),
            metadata: None,
        }
    }

    /// Report whether `batch` corresponds to `target`.
    ///
    /// True only when all three hold: the batch's ops root equals
    /// `target.root`, the target range start equals the batch's sync
    /// boundary, and the target range end equals the location built from the
    /// batch's `bounds().total_size`.
    fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool {
        batch.ops_root() == target.root
            && *target.range.start() == batch.sync_boundary()
            && *target.range.end() == Location::::new(batch.bounds().total_size)
    }

    /// Apply the merkleized batch to the database and then sync it.
    ///
    /// Errors from either step are propagated; `sync` is not attempted if
    /// applying the batch fails.
    async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> {
        self.apply_batch(batch.inner).await?;
        self.sync().await?;
        Ok(())
    }

    /// Build the sync target describing the database's current state.
    ///
    /// Combines `ops_root()` with a range from `sync_boundary()` to the `end`
    /// of `bounds()`.
    async fn sync_target(&self) -> Self::SyncTarget {
        let bounds = self.bounds().await;
        CurrentSyncTarget::new(
            self.ops_root(),
            non_empty_range!(self.sync_boundary(), bounds.end),
        )
    }

    /// Rewind the database to the end of `target`'s range and sync.
    ///
    /// # Panics
    ///
    /// Panics if the sync target recomputed after the rewind differs from
    /// `target`.
    async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> {
        self.rewind(target.range.end()).await?;
        self.sync().await?;
        // Recompute the target from the rewound state to verify the rewind
        // landed exactly where requested.
        let rewound_target = self.sync_target().await;
        assert_eq!(
            rewound_target, target,
            "rewound database target mismatch after rewind",
        );
        Ok(())
    }
}

// State sync for the fixed-size variant: forwards all arguments to the QMDB
// sync engine.
impl StateSyncDb for
    Db<
        F,
        E,
        FixedJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Array,
    V: value::FixedValue + 'static,
    H: Hasher,
    T: Translator,
    S: Strategy,
    R: Resolver<
        Family = F,
        Op = Operation>>,
        Digest = H::Digest,
    >,
{
    type SyncError = sync::Error;

    /// Run the QMDB sync engine and return its result.
    ///
    /// Builds a `sync::engine::Config` from the arguments: the three batch
    /// and request limits and `max_retained_roots` come from `sync_config`,
    /// `tip_updates` becomes the (always present) update receiver, and
    /// `finish` / `reached_target` are passed through as given.
    async fn sync_db(
        context: E,
        config: Self::Config,
        resolver: R,
        target: Self::SyncTarget,
        tip_updates: mpsc::Receiver,
        finish: Option>,
        reached_target: Option>,
        sync_config: SyncEngineConfig,
    ) -> Result {
        sync::sync(sync::engine::Config {
            context,
            resolver,
            target,
            max_outstanding_requests: sync_config.max_outstanding_requests,
            fetch_batch_size: sync_config.fetch_batch_size,
            apply_batch_size: sync_config.apply_batch_size,
            db_config: config,
            update_rx: Some(tip_updates),
            finish_rx: finish,
            reached_target_tx: reached_target,
            max_retained_roots: sync_config.max_retained_roots,
        })
        .await
    }
}

// State sync for the variable-size variant: identical forwarding logic to the
// fixed-size impl above, with the extra `Codec` bound on the operation type.
impl StateSyncDb for
    Db<
        F,
        E,
        VariableJournal>>>,
        UnorderedIdx>,
        H,
        unordered::Update>,
        N,
        S,
    >
where
    F: Graftable,
    E: Storage + Clock + Metrics,
    K: Key + Array,
    V: value::VariableValue + 'static,
    H: Hasher,
    T: Translator,
    S: Strategy,
    Operation>>: Codec,
    R: Resolver<
        Family = F,
        Op = Operation>>,
        Digest = H::Digest,
    >,
{
    type SyncError = sync::Error;

    /// Run the QMDB sync engine and return its result.
    ///
    /// Builds a `sync::engine::Config` from the arguments: the three batch
    /// and request limits and `max_retained_roots` come from `sync_config`,
    /// `tip_updates` becomes the (always present) update receiver, and
    /// `finish` / `reached_target` are passed through as given.
    async fn sync_db(
        context: E,
        config: Self::Config,
        resolver: R,
        target: Self::SyncTarget,
        tip_updates: mpsc::Receiver,
        finish: Option>,
        reached_target: Option>,
        sync_config: SyncEngineConfig,
    ) -> Result {
        sync::sync(sync::engine::Config {
            context,
            resolver,
            target,
            max_outstanding_requests: sync_config.max_outstanding_requests,
            fetch_batch_size: sync_config.fetch_batch_size,
            apply_batch_size: sync_config.apply_batch_size,
            db_config: config,
            update_rx: Some(tip_updates),
            finish_rx: finish,
            reached_target_tx: reached_target,
            max_retained_roots: sync_config.max_retained_roots,
        })
        .await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_parallel::Sequential;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Runner as _, Supervisor as _,
    };
    use commonware_storage::{
        journal::contiguous::fixed::Config as FixedJournalConfig,
        merkle::{full::Config as MerkleConfig, mmr},
        qmdb::current::unordered::fixed,
        translator::TwoCap,
    };
    use commonware_utils::{non_empty_range, NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    // Concrete fixed-size database used by the test: SHA-256 digests as both
    // keys and values, on the deterministic runtime.
    type FixedDb = fixed::Db<
        mmr::Family,
        deterministic::Context,
        Digest,
        Digest,
        Sha256,
        TwoCap,
        64,
        Sequential,
    >;

    // Page size and page-cache capacity used to build the test `CacheRef`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);

    /// Build a `FixedConfig` whose partition names all end in `suffix`, with
    /// one page cache (created from `pooler`) shared by the merkle and journal
    /// configs.
    fn fixed_config(suffix: &str, pooler: &impl BufferPooler) -> FixedConfig {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        FixedConfig {
            merkle_config: MerkleConfig {
                journal_partition: format!("stateful-current-journal-{suffix}"),
                metadata_partition: format!("stateful-current-metadata-{suffix}"),
                items_per_blob: NZU64!(11),
                write_buffer: NZUsize!(1024),
                strategy: Sequential,
                page_cache: page_cache.clone(),
            },
            journal_config: FixedJournalConfig {
                partition: format!("stateful-current-log-{suffix}"),
                items_per_blob: NZU64!(7),
                page_cache,
                write_buffer: NZUsize!(1024),
            },
            grafted_metadata_partition: format!("stateful-current-grafted-{suffix}"),
            translator: TwoCap,
        }
    }

    /// `matches_sync_target` accepts the target derived from a database that
    /// applied the batch, and rejects targets with a different root or a
    /// different range end.
    #[test]
    fn managed_db_matches_sync_target_rejects_wrong_ops_root_and_range() {
        deterministic::Runner::default().start(|context| async move {
            let config = fixed_config("matches-sync-target", &context);
            let db = FixedDb::init(context.child("db"), config.clone())
                .await
                .unwrap();
            let db = Arc::new(AsyncRwLock::new(db));
            let key = Sha256::hash(b"key");
            let value = Sha256::hash(b"value");
            let metadata = Sha256::hash(b"metadata");

            // Build and merkleize a single-write batch with metadata; it is
            // never finalized into `db`.
            let batch = >::new_batch(&db)
                .await
                .write(key, Some(value))
                .with_metadata(metadata);
            let merkleized = crate::stateful::db::Unmerkleized::merkleize(batch)
                .await
                .unwrap();

            // Apply the same batch to a second database handle opened with
            // the same config to obtain the expected sync target.
            let mut verification_db = FixedDb::init(context.child("verification_db"), config)
                .await
                .unwrap();
            verification_db
                .apply_batch(merkleized.inner.clone())
                .await
                .unwrap();
            verification_db.sync().await.unwrap();
            let valid_target = >::sync_target(&verification_db).await;
            assert!(>::matches_sync_target(
                &merkleized,
                &valid_target,
            ));

            // A different root must be rejected.
            let mut wrong_root = valid_target.clone();
            wrong_root.root = Sha256::hash(b"wrong ops root");
            assert!(!>::matches_sync_target(
                &merkleized,
                &wrong_root,
            ));

            // Same start, end shifted by one: must be rejected.
            let mut wrong_range = valid_target.clone();
            wrong_range.range = non_empty_range!(
                mmr::Location::new(*valid_target.range.start()),
                mmr::Location::new(*valid_target.range.end() + 1)
            );
            assert!(!>::matches_sync_target(
                &merkleized,
                &wrong_range,
            ));
        });
    }
}