//! Journaled [`ManagedDb`] implementation for QMDB //! [`immutable`](commonware_storage::qmdb::immutable) databases. //! //! Immutable databases support adding new keyed values but not updates or //! deletions. The wrapper types here capture `Arc>` //! so the batch API can read through to committed state. use crate::stateful::db::{ ManagedDb, Merkleized as MerkleizedTrait, StateSyncDb, SyncEngineConfig, Unmerkleized as UnmerkleizedTrait, }; use commonware_codec::{Codec, EncodeShared, Read as CodecRead}; use commonware_cryptography::Hasher; use commonware_parallel::Strategy; use commonware_runtime::{Clock, Metrics, Storage}; use commonware_storage::{ journal::{ contiguous::{ fixed::Journal as FixedJournal, variable::Journal as VariableJournal, Mutable, }, Error as JournalError, }, merkle::{Family, Location}, qmdb::{ any::value::{FixedEncoding, FixedValue, ValueEncoding, VariableEncoding, VariableValue}, immutable::{ batch::{MerkleizedBatch, UnmerkleizedBatch}, fixed, variable, Immutable, Operation, }, operation::Key, sync::{self, resolver::Resolver, Target as AnySyncTarget}, Error, }, translator::Translator, Persistable, }; use commonware_utils::{channel::mpsc, non_empty_range, sync::AsyncRwLock, Array}; use std::{ops::Deref, sync::Arc}; type ImmutableDbHandle = Arc>>; /// Wraps an immutable [`UnmerkleizedBatch`] with a reference to the parent /// database, implementing the [`Unmerkleized`](crate::stateful::db::Unmerkleized) trait. pub struct ImmutableUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { batch: UnmerkleizedBatch, db: ImmutableDbHandle, metadata: Option, inactivity_floor: Option>, } impl Deref for ImmutableUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { type Target = UnmerkleizedBatch; fn deref(&self) -> &Self::Target { &self.batch } } impl ImmutableUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { /// Set commit metadata included in the next /// [`merkleize`](UnmerkleizedTrait::merkleize) call. pub fn with_metadata(mut self, metadata: V::Value) -> Self { self.metadata = Some(metadata); self } /// Set the inactivity floor to include within the next [`merkleize`](UnmerkleizedTrait::merkleize) call. /// /// If unset, [`merkleize`](UnmerkleizedTrait::merkleize) will use the [`Default`] of [`Location`]. pub const fn with_inactivity_floor(mut self, floor: Location) -> Self { self.inactivity_floor = Some(floor); self } /// Read a value by key, falling back to committed state. pub async fn get(&self, key: &K) -> Result, Error> { let db = self.db.read().await; self.batch.get(key, &*db).await } /// Read multiple values by key, falling back to committed state. /// /// Returns results in the same order as the input keys. pub async fn get_many(&self, keys: &[&K]) -> Result>, Error> { let db = self.db.read().await; self.batch.get_many(keys, &*db).await } /// Set `key` to `value` in the speculative batch. pub fn set(mut self, key: K, value: V::Value) -> Self { self.batch = self.batch.set(key, value); self } } /// Wraps an immutable [`MerkleizedBatch`] with a reference to the parent /// database, implementing the [`Merkleized`](crate::stateful::db::Merkleized) trait. pub struct ImmutableMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { inner: Arc>, db: ImmutableDbHandle, } impl Deref for ImmutableMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { type Target = MerkleizedBatch; fn deref(&self) -> &Self::Target { &self.inner } } impl ImmutableMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { /// Read a value by key, falling back to committed state. pub async fn get(&self, key: &K) -> Result, Error> { let db = self.db.read().await; self.inner.get(key, &*db).await } /// Read multiple values by key, falling back to committed state. /// /// Returns results in the same order as the input keys. pub async fn get_many(&self, keys: &[&K]) -> Result>, Error> { let db = self.db.read().await; self.inner.get_many(keys, &*db).await } } impl UnmerkleizedTrait for ImmutableUnmerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { type Merkleized = ImmutableMerkleized; type Error = Error; async fn merkleize(self) -> Result> { let db = self.db.read().await; let merkleized = self.batch.merkleize( &*db, self.metadata, self.inactivity_floor.unwrap_or_default(), ); Ok(ImmutableMerkleized { inner: merkleized, db: self.db.clone(), }) } } impl MerkleizedTrait for ImmutableMerkleized where F: Family, E: Storage + Clock + Metrics, K: Key, V: ValueEncoding, C: Mutable> + Persistable, H: Hasher, T: Translator, S: Strategy, Operation: EncodeShared, { type Digest = H::Digest; type Unmerkleized = ImmutableUnmerkleized; fn root(&self) -> H::Digest { self.inner.root() } fn new_batch(&self) -> Self::Unmerkleized { ImmutableUnmerkleized { batch: self.inner.new_batch::(), db: self.db.clone(), metadata: None, inactivity_floor: None, } } } impl ManagedDb for fixed::Db where F: Family, E: Storage + Clock + Metrics, K: Array, V: FixedValue + 'static, H: Hasher + 'static, T: Translator, S: Strategy, { type Unmerkleized = ImmutableUnmerkleized< F, E, K, FixedEncoding, FixedJournal>, H, T, S, >; type Merkleized = ImmutableMerkleized< F, E, K, FixedEncoding, FixedJournal>, H, T, S, >; type Error = Error; type Config = fixed::Config; type SyncTarget = AnySyncTarget; async fn init(context: E, config: Self::Config) -> Result> { ::init(context, config).await } async fn new_batch(db: &Arc>) -> Self::Unmerkleized { let inner = db.read().await; ImmutableUnmerkleized { batch: inner.new_batch(), db: db.clone(), metadata: None, inactivity_floor: None, } } fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool { batch.root() == target.root && *target.range.start() == batch.bounds().inactivity_floor && *target.range.end() == Location::::new(batch.bounds().total_size) } async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> { self.apply_batch(batch.inner).await?; self.sync().await?; Ok(()) } async fn sync_target(&self) -> Self::SyncTarget { let bounds = self.bounds().await; AnySyncTarget::new( self.root(), non_empty_range!(self.sync_boundary(), bounds.end), ) } async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> { self.rewind(target.range.end()).await?; self.sync().await?; let rewound_target = self.sync_target().await; assert_eq!( rewound_target, target, "rewound database target mismatch after rewind", ); Ok(()) } } impl ManagedDb for variable::Db where F: Family, E: Storage + Clock + Metrics, K: Key, V: VariableValue + 'static, H: Hasher + 'static, T: Translator, S: Strategy, variable::Operation: Codec, { type Unmerkleized = ImmutableUnmerkleized< F, E, K, VariableEncoding, VariableJournal>, H, T, S, >; type Merkleized = ImmutableMerkleized< F, E, K, VariableEncoding, VariableJournal>, H, T, S, >; type Error = Error; type Config = variable::Config as CodecRead>::Cfg, S>; type SyncTarget = AnySyncTarget; async fn init(context: E, config: Self::Config) -> Result> { ::init(context, config).await } async fn new_batch(db: &Arc>) -> Self::Unmerkleized { let inner = db.read().await; ImmutableUnmerkleized { batch: inner.new_batch(), db: db.clone(), metadata: None, inactivity_floor: None, } } fn matches_sync_target(batch: &Self::Merkleized, target: &Self::SyncTarget) -> bool { batch.root() == target.root && *target.range.start() == batch.bounds().inactivity_floor && *target.range.end() == Location::::new(batch.bounds().total_size) } async fn finalize(&mut self, batch: Self::Merkleized) -> Result<(), Error> { self.apply_batch(batch.inner).await?; self.sync().await?; Ok(()) } async fn sync_target(&self) -> Self::SyncTarget { let bounds = self.bounds().await; AnySyncTarget::new( self.root(), non_empty_range!(self.sync_boundary(), bounds.end), ) } async fn rewind_to_target(&mut self, target: Self::SyncTarget) -> Result<(), Error> { self.rewind(target.range.end()).await?; self.sync().await?; let rewound_target = self.sync_target().await; assert_eq!( rewound_target, target, "rewound database target mismatch after rewind", ); Ok(()) } } impl StateSyncDb for fixed::Db where F: Family, E: Storage + Clock + Metrics, K: Array, V: FixedValue + 'static, H: Hasher + 'static, T: Translator, S: Strategy, R: Resolver, Digest = H::Digest>, { type SyncError = sync::Error; async fn sync_db( context: E, config: Self::Config, resolver: R, target: Self::SyncTarget, tip_updates: mpsc::Receiver, finish: Option>, reached_target: Option>, sync_config: SyncEngineConfig, ) -> Result { sync::sync(sync::engine::Config { context, resolver, target, max_outstanding_requests: sync_config.max_outstanding_requests, fetch_batch_size: sync_config.fetch_batch_size, apply_batch_size: sync_config.apply_batch_size, db_config: config, update_rx: Some(tip_updates), finish_rx: finish, reached_target_tx: reached_target, max_retained_roots: sync_config.max_retained_roots, }) .await } } impl StateSyncDb for variable::Db where F: Family, E: Storage + Clock + Metrics, K: Key, V: VariableValue + 'static, H: Hasher + 'static, T: Translator, S: Strategy, variable::Operation: Codec, R: Resolver, Digest = H::Digest>, { type SyncError = sync::Error; async fn sync_db( context: E, config: Self::Config, resolver: R, target: Self::SyncTarget, tip_updates: mpsc::Receiver, finish: Option>, reached_target: Option>, sync_config: SyncEngineConfig, ) -> Result { sync::sync(sync::engine::Config { context, resolver, target, max_outstanding_requests: sync_config.max_outstanding_requests, fetch_batch_size: sync_config.fetch_batch_size, apply_batch_size: sync_config.apply_batch_size, db_config: config, update_rx: Some(tip_updates), finish_rx: finish, reached_target_tx: reached_target, max_retained_roots: sync_config.max_retained_roots, }) .await } }