//! [`ManagedDb`] implementation for QMDB [`any`](commonware_storage::qmdb::any) databases. //! //! The QMDB batch API passes `&db` to `get()` and `merkleize()` for //! read-through to committed state. This module provides wrapper types //! that capture `Arc>` alongside the raw batch so the //! [`Unmerkleized`](super::Unmerkleized) and [`Merkleized`](super::Merkleized) //! traits can be implemented without a DB parameter. use crate::stateful::db::{ ManagedDb, Merkleized as MerkleizedTrait, StateSyncDb, SyncEngineConfig, Unmerkleized as UnmerkleizedTrait, }; use commonware_codec::{Codec, Read as CodecRead}; use commonware_cryptography::Hasher; use commonware_parallel::Strategy; use commonware_runtime::{Clock, Metrics, Storage}; use commonware_storage::{ index::{ unordered::Index as UnorderedIdx, Ordered as OrderedIndex, Unordered as UnorderedIndex, }, journal::contiguous::{ fixed::Journal as FixedJournal, variable::Journal as VariableJournal, Contiguous, Mutable, }, merkle::{Family, Location}, qmdb::{ any::{ batch::{MerkleizedBatch, UnmerkleizedBatch}, db::Db, operation::{Operation, Update}, ordered, unordered, value::{self, FixedEncoding, ValueEncoding, VariableEncoding}, FixedConfig, VariableConfig, }, operation::Key, sync::{self, resolver::Resolver, Target as AnySyncTarget}, Error, }, translator::Translator, Persistable, }; use commonware_utils::{channel::mpsc, non_empty_range, sync::AsyncRwLock, Array}; use std::{ops::Deref, sync::Arc}; // Matches commonware_storage::qmdb::any::BITMAP_CHUNK_BYTES, which is crate-private. const ANY_BITMAP_CHUNK_BYTES: usize = 64; type AnyDbHandle = Arc>>; /// Wraps a QMDB [`UnmerkleizedBatch`] with a reference to the parent /// database, implementing the [`Unmerkleized`](super::Unmerkleized) trait. pub struct AnyUnmerkleized where F: Family, E: Storage + Clock + Metrics, U: Update, C: Contiguous>, I: UnorderedIndex>, H: Hasher, S: Strategy, Operation: Codec, { batch: UnmerkleizedBatch, db: AnyDbHandle, metadata: Option, } /// Key-value operations for the `any` unordered update kind. impl AnyUnmerkleized, S> where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding + 'static, C: Mutable>> + Persistable, I: UnorderedIndex> + 'static, H: Hasher, S: Strategy, Operation>: Codec, { /// Set commit metadata included in the next /// [`merkleize`](UnmerkleizedTrait::merkleize) call. pub fn with_metadata(mut self, metadata: V::Value) -> Self { self.metadata = Some(metadata); self } /// Read a value by key, falling back to committed state. pub async fn get(&self, key: &K) -> Result, Error> { let db = self.db.read().await; self.batch.get(key, &*db).await } /// Read multiple values by key, falling back to committed state. /// /// Returns results in the same order as the input keys. pub async fn get_many(&self, keys: &[&K]) -> Result>, Error> { let db = self.db.read().await; self.batch.get_many(keys, &*db).await } /// Record a mutation. `Some(value)` for upsert, `None` for delete. pub fn write(mut self, key: K, value: Option) -> Self { self.batch = self.batch.write(key, value); self } } /// Wraps a QMDB [`MerkleizedBatch`] with a reference to the parent /// database, implementing the [`Merkleized`](super::Merkleized) trait. pub struct AnyMerkleized where F: Family, E: Storage + Clock + Metrics, U: Update, C: Contiguous>, I: UnorderedIndex>, H: Hasher, S: Strategy, Operation: Codec, { inner: Arc>, db: AnyDbHandle, } impl Deref for AnyUnmerkleized where F: Family, E: Storage + Clock + Metrics, U: Update, C: Contiguous>, I: UnorderedIndex>, H: Hasher, S: Strategy, Operation: Codec, { type Target = UnmerkleizedBatch; fn deref(&self) -> &Self::Target { &self.batch } } impl Deref for AnyMerkleized where F: Family, E: Storage + Clock + Metrics, U: Update, C: Contiguous>, I: UnorderedIndex>, H: Hasher, S: Strategy, Operation: Codec, { type Target = MerkleizedBatch; fn deref(&self) -> &Self::Target { &self.inner } } /// Key-value operations for the `any` ordered update kind. impl AnyUnmerkleized, S> where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding + 'static, C: Mutable>> + Persistable, I: OrderedIndex> + 'static, H: Hasher, S: Strategy, Operation>: Codec, { /// Set commit metadata included in the next /// [`merkleize`](UnmerkleizedTrait::merkleize) call. pub fn with_metadata(mut self, metadata: V::Value) -> Self { self.metadata = Some(metadata); self } /// Read a value by key, falling back to committed state. pub async fn get(&self, key: &K) -> Result, Error> { let db = self.db.read().await; self.batch.get(key, &*db).await } /// Read multiple values by key, falling back to committed state. /// /// Returns results in the same order as the input keys. pub async fn get_many(&self, keys: &[&K]) -> Result>, Error> { let db = self.db.read().await; self.batch.get_many(keys, &*db).await } /// Record a mutation. `Some(value)` for upsert, `None` for delete. pub fn write(mut self, key: K, value: Option) -> Self { self.batch = self.batch.write(key, value); self } } /// Read-through operations for the `any` merkleized batch. impl AnyMerkleized where F: Family, E: Storage + Clock + Metrics, U: Update, C: Contiguous>, I: UnorderedIndex> + 'static, H: Hasher, S: Strategy, Operation: Codec, { /// Read a value by key, falling back to committed state. pub async fn get(&self, key: &U::Key) -> Result, Error> { let db = self.db.read().await; self.inner.get(key, &*db).await } /// Read multiple values by key, falling back to committed state. /// /// Returns results in the same order as the input keys. pub async fn get_many(&self, keys: &[&U::Key]) -> Result>, Error> { let db = self.db.read().await; self.inner.get_many(keys, &*db).await } } /// Implement [`Unmerkleized`](UnmerkleizedTrait) for the `any` unordered update kind. impl UnmerkleizedTrait for AnyUnmerkleized, S> where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding + 'static, C: Mutable>> + Persistable, I: UnorderedIndex> + 'static, H: Hasher, S: Strategy, Operation>: Codec, { type Merkleized = AnyMerkleized, S>; type Error = Error; async fn merkleize(self) -> Result> { let db = self.db.read().await; let merkleized = self.batch.merkleize(&*db, self.metadata).await?; Ok(AnyMerkleized { inner: merkleized, db: self.db.clone(), }) } } /// Implement [`Unmerkleized`](UnmerkleizedTrait) for the `any` ordered update kind. impl UnmerkleizedTrait for AnyUnmerkleized, S> where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding + 'static, C: Mutable>> + Persistable, I: OrderedIndex> + 'static, H: Hasher, S: Strategy, Operation>: Codec, { type Merkleized = AnyMerkleized, S>; type Error = Error; async fn merkleize(self) -> Result> { let db = self.db.read().await; let merkleized = self.batch.merkleize(&*db, self.metadata).await?; Ok(AnyMerkleized { inner: merkleized, db: self.db.clone(), }) } } /// Implement [`Merkleized`](MerkleizedTrait) for all supported `any` update kinds. impl MerkleizedTrait for AnyMerkleized where F: Family, E: Storage + Clock + Metrics, U: Update, C: Mutable> + Persistable, I: UnorderedIndex> + 'static, H: Hasher, S: Strategy, Operation: Codec, AnyUnmerkleized: UnmerkleizedTrait, { type Digest = H::Digest; type Unmerkleized = AnyUnmerkleized; fn root(&self) -> H::Digest { self.inner.root() } fn new_batch(&self) -> Self::Unmerkleized { AnyUnmerkleized { batch: self.inner.new_batch::(), db: self.db.clone(), metadata: None, } } } /// Implement [`ManagedDb`] for unordered QMDB databases with fixed-size values. /// /// `new_batch` captures the `Arc>` in the returned /// wrapper so that `get()` and `merkleize()` can read through to /// committed state. /// /// `finalize` applies the merkleized batch's changeset and durably /// commits it to disk. impl ManagedDb for Db< F, E, FixedJournal>>>, UnorderedIdx>, H, unordered::Update>, ANY_BITMAP_CHUNK_BYTES, S, > where F: Family, E: Storage + Clock + Metrics, K: Array, V: value::FixedValue + 'static, H: Hasher + 'static, T: Translator, S: Strategy, { type Unmerkleized = AnyUnmerkleized< F, E, FixedJournal>>>, UnorderedIdx>, H, unordered::Update>, S, >; type Merkleized = AnyMerkleized< F, E, FixedJournal>>>, UnorderedIdx>, H, unordered::Update>, S, >; type Error = Error; type Config = FixedConfig; type SyncTarget = AnySyncTarget; async fn init(context: E, config: Self::Config) -> Result> { ::init(context, config).await } async fn new_batch(db: &Arc>) -> Self::Unmerkleized { let inner = db.read().await; AnyUnmerkleized { batch: inner.new_batch(), db: db.clone(), metadata: None, } } fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool { batch.root() == target.root && *target.range.start() == batch.bounds().inactivity_floor && *target.range.end() == Location::::new(batch.bounds().total_size) } async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> { self.apply_batch(batch.inner).await?; self.sync().await?; Ok(()) } async fn sync_target(&self) -> Self::SyncTarget { let bounds = self.bounds().await; AnySyncTarget::new( self.root(), non_empty_range!(self.sync_boundary(), bounds.end), ) } async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> { self.rewind(target.range.end()).await?; self.sync().await?; let rewound_target = self.sync_target().await; assert_eq!( rewound_target, target, "rewound database target mismatch after rewind", ); Ok(()) } } /// Implement [`ManagedDb`] for unordered QMDB databases with variable-size values. impl ManagedDb for Db< F, E, VariableJournal>>>, UnorderedIdx>, H, unordered::Update>, ANY_BITMAP_CHUNK_BYTES, S, > where F: Family, E: Storage + Clock + Metrics, K: Key, V: value::VariableValue + 'static, H: Hasher, T: Translator, S: Strategy, Operation>>: Codec, { type Unmerkleized = AnyUnmerkleized< F, E, VariableJournal>>>, UnorderedIdx>, H, unordered::Update>, S, >; type Merkleized = AnyMerkleized< F, E, VariableJournal>>>, UnorderedIdx>, H, unordered::Update>, S, >; type Error = Error; type Config = VariableConfig< T, >> as CodecRead>::Cfg, S, >; type SyncTarget = AnySyncTarget; async fn init(context: E, config: Self::Config) -> Result> { ::init(context, config).await } async fn new_batch(db: &Arc>) -> Self::Unmerkleized { let inner = db.read().await; AnyUnmerkleized { batch: inner.new_batch(), db: db.clone(), metadata: None, } } fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool { batch.root() == target.root && *target.range.start() == batch.bounds().inactivity_floor && *target.range.end() == Location::::new(batch.bounds().total_size) } async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> { self.apply_batch(batch.inner).await?; self.sync().await?; Ok(()) } async fn sync_target(&self) -> Self::SyncTarget { let bounds = self.bounds().await; AnySyncTarget::new( self.root(), non_empty_range!(self.sync_boundary(), bounds.end), ) } async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> { self.rewind(target.range.end()).await?; self.sync().await?; let rewound_target = self.sync_target().await; assert_eq!( rewound_target, target, "rewound database target mismatch after rewind", ); Ok(()) } } impl StateSyncDb for Db< F, E, FixedJournal>>>, UnorderedIdx>, H, unordered::Update>, ANY_BITMAP_CHUNK_BYTES, S, > where F: Family, E: Storage + Clock + Metrics, K: Array, V: value::FixedValue + 'static, H: Hasher, T: Translator, S: Strategy, R: Resolver< Family = F, Op = Operation>>, Digest = H::Digest, >, { type SyncError = sync::Error; async fn sync_db( context: E, config: Self::Config, resolver: R, target: Self::SyncTarget, tip_updates: mpsc::Receiver, finish: Option>, reached_target: Option>, sync_config: SyncEngineConfig, ) -> Result { sync::sync(sync::engine::Config { context, resolver, target, max_outstanding_requests: sync_config.max_outstanding_requests, fetch_batch_size: sync_config.fetch_batch_size, apply_batch_size: sync_config.apply_batch_size, db_config: config, update_rx: Some(tip_updates), finish_rx: finish, reached_target_tx: reached_target, max_retained_roots: sync_config.max_retained_roots, }) .await } } impl StateSyncDb for Db< F, E, VariableJournal>>>, UnorderedIdx>, H, unordered::Update>, ANY_BITMAP_CHUNK_BYTES, S, > where F: Family, E: Storage + Clock + Metrics, K: Key, V: value::VariableValue + 'static, H: Hasher, T: Translator, S: Strategy, Operation>>: Codec, R: Resolver< Family = F, Op = Operation>>, Digest = H::Digest, >, { type SyncError = sync::Error; async fn sync_db( context: E, config: Self::Config, resolver: R, target: Self::SyncTarget, tip_updates: mpsc::Receiver, finish: Option>, reached_target: Option>, sync_config: SyncEngineConfig, ) -> Result { sync::sync(sync::engine::Config { context, resolver, target, max_outstanding_requests: sync_config.max_outstanding_requests, fetch_batch_size: sync_config.fetch_batch_size, apply_batch_size: sync_config.apply_batch_size, db_config: config, update_rx: Some(tip_updates), finish_rx: finish, reached_target_tx: reached_target, max_retained_roots: sync_config.max_retained_roots, }) .await } }