//! Batch mutation API for Keyless QMDBs. use super::{operation::Operation, Keyless}; use crate::{ journal::{authenticated, contiguous::Mutable, Error as JournalError}, merkle::{Family, Location}, qmdb::{any::value::ValueEncoding, Error}, Context, Persistable, }; use commonware_codec::EncodeShared; use commonware_cryptography::{Digest, Hasher}; use core::iter; use std::sync::{Arc, Weak}; /// A speculative batch of operations whose root digest has not yet been computed, in contrast /// to [`MerkleizedBatch`]. /// /// Consuming [`UnmerkleizedBatch::merkleize`] produces an `Arc`. pub struct UnmerkleizedBatch where F: Family, V: ValueEncoding, H: Hasher, Operation: EncodeShared, { /// Authenticated journal batch for computing the speculative Merkle root. journal_batch: authenticated::UnmerkleizedBatch>, /// Pending appends. appends: Vec, /// Parent batch in the chain. `None` for batches created directly from the DB. parent: Option>>, /// Total operation count before this batch (committed DB + prior batches). /// This batch's i-th operation lands at location `base_size + i`. base_size: u64, /// The database size when this batch was created, used to detect stale batches. db_size: u64, } /// A speculative batch of operations whose root digest has been computed, /// in contrast to [`UnmerkleizedBatch`]. #[derive(Clone)] pub struct MerkleizedBatch where Operation: EncodeShared, { /// Authenticated journal batch (Merkle state + local items). pub(super) journal_batch: Arc>>, /// The parent batch in the chain, if any. pub(super) parent: Option>, /// Total operations before this batch's own ops (DB + ancestor batches). pub(super) base_size: u64, /// Total operation count after this batch. pub(super) total_size: u64, /// The database size when the initial batch was created. pub(super) db_size: u64, /// Each ancestor's `total_size` (operation count after that ancestor). /// Used by `apply_batch` to validate partial ancestor commits. pub(super) ancestor_batch_ends: Vec, } impl MerkleizedBatch where Operation: EncodeShared, { /// Iterate over ancestor batches (parent first, then grandparent, etc.). pub(super) fn ancestors(&self) -> impl Iterator> { let mut next = self.parent.as_ref().and_then(Weak::upgrade); iter::from_fn(move || { let batch = next.take()?; next = batch.parent.as_ref().and_then(Weak::upgrade); Some(batch) }) } } /// Read a single operation from the parent chain at the given location. /// /// Returns `None` if the location cannot be found in the live parent chain (e.g. the /// owning ancestor was committed and freed). Callers should fall through to the committed /// DB in that case. fn read_chain_op( batch: &MerkleizedBatch, loc: u64, ) -> Option> where Operation: EncodeShared, { // Each batch's items span [size - items.len(), size). We compute the range from the // journal (strong Arcs, always intact) rather than from the QMDB-layer Weak parent // (which may be dead). let self_end = batch.journal_batch.size(); let self_base = self_end - batch.journal_batch.items().len() as u64; if loc >= self_base && loc < self_end { return Some(batch.journal_batch.items()[(loc - self_base) as usize].clone()); } for ancestor in batch.ancestors() { let end = ancestor.journal_batch.size(); let base = end - ancestor.journal_batch.items().len() as u64; if loc >= base && loc < end { return Some(ancestor.journal_batch.items()[(loc - base) as usize].clone()); } } None } impl UnmerkleizedBatch where F: Family, V: ValueEncoding, H: Hasher, Operation: EncodeShared, { /// Create a batch from a committed DB (no parent chain). pub(super) fn new(keyless: &Keyless, journal_size: u64) -> Self where E: Context, C: Mutable> + Persistable, { Self { journal_batch: keyless.journal.new_batch(), appends: Vec::new(), parent: None, base_size: journal_size, db_size: journal_size, } } /// The location that the next appended value will be placed at. pub const fn size(&self) -> Location { Location::new(self.base_size + self.appends.len() as u64) } /// Append a value. pub fn append(mut self, value: V::Value) -> Self { self.appends.push(value); self } /// Read a value at `loc`. /// /// Reads from pending appends, parent chain, or base DB. pub async fn get( &self, loc: Location, db: &Keyless, ) -> Result, Error> where E: Context, C: Mutable> + Persistable, { let loc_val = *loc; // Check this batch's pending appends. if loc_val >= self.base_size { let idx = (loc_val - self.base_size) as usize; return if idx < self.appends.len() { Ok(Some(self.appends[idx].clone())) } else { Ok(None) }; } // Check parent operation chain. If the ancestor was freed, read_chain_op returns None // and we fall through to the DB. if let Some(parent) = self.parent.as_ref() { if loc_val >= self.db_size { if let Some(op) = read_chain_op(parent, loc_val) { return Ok(op.into_value()); } } } // Fall through to base DB. db.get(loc).await } /// Resolve appends into operations, merkleize, and return an `Arc`. pub fn merkleize( self, db: &Keyless, metadata: Option, ) -> Arc> where E: Context, C: Mutable> + Persistable, { let base = self.base_size; // Build operations: one Append per value, then Commit. let mut ops: Vec> = Vec::with_capacity(self.appends.len() + 1); for value in self.appends { ops.push(Operation::Append(value)); } ops.push(Operation::Commit(metadata)); let total_size = base + ops.len() as u64; // Add operations to the journal batch and merkleize. let mut journal_batch = self.journal_batch; for op in &ops { journal_batch = journal_batch.add(op.clone()); } let journal = db.journal.with_mem(|mem| journal_batch.merkleize(mem)); let mut ancestor_batch_ends = Vec::new(); if let Some(parent) = &self.parent { ancestor_batch_ends.push(parent.total_size); for batch in parent.ancestors() { ancestor_batch_ends.push(batch.total_size); } } Arc::new(MerkleizedBatch { journal_batch: journal, parent: self.parent.as_ref().map(Arc::downgrade), base_size: self.base_size, total_size, db_size: self.db_size, ancestor_batch_ends, }) } } impl MerkleizedBatch where Operation: EncodeShared, { /// Return the speculative root. pub fn root(&self) -> D { self.journal_batch.root() } /// Read a value at `loc`. pub async fn get( &self, loc: Location, db: &Keyless, ) -> Result, Error> where E: Context, H: Hasher, C: Mutable> + Persistable, { let loc_val = *loc; // Check this batch's local items first, then walk parent chain. If an ancestor was // freed, fall through to the committed DB. if loc_val >= self.db_size { if let Some(op) = read_chain_op(self, loc_val) { return Ok(op.into_value()); } } // Fall through to base DB. db.get(loc).await } /// Create a new speculative batch of operations with this batch as its parent. /// /// All uncommitted ancestors in the chain must be kept alive until the child (or any /// descendant) is merkleized. Dropping an uncommitted ancestor causes data /// loss detected at `apply_batch` time. pub fn new_batch(self: &Arc) -> UnmerkleizedBatch where H: Hasher, { UnmerkleizedBatch { journal_batch: self.journal_batch.new_batch::(), appends: Vec::new(), parent: Some(Arc::clone(self)), base_size: self.total_size, db_size: self.db_size, } } }