//! Authenticated journal implementation. //! //! An authenticated journal maintains a contiguous journal of items alongside a Merkle-family //! structure. The item at index i in the journal corresponds to the leaf at Location i in the //! Merkle structure. This structure enables efficient proofs that an item is included in the //! journal at a specific location. use crate::{ journal::{ contiguous::{fixed, variable, Contiguous, Many, Mutable, Reader}, Error as JournalError, }, merkle::{ self, batch, full::Merkle, hasher::{Hasher as _, Standard as StandardHasher}, mem::Mem, Bagging, Family, Location, Position, Proof, Readable, }, Context, Persistable, }; use alloc::{ sync::{Arc, Weak}, vec::Vec, }; use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared}; use commonware_cryptography::{Digest, Hasher}; use commonware_parallel::Strategy; use core::num::NonZeroU64; use futures::{try_join, TryFutureExt as _}; use thiserror::Error; use tracing::{debug, warn}; /// Errors that can occur when interacting with an authenticated journal. #[derive(Error, Debug)] pub enum Error { #[error("merkle error: {0}")] Merkle(#[from] merkle::Error), #[error("journal error: {0}")] Journal(#[from] super::Error), } /// Strong ref to an ancestor [`MerkleizedBatch`] in the journal-batch chain. type MerkleizedParent = Arc::Digest, Item, S>>; /// A speculative batch whose root digest has not yet been computed, /// in contrast to [`MerkleizedBatch`]. pub struct UnmerkleizedBatch { // The inner batch of Merkle leaf digests. inner: batch::UnmerkleizedBatch, // The hasher to use for hashing the items. hasher: StandardHasher, // The items to append from this batch. items: Vec, // This batch's parent, or None if the parent is the journal itself. parent: Option>, } type MerkleizedBatchArc = Arc::Digest, Item, S>>; impl UnmerkleizedBatch { /// Add an item to the batch. #[allow(clippy::should_implement_trait)] pub fn add(mut self, item: Item) -> Self { let encoded = item.encode(); self.inner = self.inner.add(&self.hasher, &encoded); self.items.push(item); self } /// Collect ancestor items from the parent chain before downgrading. fn collect_ancestor_items( parent: &Option>, ) -> Vec>> { let Some(parent) = parent else { return Vec::new(); }; let mut items = Vec::new(); if !parent.items.is_empty() { items.push(Arc::clone(&parent.items)); } let mut current = parent.parent.as_ref().and_then(Weak::upgrade); while let Some(batch) = current { if !batch.items.is_empty() { items.push(Arc::clone(&batch.items)); } current = batch.parent.as_ref().and_then(Weak::upgrade); } items.reverse(); items } /// Merkleize the batch. /// `base` provides committed node data as fallback during hash computation. pub fn merkleize(self, base: &Mem) -> MerkleizedBatchArc { let Self { inner, hasher, items, parent, } = self; let items = Arc::new(items); let merkle = inner.merkleize(base, &hasher); let ancestor_items = Self::collect_ancestor_items(&parent); Arc::new(MerkleizedBatch { inner: merkle, bagging: hasher.root_bagging(), items, parent: parent.as_ref().map(Arc::downgrade), ancestor_items, }) } /// Like [`merkleize`](Self::merkleize), but the caller supplies the items instead of /// accumulating them with [`add`](Self::add). The two approaches must not be mixed: do /// not call [`add`](Self::add) before this method. /// /// The items are encoded and hashed into the Merkle structure, and the `Arc` is stored /// directly in the resulting [`MerkleizedBatch`] without copying. /// /// # Panics /// /// Panics if items were previously added via [`add`](Self::add). pub(crate) fn merkleize_with( mut self, base: &Mem, items: Arc>, ) -> MerkleizedBatchArc { assert!( self.items.is_empty(), "merkleize_with expects no items added via add" ); let starting_leaves = self.inner.leaves(); let digests: Vec = self.inner.strategy().map_init_collect_vec( items.iter().enumerate(), || self.hasher.clone(), |h, (i, item)| { let loc = Location::::new(*starting_leaves + i as u64); let pos = Position::try_from(loc).expect("valid leaf location"); h.leaf_digest(pos, &item.encode()) }, ); for digest in digests { self.inner = self.inner.add_leaf_digest(digest); } let merkle = self.inner.merkleize(base, &self.hasher); let ancestor_items = Self::collect_ancestor_items(&self.parent); Arc::new(MerkleizedBatch { inner: merkle, bagging: self.hasher.root_bagging(), items, parent: self.parent.as_ref().map(Arc::downgrade), ancestor_items, }) } } /// A speculative batch whose root digest has been computed, in contrast to [`UnmerkleizedBatch`]. #[derive(Clone, Debug)] pub struct MerkleizedBatch { /// The inner batch of Merkle leaf digests. pub(crate) inner: Arc>, /// The peak bagging policy inherited from the parent journal or batch. bagging: Bagging, /// The items to append from this batch. items: Arc>, /// This batch's parent, or None if the parent is the journal itself. parent: Option>, /// Ancestor item batches collected at merkleize time (root-to-tip order). pub(crate) ancestor_items: Vec>>, } impl MerkleizedBatch { /// The number of items visible through this batch, including ancestors. pub(crate) fn size(&self) -> u64 { *self.inner.leaves() } /// Compute the root digest after this batch is applied using `inactive_peaks` and the bagging /// carried by `hasher`. /// /// This recomputes the root rather than reading a cache. pub fn root( &self, base: &Mem, hasher: &impl merkle::hasher::Hasher, inactive_peaks: usize, ) -> Result> { self.inner.root(base, hasher, inactive_peaks) } /// Inclusion proof for the element at `loc`. pub fn proof( &self, hasher: &impl merkle::hasher::Hasher, loc: Location, inactive_peaks: usize, ) -> Result, merkle::Error> { self.inner.proof(hasher, loc, inactive_peaks) } /// Inclusion proof for all elements in `range`. pub fn range_proof( &self, hasher: &impl merkle::hasher::Hasher, range: core::ops::Range>, inactive_peaks: usize, ) -> Result, merkle::Error> { self.inner.range_proof(hasher, range, inactive_peaks) } /// The items added in this batch. pub(crate) const fn items(&self) -> &Arc> { &self.items } /// Create a new speculative batch of operations with this batch as its parent. /// /// The batch becomes invalid if any ancestor is dropped before being applied, or a sibling /// fork has been applied. pub fn new_batch>(self: &Arc) -> UnmerkleizedBatch where Item: Encode, { UnmerkleizedBatch { inner: self.inner.new_batch(), hasher: StandardHasher::new(self.bagging), items: Vec::new(), parent: Some(Arc::clone(self)), } } } impl Readable for MerkleizedBatch { type Family = F; type Digest = D; type Error = merkle::Error; fn size(&self) -> Position { self.inner.size() } fn get_node(&self, pos: Position) -> Option { self.inner.get_node(pos) } fn pruning_boundary(&self) -> Location { self.inner.pruning_boundary() } } /// An append-only data structure that maintains a sequential journal of items alongside a /// Merkle-family structure. The item at index i in the journal corresponds to the leaf at Location /// i in the Merkle structure. This structure enables efficient proofs that an item is included in /// the journal at a specific location. pub struct Journal where F: Family, E: Context, C: Contiguous, H: Hasher, S: Strategy, { /// Merkle structure where each leaf is an item digest. /// Invariant: leaf i corresponds to item i in the journal. pub(crate) merkle: Merkle, /// Journal of items. /// Invariant: item i corresponds to leaf i in the Merkle structure. pub(crate) journal: C, pub(crate) hasher: StandardHasher, } impl Journal where F: Family, E: Context, C: Contiguous, H: Hasher, S: Strategy, { /// Returns the Location of the next item appended to the journal. pub async fn size(&self) -> Location { Location::new(self.journal.size().await) } /// Compute the root of the Merkle structure using `inactive_peaks` and the bagging carried by /// the journal's hasher. pub fn root(&self, inactive_peaks: usize) -> Result> { self.merkle .root(&self.hasher, inactive_peaks) .map_err(Into::into) } /// Convert authenticated-journal errors to the contiguous journal trait error type. fn map_error(error: Error) -> JournalError { match error { Error::Journal(inner) => inner, Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)), } } /// Return a reference to the merkleization strategy. pub const fn strategy(&self) -> &S { self.merkle.strategy() } /// Create a speculative batch atop this journal. pub fn new_batch(&self) -> UnmerkleizedBatch where C::Item: Encode, { let root = self.merkle.to_batch(); UnmerkleizedBatch { inner: root.new_batch(), hasher: StandardHasher::new(self.hasher.root_bagging()), items: Vec::new(), parent: None, } } /// Borrow the committed Mem through the read lock. pub(crate) fn with_mem(&self, f: impl FnOnce(&Mem) -> R) -> R { self.merkle.with_mem(f) } /// Create an owned [`MerkleizedBatch`] representing the current committed state. /// /// The batch has no items (the committed items are on disk, not in memory). /// This is the starting point for building owned batch chains. pub(crate) fn to_merkleized_batch(&self) -> Arc> { Arc::new(MerkleizedBatch { inner: self.merkle.to_batch(), bagging: self.hasher.root_bagging(), items: Arc::new(Vec::new()), parent: None, ancestor_items: Vec::new(), }) } } impl Journal where F: Family, E: Context, C: Contiguous + Persistable, H: Hasher, S: Strategy, { /// Durably persist the journal. This is faster than `sync()` but does not persist the Merkle /// structure, meaning recovery will be required on startup if we crash before `sync()`. pub async fn commit(&self) -> Result<(), Error> { self.journal.commit().await.map_err(Error::Journal) } } impl Journal where F: Family, E: Context, C: Mutable, H: Hasher, S: Strategy, { /// Create a new [Journal] from the given components after aligning the Merkle structure with /// the journal. pub async fn from_components( mut merkle: Merkle, journal: C, hasher: StandardHasher, apply_batch_size: u64, ) -> Result> { Self::align(&mut merkle, &journal, &hasher, apply_batch_size).await?; // Sync the Merkle structure to disk to avoid having to repeat any recovery that may have // been performed on next startup. merkle.sync().await?; Ok(Self { merkle, journal, hasher, }) } /// Align the Merkle structure to be consistent with the journal. Any items in the structure /// that are not in the journal are popped, and any items in the journal that are not in the /// structure are added. Items are added in batches of size `apply_batch_size` to avoid memory /// bloat. async fn align( merkle: &mut Merkle, journal: &C, hasher: &StandardHasher, apply_batch_size: u64, ) -> Result<(), Error> { // Rewind Merkle structure elements that are ahead of the journal. let journal_size = journal.size().await; let mut merkle_leaves = merkle.leaves(); if merkle_leaves > journal_size { let rewind_count = merkle_leaves - journal_size; warn!( journal_size, ?rewind_count, "rewinding Merkle structure to match journal" ); merkle.rewind(*rewind_count as usize).await?; merkle_leaves = Location::new(journal_size); } // If the Merkle structure is behind, replay journal items to catch up. if merkle_leaves < journal_size { let replay_count = journal_size - *merkle_leaves; warn!( ?journal_size, replay_count, "Merkle structure lags behind journal, replaying journal to catch up" ); let reader = journal.reader().await; while merkle_leaves < journal_size { let batch = { let mut batch = merkle.new_batch(); let mut count = 0u64; while count < apply_batch_size && merkle_leaves < journal_size { let op = reader.read(*merkle_leaves).await?; batch = batch.add(hasher, &op.encode()); merkle_leaves += 1; count += 1; } batch }; let batch = merkle.with_mem(|mem| batch.merkleize(mem, hasher)); merkle.apply_batch(&batch)?; } return Ok(()); } // At this point the Merkle structure and journal should be consistent. assert_eq!(journal.size().await, *merkle.leaves()); Ok(()) } /// Append an item to the journal and update the Merkle structure. pub async fn append(&mut self, item: &C::Item) -> Result, Error> { let encoded_item = item.encode(); // Append item to the journal, then update the Merkle structure state. let loc = self.journal.append(item).await?; let unmerkleized_batch = self.merkle.new_batch().add(&self.hasher, &encoded_item); let batch = self .merkle .with_mem(|mem| unmerkleized_batch.merkleize(mem, &self.hasher)); self.merkle.apply_batch(&batch)?; Ok(Location::new(loc)) } /// Apply a batch to the journal. /// /// A batch is valid if the journal has not been modified since the batch /// chain was created, or if only ancestors of this batch have been applied. /// Already-committed ancestors are skipped automatically. /// Applying a batch from a different fork returns an error. pub async fn apply_batch( &mut self, batch: &MerkleizedBatch, ) -> Result<(), Error> { let merkle_size = self.merkle.size(); let base_size = batch.inner.base_size(); // Determine whether ancestors have already been committed. // `base_size` is the merkle size when the batch chain was forked. // If the merkle has advanced past the fork point, ancestors are // already on disk; check that the current size is reachable from // the batch chain before skipping them. let skip_ancestors = if merkle_size == base_size { false } else if merkle_size > base_size && merkle_size < batch.inner.size() { true } else { // Merkle is at an incompatible position (a sibling or unrelated // fork was committed). Eagerly reject to avoid mutating the journal. return Err(merkle::Error::StaleBatch { expected: base_size, actual: merkle_size, } .into()); }; // Apply ancestor item batches in root-to-tip order. Already-committed // batches are skipped by tracking cumulative leaf count. // Batches are collected into a single append_many call to acquire the // journal's write lock once instead of per-batch. let committed_leaves = self.journal.size().await; let base_leaves = *Location::::try_from(base_size)?; let mut batch_leaf_end = base_leaves; let mut batches: Vec<&[C::Item]> = Vec::with_capacity(batch.ancestor_items.len() + 1); for ancestor in &batch.ancestor_items { batch_leaf_end += ancestor.len() as u64; if skip_ancestors && batch_leaf_end <= committed_leaves { continue; } batches.push(ancestor); } if !batch.items.is_empty() { batches.push(&batch.items); } if !batches.is_empty() { self.journal.append_many(Many::Nested(&batches)).await?; } self.merkle.apply_batch(&batch.inner)?; assert_eq!(*self.merkle.leaves(), self.journal.size().await); Ok(()) } /// Rewind the journal and Merkle structure. pub async fn rewind(&mut self, size: u64) -> Result<(), Error> { self.journal.rewind(size).await?; let leaves = *self.merkle.leaves(); if leaves > size { self.merkle.rewind((leaves - size) as usize).await?; } Ok(()) } /// Prune both the Merkle structure and journal to the given location. /// /// # Returns /// The new pruning boundary, which may be less than the requested `prune_loc`. pub async fn prune(&mut self, prune_loc: Location) -> Result, Error> { self.prune_inner(prune_loc) .await .map(|(boundary, _)| boundary) } async fn prune_inner( &mut self, prune_loc: Location, ) -> Result<(Location, bool), Error> { if self.merkle.size() == 0 { // DB is empty, nothing to prune. return Ok((Location::new(self.reader().await.bounds().start), false)); } // Sync the Merkle structure before pruning the journal, otherwise its last element could // end up behind the journal's first element after a crash, and there would be no way to // replay the items between the structure's last element and the journal's first element. self.merkle.sync().await?; let journal_pruned = self.journal.prune(*prune_loc).await?; let bounds = self.reader().await.bounds(); let boundary = Location::new(bounds.start); let merkle_boundary = self.merkle.bounds().start; if boundary > merkle_boundary { debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops"); self.merkle.prune(boundary).await?; } Ok((boundary, journal_pruned || boundary > merkle_boundary)) } } impl Journal where F: Family, E: Context, C: Contiguous, H: Hasher, S: Strategy, { /// Generate a proof of inclusion for items starting at `start_loc`. /// /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`, /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`. /// /// # Errors /// /// - Returns [Error::Merkle] with [merkle::Error::LocationOverflow] if `start_loc` > /// [Family::MAX_LEAVES]. /// - Returns [Error::Merkle] with [merkle::Error::RangeOutOfBounds] if `start_loc` >= current /// item count. /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been /// pruned. pub async fn proof( &self, start_loc: Location, max_ops: NonZeroU64, inactive_peaks: usize, ) -> Result<(Proof, Vec), Error> { self.historical_proof(self.size().await, start_loc, max_ops, inactive_peaks) .await } /// Generate a historical proof with respect to the state of the Merkle structure when it had /// `historical_leaves` leaves. /// /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`, /// where `end_loc` is the minimum of `historical_leaves` and `start_loc + max_ops`. /// /// # Errors /// /// - Returns [Error::Merkle] with [merkle::Error::RangeOutOfBounds] if `start_loc` >= /// `historical_leaves` or `historical_leaves` > number of items in the journal. /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been /// pruned. pub async fn historical_proof( &self, historical_leaves: Location, start_loc: Location, max_ops: NonZeroU64, inactive_peaks: usize, ) -> Result<(Proof, Vec), Error> { let reader = self.journal.reader().await; let bounds = reader.bounds(); if *historical_leaves > bounds.end { return Err(merkle::Error::RangeOutOfBounds(Location::new(bounds.end)).into()); } if start_loc >= historical_leaves { return Err(merkle::Error::RangeOutOfBounds(start_loc).into()); } let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get())); let hasher = self.hasher.clone(); let proof = self .merkle .historical_range_proof( &hasher, historical_leaves, start_loc..end_loc, inactive_peaks, ) .await?; let positions: Vec = (*start_loc..*end_loc).collect(); let ops = reader.read_many(&positions).await?; Ok((proof, ops)) } } impl Journal where F: Family, E: Context, C: Contiguous + Persistable, H: Hasher, S: Strategy, { /// Destroy the authenticated journal, removing all data from disk. pub async fn destroy(self) -> Result<(), Error> { try_join!( self.journal.destroy().map_err(Error::Journal), self.merkle.destroy().map_err(Error::Merkle), )?; Ok(()) } /// Durably persist the journal, ensuring no recovery is required on startup. pub async fn sync(&self) -> Result<(), Error> { try_join!( self.journal.sync().map_err(Error::Journal), self.merkle.sync().map_err(Error::Merkle) )?; Ok(()) } } /// The number of items to apply to the Merkle structure in a single batch. const APPLY_BATCH_SIZE: u64 = 1 << 16; /// Generate a `new()` constructor for an authenticated journal backed by a specific contiguous /// journal type. macro_rules! impl_journal_new { ($journal_mod:ident, $cfg_ty:ty, $codec_bound:path) => { impl Journal, H, S> where F: Family, E: Context, O: $codec_bound, H: Hasher, S: Strategy, { /// Create a new authenticated [Journal]. /// /// The inner journal will be rewound to the last item matching `rewind_predicate`, /// and the merkle structure will be aligned to match. pub async fn new( context: E, merkle_cfg: merkle::full::Config, journal_cfg: $cfg_ty, rewind_predicate: fn(&O) -> bool, bagging: merkle::Bagging, ) -> Result> { let mut journal = $journal_mod::Journal::init(context.child("journal"), journal_cfg).await?; journal.rewind_to(rewind_predicate).await?; let hasher = StandardHasher::::new(bagging); let mut merkle = Merkle::init(context.child("merkle"), &hasher, merkle_cfg).await?; Self::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE).await?; journal.sync().await?; merkle.sync().await?; Ok(Self { merkle, journal, hasher, }) } } }; } impl_journal_new!(fixed, fixed::Config, CodecFixedShared); impl_journal_new!(variable, variable::Config, CodecShared); impl Contiguous for Journal where F: Family, E: Context, C: Contiguous, H: Hasher, S: Strategy, { type Item = C::Item; async fn reader(&self) -> impl Reader + '_ { self.journal.reader().await } async fn size(&self) -> u64 { self.journal.size().await } } impl Mutable for Journal where F: Family, E: Context, C: Mutable, H: Hasher, S: Strategy, { async fn append(&mut self, item: &Self::Item) -> Result { let res = self.append(item).await.map_err(Self::map_error)?; Ok(*res) } async fn prune(&mut self, min_position: u64) -> Result { let prune_to = { let reader = self.journal.reader().await; let bounds = reader.bounds(); min_position.min(bounds.end) }; let (_, pruned) = self .prune_inner(Location::new(prune_to)) .await .map_err(Self::map_error)?; Ok(pruned) } async fn rewind(&mut self, size: u64) -> Result<(), JournalError> { self.rewind(size).await.map_err(Self::map_error) } } /// A [Mutable] journal that can serve as the inner journal of an authenticated [Journal]. pub trait Inner: Mutable + Persistable { /// The configuration needed to initialize this journal. type Config: Clone + Send; /// Initialize an authenticated [Journal] backed by this journal type. fn init( context: E, merkle_cfg: merkle::full::Config, journal_cfg: Self::Config, rewind_predicate: fn(&Self::Item) -> bool, bagging: merkle::Bagging, ) -> impl core::future::Future, Error>> + Send where Self: Sized, Self::Item: EncodeShared; } impl Persistable for Journal where F: Family, E: Context, C: Contiguous + Persistable, H: Hasher, S: Strategy, { type Error = JournalError; async fn commit(&self) -> Result<(), JournalError> { self.commit().await.map_err(Self::map_error) } async fn sync(&self) -> Result<(), JournalError> { self.sync().await.map_err(Self::map_error) } async fn destroy(self) -> Result<(), JournalError> { self.destroy().await.map_err(Self::map_error) } } #[cfg(test)] impl Journal where F: Family, E: Context, C: Contiguous, S: Strategy, H: Hasher, { /// Test helper: Read the item at the given location. pub(crate) async fn read(&self, loc: Location) -> Result> { self.journal .reader() .await .read(*loc) .await .map_err(Error::Journal) } } #[cfg(test)] mod tests { use super::*; use crate::{ journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal}, merkle::{ full::{Config as MerkleConfig, Merkle}, mmb, mmr, Bagging::{BackwardFold, ForwardFold}, }, qmdb::{ any::{ operation::{update::Unordered as Update, Unordered as Op}, value::FixedEncoding, }, operation::Committable, }, }; use commonware_codec::Encode; use commonware_cryptography::{sha256::Digest, Sha256}; use commonware_macros::test_traced; use commonware_parallel::Sequential; use commonware_runtime::{ buffer::paged::CacheRef, deterministic::{self, Context}, BufferPooler, Runner as _, Supervisor as _, }; use commonware_utils::{NZUsize, NZU16, NZU64}; use futures::StreamExt as _; use std::num::{NonZeroU16, NonZeroUsize}; const PAGE_SIZE: NonZeroU16 = NZU16!(101); const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11); /// Generic operation type for testing, parameterized by Merkle family. type TestOp = Op>; /// Generic authenticated journal type for testing, parameterized by Merkle family. type TestJournal = Journal< F, deterministic::Context, ContiguousJournal>, Sha256, Sequential, >; fn journal_root(journal: &TestJournal) -> Digest { journal.root(0).unwrap() } fn batch_root( journal: &TestJournal, batch: &MerkleizedBatch, Sequential>, ) -> Digest { journal .merkle .with_mem(|mem| batch.root(mem, &journal.hasher, 0)) .unwrap() } /// Create Merkle configuration for tests. fn merkle_config(suffix: &str, pooler: &impl BufferPooler) -> MerkleConfig { MerkleConfig { journal_partition: format!("mmr-journal-{suffix}"), metadata_partition: format!("mmr-metadata-{suffix}"), items_per_blob: NZU64!(11), write_buffer: NZUsize!(1024), strategy: Sequential, page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE), } } /// Create journal configuration for tests. fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig { JConfig { partition: format!("journal-{suffix}"), items_per_blob: NZU64!(7), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE), } } /// Create a new empty authenticated journal. async fn create_empty_journal( context: Context, suffix: &str, ) -> TestJournal { let merkle_cfg = merkle_config(suffix, &context); let journal_cfg = journal_config(suffix, &context); TestJournal::::new( context, merkle_cfg, journal_cfg, |op: &TestOp| op.is_commit(), ForwardFold, ) .await .unwrap() } #[test] fn test_batches_inherit_journal_bagging() { deterministic::Runner::default().start(|context| async move { let merkle_cfg = merkle_config("batch-bagging", &context); let journal_cfg = journal_config("batch-bagging", &context); let journal = TestJournal::::new( context, merkle_cfg, journal_cfg, |op: &TestOp| op.is_commit(), BackwardFold, ) .await .unwrap(); let batch = journal.new_batch(); assert_eq!(batch.hasher.root_bagging(), BackwardFold); let merkleized = journal.merkle.with_mem(|mem| batch.merkleize(mem)); let child: UnmerkleizedBatch, Sequential> = merkleized.new_batch(); assert_eq!(child.hasher.root_bagging(), BackwardFold); }); } /// Create a test operation with predictable values based on index. fn create_operation(index: u8) -> TestOp { TestOp::::Update(Update( Sha256::fill(index), Sha256::fill(index.wrapping_add(1)), )) } /// Create an authenticated journal with N committed operations. /// /// Operations are added and then synced to ensure they are committed. async fn create_journal_with_ops( context: Context, suffix: &str, count: usize, ) -> TestJournal { let mut journal = create_empty_journal::(context, suffix).await; for i in 0..count { let op = create_operation::(i as u8); let loc = journal.append(&op).await.unwrap(); assert_eq!(loc, Location::::new(i as u64)); } journal.sync().await.unwrap(); journal } /// Create separate Merkle and journal components for testing alignment. /// /// These components are created independently and can be manipulated separately to test /// scenarios where the Merkle structure and journal are out of sync (e.g., one ahead of the /// other). async fn create_components( context: Context, suffix: &str, ) -> ( Merkle, ContiguousJournal>, StandardHasher, ) { let hasher = StandardHasher::new(ForwardFold); let merkle = Merkle::::init( context.child("mmr"), &hasher, merkle_config(suffix, &context), ) .await .unwrap(); let journal = ContiguousJournal::init(context.child("journal"), journal_config(suffix, &context)) .await .unwrap(); (merkle, journal, hasher) } /// Verify that a proof correctly proves the given operations are included in the Merkle /// structure. fn verify_proof( proof: &Proof::Digest>, operations: &[TestOp], start_loc: Location, root: &::Digest, hasher: &StandardHasher, ) -> bool { let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect(); proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root) } /// Verify that new() creates an empty authenticated journal. async fn test_new_creates_empty_journal_inner(context: Context) { let journal = create_empty_journal::(context, "new-empty").await; let bounds = journal.reader().await.bounds(); assert_eq!(bounds.end, 0); assert_eq!(bounds.start, 0); assert!(bounds.is_empty()); } #[test_traced("INFO")] fn test_new_creates_empty_journal_mmr() { let executor = deterministic::Runner::default(); executor.start(test_new_creates_empty_journal_inner::); } #[test_traced("INFO")] fn test_new_creates_empty_journal_mmb() { let executor = deterministic::Runner::default(); executor.start(test_new_creates_empty_journal_inner::); } /// Verify that align() correctly handles empty Merkle and journal components. async fn test_align_with_empty_mmr_and_journal_inner(context: Context) { let (mut merkle, journal, hasher) = create_components::(context, "align-empty").await; TestJournal::::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE) .await .unwrap(); assert_eq!(merkle.leaves(), Location::::new(0)); assert_eq!(journal.size().await, 0); } #[test_traced("INFO")] fn test_align_with_empty_mmr_and_journal_mmr() { let executor = deterministic::Runner::default(); executor.start(test_align_with_empty_mmr_and_journal_inner::); } #[test_traced("INFO")] fn test_align_with_empty_mmr_and_journal_mmb() { let executor = deterministic::Runner::default(); executor.start(test_align_with_empty_mmr_and_journal_inner::); } /// Verify that align() pops Merkle elements when Merkle is ahead of the journal. async fn test_align_when_mmr_ahead_inner(context: Context) { let (mut merkle, journal, hasher) = create_components::(context, "mmr-ahead").await; // Add 20 operations to both Merkle and journal { let batch = { let mut batch = merkle.new_batch(); for i in 0..20 { let op = create_operation::(i as u8); let encoded = op.encode(); batch = batch.add(&hasher, &encoded); journal.append(&op).await.unwrap(); } batch }; let batch = merkle.with_mem(|mem| batch.merkleize(mem, &hasher)); merkle.apply_batch(&batch).unwrap(); } // Add commit operation to journal only (making journal ahead) let commit_op = TestOp::::CommitFloor(None, Location::::new(0)); journal.append(&commit_op).await.unwrap(); journal.sync().await.unwrap(); // Merkle has 20 leaves, journal has 21 operations (20 ops + 1 commit) TestJournal::::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE) .await .unwrap(); // Merkle should have been aligned to match journal assert_eq!(merkle.leaves(), Location::::new(21)); assert_eq!(journal.size().await, 21); } #[test_traced("WARN")] fn test_align_when_mmr_ahead_mmr() { let executor = deterministic::Runner::default(); executor.start(test_align_when_mmr_ahead_inner::); } #[test_traced("WARN")] fn test_align_when_mmr_ahead_mmb() { let executor = deterministic::Runner::default(); executor.start(test_align_when_mmr_ahead_inner::); } /// Verify that align() replays journal operations when journal is ahead of Merkle. async fn test_align_when_journal_ahead_inner(context: Context) { let (mut merkle, journal, hasher) = create_components::(context, "journal-ahead").await; // Add 20 operations to journal only for i in 0..20 { let op = create_operation::(i as u8); journal.append(&op).await.unwrap(); } // Add commit let commit_op = TestOp::::CommitFloor(None, Location::::new(0)); journal.append(&commit_op).await.unwrap(); journal.sync().await.unwrap(); // Journal has 21 operations, Merkle has 0 leaves TestJournal::::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE) .await .unwrap(); // Merkle should have been replayed to match journal assert_eq!(merkle.leaves(), Location::::new(21)); assert_eq!(journal.size().await, 21); } #[test_traced("WARN")] fn test_align_when_journal_ahead_mmr() { let executor = deterministic::Runner::default(); executor.start(test_align_when_journal_ahead_inner::); } #[test_traced("WARN")] fn test_align_when_journal_ahead_mmb() { let executor = deterministic::Runner::default(); executor.start(test_align_when_journal_ahead_inner::); } /// Verify that align() discards uncommitted operations. async fn test_align_with_mismatched_committed_ops_inner( context: Context, ) { let mut journal = create_empty_journal::(context.child("first"), "mismatched").await; // Add 20 uncommitted operations for i in 0..20 { let loc = journal .append(&create_operation::(i as u8)) .await .unwrap(); assert_eq!(loc, Location::::new(i as u64)); } // Don't sync - these are uncommitted // After alignment, they should be discarded let size_before = journal.size().await; assert_eq!(size_before, 20); // Drop and recreate to simulate restart (which calls align internally) journal.sync().await.unwrap(); drop(journal); let journal = create_empty_journal::(context.child("second"), "mismatched").await; // Uncommitted operations should be gone assert_eq!(journal.size().await, 0); } #[test_traced("INFO")] fn test_align_with_mismatched_committed_ops_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_align_with_mismatched_committed_ops_inner::(context) }); } #[test_traced("INFO")] fn test_align_with_mismatched_committed_ops_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_align_with_mismatched_committed_ops_inner::(context) }); } async fn test_rewind_inner(context: Context) { // Test 1: Matching operation is kept { let mut journal = ContiguousJournal::init( context.child("rewind_match"), journal_config("rewind-match", &context), ) .await .unwrap(); // Add operations where operation 3 is a commit for i in 0..3 { journal.append(&create_operation::(i)).await.unwrap(); } journal .append(&TestOp::::CommitFloor(None, Location::::new(0))) .await .unwrap(); for i in 4..7 { journal.append(&create_operation::(i)).await.unwrap(); } // Rewind to last commit let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap(); assert_eq!(final_size, 4); assert_eq!(journal.size().await, 4); // Verify the commit operation is still there let op = journal.read(3).await.unwrap(); assert!(op.is_commit()); } // Test 2: Last matching operation is chosen when multiple match { let mut journal = ContiguousJournal::init( context.child("rewind_multiple"), journal_config("rewind-multiple", &context), ) .await .unwrap(); // Add multiple commits journal.append(&create_operation::(0)).await.unwrap(); journal .append(&TestOp::::CommitFloor(None, Location::::new(0))) .await .unwrap(); // pos 1 journal.append(&create_operation::(2)).await.unwrap(); journal .append(&TestOp::::CommitFloor(None, Location::::new(1))) .await .unwrap(); // pos 3 journal.append(&create_operation::(4)).await.unwrap(); // Should rewind to last commit (pos 3) let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap(); assert_eq!(final_size, 4); // Verify the last commit is still there let op = journal.read(3).await.unwrap(); assert!(op.is_commit()); // Verify we can't read pos 4 assert!(journal.read(4).await.is_err()); } // Test 3: Rewind to pruning boundary when no match { let mut journal = ContiguousJournal::init( context.child("rewind_no_match"), journal_config("rewind-no-match", &context), ) .await .unwrap(); // Add operations with no commits for i in 0..10 { journal.append(&create_operation::(i)).await.unwrap(); } // Rewind should go to pruning boundary (0 for unpruned) let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap(); assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)"); assert_eq!(journal.size().await, 0); } // Test 4: Rewind with existing pruning boundary { let mut journal = ContiguousJournal::init( context.child("rewind_with_pruning"), journal_config("rewind-with-pruning", &context), ) .await .unwrap(); // Add operations and a commit at position 10 (past first section boundary of 7) for i in 0..10 { journal.append(&create_operation::(i)).await.unwrap(); } journal .append(&TestOp::::CommitFloor(None, Location::::new(0))) .await .unwrap(); // pos 10 for i in 11..15 { journal.append(&create_operation::(i)).await.unwrap(); } journal.sync().await.unwrap(); // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+) journal.prune(8).await.unwrap(); assert_eq!(journal.reader().await.bounds().start, 7); // Add more uncommitted operations for i in 15..20 { journal.append(&create_operation::(i)).await.unwrap(); } // Rewind should keep the commit at position 10 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap(); assert_eq!(final_size, 11); // Verify commit is still there let op = journal.read(10).await.unwrap(); assert!(op.is_commit()); } // Test 5: Rewind with no matches after pruning boundary { let mut journal = ContiguousJournal::init( context.child("rewind_no_match_pruned"), journal_config("rewind-no-match-pruned", &context), ) .await .unwrap(); // Add operations with a commit at position 5 (in section 0: 0-6) for i in 0..5 { journal.append(&create_operation::(i)).await.unwrap(); } journal .append(&TestOp::::CommitFloor(None, Location::::new(0))) .await .unwrap(); // pos 5 for i in 6..10 { journal.append(&create_operation::(i)).await.unwrap(); } journal.sync().await.unwrap(); // Prune up to position 8 (this prunes section 0, including the commit at pos 5) // Pruning boundary will be at position 7 (start of section 1) journal.prune(8).await.unwrap(); assert_eq!(journal.reader().await.bounds().start, 7); // Add uncommitted operations with no commits (in section 1: 7-13) for i in 10..14 { journal.append(&create_operation::(i)).await.unwrap(); } // Rewind with no matching commits after the pruning boundary // Should rewind to the pruning boundary at position 7 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap(); assert_eq!(final_size, 7); } // Test 6: Empty journal { let mut journal = ContiguousJournal::init( context.child("rewind_empty"), journal_config("rewind-empty", &context), ) .await .unwrap(); // Rewind empty journal should be no-op let final_size = journal .rewind_to(|op: &TestOp| op.is_commit()) .await .unwrap(); assert_eq!(final_size, 0); assert_eq!(journal.size().await, 0); } // Test 7: Position based authenticated journal rewind. { let merkle_cfg = merkle_config("rewind", &context); let journal_cfg = journal_config("rewind", &context); let mut journal = TestJournal::::new( context, merkle_cfg, journal_cfg, |op| op.is_commit(), ForwardFold, ) .await .unwrap(); // Add operations with a commit at position 5 (in section 0: 0-6) for i in 0..5 { journal.append(&create_operation::(i)).await.unwrap(); } journal .append(&TestOp::::CommitFloor(None, Location::::new(0))) .await .unwrap(); // pos 5 for i in 6..10 { journal.append(&create_operation::(i)).await.unwrap(); } assert_eq!(journal.size().await, 10); journal.rewind(2).await.unwrap(); assert_eq!(journal.size().await, 2); assert_eq!(journal.merkle.leaves(), 2); assert_eq!(journal.merkle.size(), 3); let bounds = journal.reader().await.bounds(); assert_eq!(bounds.start, 0); assert!(!bounds.is_empty()); assert!(matches!( journal.rewind(3).await, Err(Error::Journal(JournalError::InvalidRewind(_))) )); journal.rewind(0).await.unwrap(); assert_eq!(journal.size().await, 0); assert_eq!(journal.merkle.leaves(), 0); assert_eq!(journal.merkle.size(), 0); let bounds = journal.reader().await.bounds(); assert_eq!(bounds.start, 0); assert!(bounds.is_empty()); // Test rewinding after pruning. for i in 0..255 { journal.append(&create_operation::(i)).await.unwrap(); } journal.prune(Location::::new(100)).await.unwrap(); assert_eq!(journal.reader().await.bounds().start, 98); let res = journal.rewind(97).await; assert!(matches!( res, Err(Error::Journal(JournalError::InvalidRewind(97))) )); journal.rewind(98).await.unwrap(); let bounds = journal.reader().await.bounds(); assert_eq!(bounds.end, 98); assert_eq!(journal.merkle.leaves(), 98); assert_eq!(bounds.start, 98); assert!(bounds.is_empty()); } } #[test_traced("INFO")] fn test_rewind_mmr() { let executor = deterministic::Runner::default(); executor.start(test_rewind_inner::); } #[test_traced("INFO")] fn test_rewind_mmb() { let executor = deterministic::Runner::default(); executor.start(test_rewind_inner::); } /// Verify that append() increments the operation count, returns correct locations, and /// operations can be read back correctly. async fn test_apply_op_and_read_operations_inner(context: Context) { let mut journal = create_empty_journal::(context, "apply_op").await; assert_eq!(journal.size().await, 0); // Add 50 operations let expected_ops: Vec<_> = (0..50).map(|i| create_operation::(i as u8)).collect(); for (i, op) in expected_ops.iter().enumerate() { let loc = journal.append(op).await.unwrap(); assert_eq!(loc, Location::::new(i as u64)); assert_eq!(journal.size().await, (i + 1) as u64); } assert_eq!(journal.size().await, 50); // Verify all operations can be read back correctly journal.sync().await.unwrap(); for (i, expected_op) in expected_ops.iter().enumerate() { let read_op = journal.read(Location::::new(i as u64)).await.unwrap(); assert_eq!(read_op, *expected_op); } } #[test_traced("INFO")] fn test_apply_op_and_read_operations_mmr() { let executor = deterministic::Runner::default(); executor.start(test_apply_op_and_read_operations_inner::); } #[test_traced("INFO")] fn test_apply_op_and_read_operations_mmb() { let executor = deterministic::Runner::default(); executor.start(test_apply_op_and_read_operations_inner::); } /// Verify that read() returns correct operations at various positions. async fn test_read_operations_at_various_positions_inner( context: Context, ) { let journal = create_journal_with_ops::(context, "read", 50).await; // Verify reading first operation let first_op = journal.read(Location::::new(0)).await.unwrap(); assert_eq!(first_op, create_operation::(0)); // Verify reading middle operation let middle_op = journal.read(Location::::new(25)).await.unwrap(); assert_eq!(middle_op, create_operation::(25)); // Verify reading last operation let last_op = journal.read(Location::::new(49)).await.unwrap(); assert_eq!(last_op, create_operation::(49)); // Verify all operations match expected values for i in 0..50 { let op = journal.read(Location::::new(i)).await.unwrap(); assert_eq!(op, create_operation::(i as u8)); } } #[test_traced("INFO")] fn test_read_operations_at_various_positions_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_read_operations_at_various_positions_inner::(context) }); } #[test_traced("INFO")] fn test_read_operations_at_various_positions_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_read_operations_at_various_positions_inner::(context) }); } /// Verify that read() returns an error for pruned operations. async fn test_read_pruned_operation_returns_error_inner( context: Context, ) { let mut journal = create_journal_with_ops::(context, "read_pruned", 100).await; // Add commit and prune journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let pruned_boundary = journal.prune(Location::::new(50)).await.unwrap(); // Try to read an operation before the pruned boundary let read_loc = Location::::new(0); if read_loc < pruned_boundary { let result = journal.read(read_loc).await; assert!(matches!( result, Err(Error::Journal(crate::journal::Error::ItemPruned(_))) )); } } #[test_traced("INFO")] fn test_read_pruned_operation_returns_error_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_read_pruned_operation_returns_error_inner::(context) }); } #[test_traced("INFO")] fn test_read_pruned_operation_returns_error_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_read_pruned_operation_returns_error_inner::(context) }); } /// Verify that read() returns an error for out-of-range locations. async fn test_read_out_of_range_returns_error_inner(context: Context) { let journal = create_journal_with_ops::(context, "read_oob", 3).await; // Try to read beyond the end let result = journal.read(Location::::new(10)).await; assert!(matches!( result, Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_))) )); } #[test_traced("INFO")] fn test_read_out_of_range_returns_error_mmr() { let executor = deterministic::Runner::default(); executor.start(test_read_out_of_range_returns_error_inner::); } #[test_traced("INFO")] fn test_read_out_of_range_returns_error_mmb() { let executor = deterministic::Runner::default(); executor.start(test_read_out_of_range_returns_error_inner::); } /// Verify that we can read all operations back correctly. async fn test_read_all_operations_back_correctly_inner( context: Context, ) { let journal = create_journal_with_ops::(context, "read_all", 50).await; assert_eq!(journal.size().await, 50); // Verify all operations can be read back and match expected values for i in 0..50 { let op = journal.read(Location::::new(i)).await.unwrap(); assert_eq!(op, create_operation::(i as u8)); } } #[test_traced("INFO")] fn test_read_all_operations_back_correctly_mmr() { let executor = deterministic::Runner::default(); executor.start(test_read_all_operations_back_correctly_inner::); } #[test_traced("INFO")] fn test_read_all_operations_back_correctly_mmb() { let executor = deterministic::Runner::default(); executor.start(test_read_all_operations_back_correctly_inner::); } /// Verify that sync() persists operations. async fn test_sync_inner(context: Context) { let mut journal = create_empty_journal::(context.child("first"), "close_pending").await; // Add 20 operations let expected_ops: Vec<_> = (0..20).map(|i| create_operation::(i as u8)).collect(); for (i, op) in expected_ops.iter().enumerate() { let loc = journal.append(op).await.unwrap(); assert_eq!(loc, Location::::new(i as u64),); } // Add commit operation to commit the operations let commit_loc = journal .append(&TestOp::::CommitFloor(None, Location::::new(0))) .await .unwrap(); assert_eq!( commit_loc, Location::::new(20), "commit should be at location 20" ); journal.sync().await.unwrap(); // Reopen and verify the operations persisted drop(journal); let journal = create_empty_journal::(context.child("second"), "close_pending").await; assert_eq!(journal.size().await, 21); // Verify all operations can be read back for (i, expected_op) in expected_ops.iter().enumerate() { let read_op = journal.read(Location::::new(i as u64)).await.unwrap(); assert_eq!(read_op, *expected_op); } } #[test_traced("INFO")] fn test_sync_mmr() { let executor = deterministic::Runner::default(); executor.start(test_sync_inner::); } #[test_traced("INFO")] fn test_sync_mmb() { let executor = deterministic::Runner::default(); executor.start(test_sync_inner::); } /// Verify that pruning an empty journal returns the boundary. async fn test_prune_empty_journal_inner(context: Context) { let mut journal = create_empty_journal::(context, "prune_empty").await; let boundary = journal.prune(Location::::new(0)).await.unwrap(); assert_eq!(boundary, Location::::new(0)); } #[test_traced("INFO")] fn test_prune_empty_journal_mmr() { let executor = deterministic::Runner::default(); executor.start(test_prune_empty_journal_inner::); } #[test_traced("INFO")] fn test_prune_empty_journal_mmb() { let executor = deterministic::Runner::default(); executor.start(test_prune_empty_journal_inner::); } /// Verify that pruning to a specific location works correctly. async fn test_prune_to_location_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "prune_to", 100).await; // Add commit at position 50 journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let boundary = journal.prune(Location::::new(50)).await.unwrap(); // Boundary should be <= requested location (may align to section boundary) assert!(boundary <= Location::::new(50)); } #[test_traced("INFO")] fn test_prune_to_location_mmr() { let executor = deterministic::Runner::default(); executor.start(test_prune_to_location_inner::); } #[test_traced("INFO")] fn test_prune_to_location_mmb() { let executor = deterministic::Runner::default(); executor.start(test_prune_to_location_inner::); } /// Verify that prune() returns the actual boundary (which may differ from requested). async fn test_prune_returns_actual_boundary_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "prune_boundary", 100).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let requested = Location::::new(50); let actual = journal.prune(requested).await.unwrap(); // Actual boundary should match bounds.start let bounds = journal.reader().await.bounds(); assert!(!bounds.is_empty()); assert_eq!(actual, bounds.start); // Actual may be <= requested due to section alignment assert!(actual <= requested); } #[test_traced("INFO")] fn test_prune_returns_actual_boundary_mmr() { let executor = deterministic::Runner::default(); executor.start(test_prune_returns_actual_boundary_inner::); } #[test_traced("INFO")] fn test_prune_returns_actual_boundary_mmb() { let executor = deterministic::Runner::default(); executor.start(test_prune_returns_actual_boundary_inner::); } /// Verify that pruning through the Mutable trait also prunes authenticated Merkle state. async fn test_mutable_prune_updates_merkle_boundary_inner( context: Context, ) { let mut journal = create_journal_with_ops::(context, "trait_prune", 100).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let pruned = as Mutable>::prune(&mut journal, 50) .await .unwrap(); assert!(pruned); let item_boundary = journal.reader().await.bounds().start; let merkle_boundary = journal.merkle.bounds().start; assert_eq!(Location::::new(item_boundary), merkle_boundary); assert!(merkle_boundary > Location::::new(0)); let pruned = as Mutable>::prune(&mut journal, 50) .await .unwrap(); assert!(!pruned); assert_eq!(journal.reader().await.bounds().start, item_boundary); assert_eq!(journal.merkle.bounds().start, merkle_boundary); } #[test_traced("INFO")] fn test_mutable_prune_updates_merkle_boundary_mmr() { let executor = deterministic::Runner::default(); executor.start(test_mutable_prune_updates_merkle_boundary_inner::); } #[test_traced("INFO")] fn test_mutable_prune_updates_merkle_boundary_mmb() { let executor = deterministic::Runner::default(); executor.start(test_mutable_prune_updates_merkle_boundary_inner::); } /// Verify that pruning doesn't change the operation count. async fn test_prune_preserves_operation_count_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "prune_count", 100).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let count_before = journal.size().await; journal.prune(Location::::new(50)).await.unwrap(); let count_after = journal.size().await; assert_eq!(count_before, count_after); } #[test_traced("INFO")] fn test_prune_preserves_operation_count_mmr() { let executor = deterministic::Runner::default(); executor.start(test_prune_preserves_operation_count_inner::); } #[test_traced("INFO")] fn test_prune_preserves_operation_count_mmb() { let executor = deterministic::Runner::default(); executor.start(test_prune_preserves_operation_count_inner::); } /// Verify bounds() for empty journal, no pruning, and after pruning. async fn test_bounds_empty_and_pruned_inner(context: Context) { // Test empty journal let journal = create_empty_journal::(context.child("empty"), "oldest").await; assert!(journal.reader().await.bounds().is_empty()); journal.destroy().await.unwrap(); // Test no pruning let journal = create_journal_with_ops::(context.child("no_prune"), "oldest", 100).await; let bounds = journal.reader().await.bounds(); assert!(!bounds.is_empty()); assert_eq!(bounds.start, 0); journal.destroy().await.unwrap(); // Test after pruning let mut journal = create_journal_with_ops::(context.child("pruned"), "oldest", 100).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let pruned_boundary = journal.prune(Location::::new(50)).await.unwrap(); // Should match the pruned boundary (may be <= 50 due to section alignment) let bounds = journal.reader().await.bounds(); assert!(!bounds.is_empty()); assert_eq!(bounds.start, pruned_boundary); // Should be <= requested location (50) assert!(pruned_boundary <= 50); journal.destroy().await.unwrap(); } #[test_traced("INFO")] fn test_bounds_empty_and_pruned_mmr() { let executor = deterministic::Runner::default(); executor.start(test_bounds_empty_and_pruned_inner::); } #[test_traced("INFO")] fn test_bounds_empty_and_pruned_mmb() { let executor = deterministic::Runner::default(); executor.start(test_bounds_empty_and_pruned_inner::); } /// Verify bounds().start for empty journal, no pruning, and after pruning. async fn test_bounds_start_after_prune_inner(context: Context) { // Test empty journal let journal = create_empty_journal::(context.child("empty"), "boundary").await; assert_eq!(journal.reader().await.bounds().start, 0); // Test no pruning let journal = create_journal_with_ops::(context.child("no_prune"), "boundary", 100).await; assert_eq!(journal.reader().await.bounds().start, 0); // Test after pruning let mut journal = create_journal_with_ops::(context.child("pruned"), "boundary", 100).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(50))) .await .unwrap(); journal.sync().await.unwrap(); let pruned_boundary = journal.prune(Location::::new(50)).await.unwrap(); assert_eq!(journal.reader().await.bounds().start, pruned_boundary); } #[test_traced("INFO")] fn test_bounds_start_after_prune_mmr() { let executor = deterministic::Runner::default(); executor.start(test_bounds_start_after_prune_inner::); } #[test_traced("INFO")] fn test_bounds_start_after_prune_mmb() { let executor = deterministic::Runner::default(); executor.start(test_bounds_start_after_prune_inner::); } /// Verify that Merkle prunes to the journal's actual boundary, not the requested location. async fn test_mmr_prunes_to_journal_boundary_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "mmr_boundary", 50).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(25))) .await .unwrap(); journal.sync().await.unwrap(); let pruned_boundary = journal.prune(Location::::new(25)).await.unwrap(); // Verify Merkle and journal remain in sync let bounds = journal.reader().await.bounds(); assert!(!bounds.is_empty()); assert_eq!(pruned_boundary, bounds.start); // Verify boundary is at or before requested (due to section alignment) assert!(pruned_boundary <= Location::::new(25)); // Verify operation count is unchanged assert_eq!(journal.size().await, 51); } #[test_traced("INFO")] fn test_mmr_prunes_to_journal_boundary_mmr() { let executor = deterministic::Runner::default(); executor.start(test_mmr_prunes_to_journal_boundary_inner::); } #[test_traced("INFO")] fn test_mmr_prunes_to_journal_boundary_mmb() { let executor = deterministic::Runner::default(); executor.start(test_mmr_prunes_to_journal_boundary_inner::); } /// Verify proof() for multiple operations. async fn test_proof_multiple_operations_inner(context: Context) { let journal = create_journal_with_ops::(context, "proof_multi", 50).await; let (proof, ops) = journal .proof(Location::::new(0), NZU64!(50), 0) .await .unwrap(); assert_eq!(ops.len(), 50); for (i, op) in ops.iter().enumerate() { assert_eq!(*op, create_operation::(i as u8)); } // Verify the proof is valid let hasher = StandardHasher::new(ForwardFold); let root = journal_root(&journal); assert!(verify_proof( &proof, &ops, Location::::new(0), &root, &hasher )); } #[test_traced("INFO")] fn test_proof_multiple_operations_mmr() { let executor = deterministic::Runner::default(); executor.start(test_proof_multiple_operations_inner::); } #[test_traced("INFO")] fn test_proof_multiple_operations_mmb() { let executor = deterministic::Runner::default(); executor.start(test_proof_multiple_operations_inner::); } /// Verify that historical_proof() respects the max_ops limit. async fn test_historical_proof_limited_by_max_ops_inner( context: Context, ) { let journal = create_journal_with_ops::(context, "proof_limit", 50).await; let size = journal.size().await; let (proof, ops) = journal .historical_proof(size, Location::::new(0), NZU64!(20), 0) .await .unwrap(); // Should return only 20 operations despite 50 being available assert_eq!(ops.len(), 20); for (i, op) in ops.iter().enumerate() { assert_eq!(*op, create_operation::(i as u8)); } // Verify the proof is valid let hasher = StandardHasher::new(ForwardFold); let root = journal_root(&journal); assert!(verify_proof( &proof, &ops, Location::::new(0), &root, &hasher )); } #[test_traced("INFO")] fn test_historical_proof_limited_by_max_ops_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_limited_by_max_ops_inner::(context) }); } #[test_traced("INFO")] fn test_historical_proof_limited_by_max_ops_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_limited_by_max_ops_inner::(context) }); } /// Verify historical_proof() at the end of the journal. async fn test_historical_proof_at_end_of_journal_inner( context: Context, ) { let journal = create_journal_with_ops::(context, "proof_end", 50).await; let size = journal.size().await; // Request proof starting near the end let (proof, ops) = journal .historical_proof(size, Location::::new(40), NZU64!(20), 0) .await .unwrap(); // Should return only 10 operations (positions 40-49) assert_eq!(ops.len(), 10); for (i, op) in ops.iter().enumerate() { assert_eq!(*op, create_operation::((40 + i) as u8)); } // Verify the proof is valid let hasher = StandardHasher::new(ForwardFold); let root = journal_root(&journal); assert!(verify_proof( &proof, &ops, Location::::new(40), &root, &hasher )); } #[test_traced("INFO")] fn test_historical_proof_at_end_of_journal_mmr() { let executor = deterministic::Runner::default(); executor.start(test_historical_proof_at_end_of_journal_inner::); } #[test_traced("INFO")] fn test_historical_proof_at_end_of_journal_mmb() { let executor = deterministic::Runner::default(); executor.start(test_historical_proof_at_end_of_journal_inner::); } /// Verify that historical_proof() returns an error for invalid size. async fn test_historical_proof_out_of_range_returns_error_inner( context: Context, ) { let journal = create_journal_with_ops::(context, "proof_oob", 5).await; // Request proof with size > actual journal size let result = journal .historical_proof(Location::::new(10), Location::::new(0), NZU64!(1), 0) .await; assert!(matches!( result, Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_))) )); } #[test_traced("INFO")] fn test_historical_proof_out_of_range_returns_error_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_out_of_range_returns_error_inner::(context) }); } #[test_traced("INFO")] fn test_historical_proof_out_of_range_returns_error_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_out_of_range_returns_error_inner::(context) }); } /// Verify that historical_proof() returns an error when start_loc >= size. async fn test_historical_proof_start_too_large_returns_error_inner( context: Context, ) { let journal = create_journal_with_ops::(context, "proof_start_oob", 5).await; let size = journal.size().await; // Request proof starting at size (should fail) let result = journal.historical_proof(size, size, NZU64!(1), 0).await; assert!(matches!( result, Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_))) )); } #[test_traced("INFO")] fn test_historical_proof_start_too_large_returns_error_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_start_too_large_returns_error_inner::(context) }); } #[test_traced("INFO")] fn test_historical_proof_start_too_large_returns_error_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_start_too_large_returns_error_inner::(context) }); } /// Verify historical_proof() for a truly historical state (before more operations added). async fn test_historical_proof_truly_historical_inner(context: Context) { // Create journal with initial operations let mut journal = create_journal_with_ops::(context, "proof_historical", 50).await; // Capture root at historical state let hasher = StandardHasher::new(ForwardFold); let historical_root = journal_root(&journal); let historical_size = journal.size().await; // Add more operations after the historical state for i in 50..100 { journal .append(&create_operation::(i as u8)) .await .unwrap(); } journal.sync().await.unwrap(); // Generate proof for the historical state let (proof, ops) = journal .historical_proof(historical_size, Location::::new(0), NZU64!(50), 0) .await .unwrap(); // Verify operations match expected historical operations assert_eq!(ops.len(), 50); for (i, op) in ops.iter().enumerate() { assert_eq!(*op, create_operation::(i as u8)); } // Verify the proof is valid against the historical root assert!(verify_proof( &proof, &ops, Location::::new(0), &historical_root, &hasher )); } #[test_traced("INFO")] fn test_historical_proof_truly_historical_mmr() { let executor = deterministic::Runner::default(); executor.start(test_historical_proof_truly_historical_inner::); } #[test_traced("INFO")] fn test_historical_proof_truly_historical_mmb() { let executor = deterministic::Runner::default(); executor.start(test_historical_proof_truly_historical_inner::); } /// Verify that historical_proof() returns an error when start_loc is pruned. async fn test_historical_proof_pruned_location_returns_error_inner( context: Context, ) { let mut journal = create_journal_with_ops::(context, "proof_pruned", 50).await; journal .append(&TestOp::::CommitFloor(None, Location::::new(25))) .await .unwrap(); journal.sync().await.unwrap(); let pruned_boundary = journal.prune(Location::::new(25)).await.unwrap(); // Try to get proof starting at a location before the pruned boundary let size = journal.size().await; let start_loc = Location::::new(0); if start_loc < pruned_boundary { let result = journal .historical_proof(size, start_loc, NZU64!(1), 0) .await; // Should fail when trying to read pruned operations assert!(result.is_err()); } } #[test_traced("INFO")] fn test_historical_proof_pruned_location_returns_error_mmr() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_pruned_location_returns_error_inner::(context) }); } #[test_traced("INFO")] fn test_historical_proof_pruned_location_returns_error_mmb() { let executor = deterministic::Runner::default(); executor.start(|context| { test_historical_proof_pruned_location_returns_error_inner::(context) }); } /// Verify replay() with empty journal and multiple operations. async fn test_replay_operations_inner(context: Context) { // Test empty journal let journal = create_empty_journal::(context.child("empty"), "replay").await; let reader = journal.reader().await; let stream = reader.replay(NZUsize!(10), 0).await.unwrap(); futures::pin_mut!(stream); assert!(stream.next().await.is_none()); // Test replaying all operations let journal = create_journal_with_ops::(context.child("with_ops"), "replay", 50).await; let reader = journal.reader().await; let stream = reader.replay(NZUsize!(100), 0).await.unwrap(); futures::pin_mut!(stream); for i in 0..50 { let (pos, op) = stream.next().await.unwrap().unwrap(); assert_eq!(pos, i); assert_eq!(op, create_operation::(i as u8)); } assert!(stream.next().await.is_none()); } #[test_traced("INFO")] fn test_replay_operations_mmr() { let executor = deterministic::Runner::default(); executor.start(test_replay_operations_inner::); } #[test_traced("INFO")] fn test_replay_operations_mmb() { let executor = deterministic::Runner::default(); executor.start(test_replay_operations_inner::); } /// Verify replay() starting from a middle location. async fn test_replay_from_middle_inner(context: Context) { let journal = create_journal_with_ops::(context, "replay_middle", 50).await; let reader = journal.reader().await; let stream = reader.replay(NZUsize!(100), 25).await.unwrap(); futures::pin_mut!(stream); let mut count = 0; while let Some(result) = stream.next().await { let (pos, op) = result.unwrap(); assert_eq!(pos, 25 + count); assert_eq!(op, create_operation::((25 + count) as u8)); count += 1; } // Should have replayed positions 25-49 (25 operations) assert_eq!(count, 25); } #[test_traced("INFO")] fn test_replay_from_middle_mmr() { let executor = deterministic::Runner::default(); executor.start(test_replay_from_middle_inner::); } #[test_traced("INFO")] fn test_replay_from_middle_mmb() { let executor = deterministic::Runner::default(); executor.start(test_replay_from_middle_inner::); } /// Verify the speculative batch API: fork two batches, verify independent roots, apply one. async fn test_speculative_batch_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "speculative_batch", 10).await; let original_root = journal_root(&journal); // Fork two independent speculative batches. let b1 = journal.new_batch(); let b2 = journal.new_batch(); // Add different items to each batch. let op_a = create_operation::(100); let op_b = create_operation::(200); let b1 = b1.add(op_a.clone()); let b2 = b2.add(op_b); // Merkleize and verify independent roots. let m1 = journal.merkle.with_mem(|mem| b1.merkleize(mem)); let m2 = journal.merkle.with_mem(|mem| b2.merkleize(mem)); assert_ne!(batch_root(&journal, &m1), batch_root(&journal, &m2)); assert_ne!(batch_root(&journal, &m1), original_root); assert_ne!(batch_root(&journal, &m2), original_root); // Journal root should be unchanged (batches are speculative). assert_eq!(journal_root(&journal), original_root); // Apply batch 1. let expected_root = batch_root(&journal, &m1); journal.apply_batch(&m1).await.unwrap(); // Journal should now match the applied batch's root. assert_eq!(journal_root(&journal), expected_root); assert_eq!(*journal.size().await, 11); } #[test_traced("INFO")] fn test_speculative_batch_mmr() { let executor = deterministic::Runner::default(); executor.start(test_speculative_batch_inner::); } #[test_traced("INFO")] fn test_speculative_batch_mmb() { let executor = deterministic::Runner::default(); executor.start(test_speculative_batch_inner::); } /// Verify stacking: create batch A, merkleize, create batch B from merkleized A, /// merkleize, and apply. Verify root and items. async fn test_speculative_batch_stacking_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "batch_stacking", 10).await; let op_a = create_operation::(100); let op_b = create_operation::(200); let (merkleized_a, merkleized_b) = { let batch_a = journal.new_batch().add(op_a.clone()); let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem)); let batch_b = merkleized_a.new_batch::().add(op_b.clone()); let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem)); (merkleized_a, merkleized_b) }; let expected_root = batch_root(&journal, &merkleized_b); journal.apply_batch(&merkleized_b).await.unwrap(); drop(merkleized_a); assert_eq!(journal_root(&journal), expected_root); assert_eq!(*journal.size().await, 12); // Verify both items were appended correctly. let read_a = journal.read(Location::::new(10)).await.unwrap(); assert_eq!(read_a, op_a); let read_b = journal.read(Location::::new(11)).await.unwrap(); assert_eq!(read_b, op_b); } #[test_traced("INFO")] fn test_speculative_batch_stacking_mmr() { let executor = deterministic::Runner::default(); executor.start(test_speculative_batch_stacking_inner::); } #[test_traced("INFO")] fn test_speculative_batch_stacking_mmb() { let executor = deterministic::Runner::default(); executor.start(test_speculative_batch_stacking_inner::); } /// Verify sequential batch application: apply batch A, then build and apply batch B /// from the committed state. Verify root and items. async fn test_speculative_batch_sequential_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "batch_sequential", 10).await; let op_a = create_operation::(100); let op_b = create_operation::(200); // Apply batch A. let batch_a = journal.new_batch().add(op_a.clone()); let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem)); journal.apply_batch(&merkleized_a).await.unwrap(); assert_eq!(*journal.size().await, 11); // Apply batch B (built on top of the committed A). let batch_b = journal.new_batch().add(op_b.clone()); let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem)); let expected_root = batch_root(&journal, &merkleized_b); journal.apply_batch(&merkleized_b).await.unwrap(); assert_eq!(journal_root(&journal), expected_root); assert_eq!(*journal.size().await, 12); // Verify both items were appended correctly. let read_a = journal.read(Location::::new(10)).await.unwrap(); assert_eq!(read_a, op_a); let read_b = journal.read(Location::::new(11)).await.unwrap(); assert_eq!(read_b, op_b); } #[test_traced("INFO")] fn test_speculative_batch_sequential_mmr() { let executor = deterministic::Runner::default(); executor.start(test_speculative_batch_sequential_inner::); } #[test_traced("INFO")] fn test_speculative_batch_sequential_mmb() { let executor = deterministic::Runner::default(); executor.start(test_speculative_batch_sequential_inner::); } async fn test_stale_batch_sibling_inner(context: Context) { let mut journal = create_empty_journal::(context, "stale-sibling").await; let op_a = create_operation::(1); let op_b = create_operation::(2); // Create two batches from the same base. let batch_a = journal.new_batch().add(op_a.clone()); let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem)); let batch_b = journal.new_batch().add(op_b); let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem)); // Apply A -- should succeed. journal.apply_batch(&merkleized_a).await.unwrap(); let expected_root = journal_root(&journal); let expected_size = journal.size().await; // Apply B -- should fail (stale). let result = journal.apply_batch(&merkleized_b).await; assert!( matches!( result, Err(super::Error::Merkle(merkle::Error::StaleBatch { .. })) ), "expected StaleBatch, got {result:?}" ); // The stale batch must not mutate the journal or desync it from the Merkle. assert_eq!(journal_root(&journal), expected_root); assert_eq!(journal.size().await, expected_size); let (_, ops) = journal .proof(Location::::new(0), NZU64!(1), 0) .await .unwrap(); assert_eq!(ops, vec![op_a]); } #[test_traced("INFO")] fn test_stale_batch_sibling_mmr() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_sibling_inner::); } #[test_traced("INFO")] fn test_stale_batch_sibling_mmb() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_sibling_inner::); } async fn test_stale_batch_chained_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "stale-chained", 5).await; // Parent batch, then fork two children. let parent_batch = journal.new_batch().add(create_operation::(10)); let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem)); let batch_a = parent.new_batch::().add(create_operation::(20)); let child_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem)); let batch_b = parent.new_batch::().add(create_operation::(30)); let child_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem)); // Apply child_a, then child_b should be stale. journal.apply_batch(&child_a).await.unwrap(); let result = journal.apply_batch(&child_b).await; drop(parent); assert!( matches!( result, Err(super::Error::Merkle(merkle::Error::StaleBatch { .. })) ), "expected StaleBatch for sibling, got {result:?}" ); } #[test_traced("INFO")] fn test_stale_batch_chained_mmr() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_chained_inner::); } #[test_traced("INFO")] fn test_stale_batch_chained_mmb() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_chained_inner::); } async fn test_stale_batch_parent_before_child_inner(context: Context) { let mut journal = create_empty_journal::(context, "stale-parent-first").await; // Create parent, then child. let parent_batch = journal.new_batch().add(create_operation::(1)); let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem)); let child_batch = parent.new_batch::().add(create_operation::(2)); let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem)); let expected_root = batch_root(&journal, &child); // Apply parent, then child (sequential commit). journal.apply_batch(&parent).await.unwrap(); journal.apply_batch(&child).await.unwrap(); assert_eq!(journal_root(&journal), expected_root); assert_eq!(*journal.size().await, 2); } #[test_traced("INFO")] fn test_stale_batch_parent_before_child_mmr() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_parent_before_child_inner::); } #[test_traced("INFO")] fn test_stale_batch_parent_before_child_mmb() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_parent_before_child_inner::); } async fn test_stale_batch_child_before_parent_inner(context: Context) { let mut journal = create_empty_journal::(context, "stale-child-first").await; // Create parent, then child. let parent_batch = journal.new_batch().add(create_operation::(1)); let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem)); let child_batch = parent.new_batch::().add(create_operation::(2)); let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem)); // Apply child first (full chain) -- parent should now be stale. journal.apply_batch(&child).await.unwrap(); let result = journal.apply_batch(&parent).await; assert!( matches!( result, Err(super::Error::Merkle(merkle::Error::StaleBatch { .. })) ), "expected StaleBatch for parent after child applied, got {result:?}" ); } #[test_traced("INFO")] fn test_stale_batch_child_before_parent_mmr() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_child_before_parent_inner::); } #[test_traced("INFO")] fn test_stale_batch_child_before_parent_mmb() { let executor = deterministic::Runner::default(); executor.start(test_stale_batch_child_before_parent_inner::); } /// Apply parent then child: child skips already-committed ancestor items. async fn test_apply_batch_skip_ancestor_items_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "rp-skip", 3).await; // Parent: 2 items. let parent_batch = journal .new_batch() .add(create_operation::(10)) .add(create_operation::(11)); let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem)); // Child: 3 more items. let child_batch = parent .new_batch::() .add(create_operation::(20)) .add(create_operation::(21)) .add(create_operation::(22)); let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem)); // Apply parent. journal.apply_batch(&parent).await.unwrap(); // Apply child (ancestor items already committed, skipped automatically). journal.apply_batch(&child).await.unwrap(); // Verify all items are present. let (_, ops) = journal .proof(Location::::new(3), NZU64!(5), 0) .await .unwrap(); assert_eq!(ops.len(), 5); } #[test_traced("INFO")] fn test_apply_batch_skip_ancestor_items_mmr() { let executor = deterministic::Runner::default(); executor.start(test_apply_batch_skip_ancestor_items_inner::); } #[test_traced("INFO")] fn test_apply_batch_skip_ancestor_items_mmb() { let executor = deterministic::Runner::default(); executor.start(test_apply_batch_skip_ancestor_items_inner::); } /// `apply_batch` works correctly across a 3-level chain. async fn test_apply_batch_cross_batch_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "rp-cross", 2).await; // Grandparent: 3 items. let grandparent_batch = journal .new_batch() .add(create_operation::(3)) .add(create_operation::(4)) .add(create_operation::(5)); let grandparent = journal .merkle .with_mem(|mem| grandparent_batch.merkleize(mem)); // Parent: 2 items. let parent_batch = grandparent .new_batch::() .add(create_operation::(6)) .add(create_operation::(7)); let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem)); // Child: 1 item. let child_batch = parent.new_batch::().add(create_operation::(8)); let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem)); // Apply grandparent, then parent, then child sequentially. journal.apply_batch(&grandparent).await.unwrap(); // Apply parent (ancestor items already committed, skipped automatically). journal.apply_batch(&parent).await.unwrap(); // Apply child (ancestor items already committed, skipped automatically). journal.apply_batch(&child).await.unwrap(); // All 8 items (2 base + 3 + 2 + 1) should be present. assert_eq!(*journal.size().await, 8); // Verify the actual items at each location. let (_, ops) = journal .proof(Location::::new(2), NZU64!(6), 0) .await .unwrap(); for (i, op) in ops.iter().enumerate() { assert_eq!(*op, create_operation::((i + 3) as u8)); } } #[test_traced("INFO")] fn test_apply_batch_cross_batch_mmr() { let executor = deterministic::Runner::default(); executor.start(test_apply_batch_cross_batch_inner::); } #[test_traced("INFO")] fn test_apply_batch_cross_batch_mmb() { let executor = deterministic::Runner::default(); executor.start(test_apply_batch_cross_batch_inner::); } /// merkleize_with produces the same root as add + merkleize. async fn test_merkleize_with_matches_add_inner(context: Context) { let journal = create_journal_with_ops::(context, "mw-matches", 5).await; let ops = vec![ create_operation::(10), create_operation::(11), create_operation::(12), ]; // add + merkleize let mut batch = journal.new_batch(); for op in &ops { batch = batch.add(op.clone()); } let expected = journal.merkle.with_mem(|mem| batch.merkleize(mem)); // merkleize_with let batch = journal.new_batch(); let actual = journal .merkle .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops))); assert_eq!( batch_root(&journal, &actual), batch_root(&journal, &expected) ); } #[test_traced("INFO")] fn test_merkleize_with_matches_add_mmr() { let executor = deterministic::Runner::default(); executor.start(test_merkleize_with_matches_add_inner::); } #[test_traced("INFO")] fn test_merkleize_with_matches_add_mmb() { let executor = deterministic::Runner::default(); executor.start(test_merkleize_with_matches_add_inner::); } /// merkleize_with items are readable after apply. async fn test_merkleize_with_apply_inner(context: Context) { let mut journal = create_journal_with_ops::(context, "mw-apply", 5).await; let ops = vec![create_operation::(10), create_operation::(11)]; let batch = journal.new_batch(); let merkleized = journal .merkle .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops.clone()))); let expected_root = batch_root(&journal, &merkleized); journal.apply_batch(&merkleized).await.unwrap(); assert_eq!(journal_root(&journal), expected_root); assert_eq!(*journal.size().await, 7); let reader = journal.reader().await; assert_eq!(reader.read(5).await.unwrap(), ops[0]); assert_eq!(reader.read(6).await.unwrap(), ops[1]); } #[test_traced("INFO")] fn test_merkleize_with_apply_mmr() { let executor = deterministic::Runner::default(); executor.start(test_merkleize_with_apply_inner::); } #[test_traced("INFO")] fn test_merkleize_with_apply_mmb() { let executor = deterministic::Runner::default(); executor.start(test_merkleize_with_apply_inner::); } /// merkleize_with stores the caller's Arc directly (no deep copy). async fn test_merkleize_with_shares_arc_inner(context: Context) { let journal = create_journal_with_ops::(context, "mw-arc", 3).await; let ops = Arc::new(vec![create_operation::(20), create_operation::(21)]); let ops_clone = Arc::clone(&ops); let batch = journal.new_batch(); let merkleized = journal .merkle .with_mem(|mem| batch.merkleize_with(mem, ops_clone)); // The batch should hold the same Arc allocation, not a copy. assert!(Arc::ptr_eq(&merkleized.items, &ops)); } #[test_traced("INFO")] fn test_merkleize_with_shares_arc_mmr() { let executor = deterministic::Runner::default(); executor.start(test_merkleize_with_shares_arc_inner::); } #[test_traced("INFO")] fn test_merkleize_with_shares_arc_mmb() { let executor = deterministic::Runner::default(); executor.start(test_merkleize_with_shares_arc_inner::); } /// Apply C (grandchild of A) after only A is committed. B's journal items /// must still be applied -- skip only A's items. async fn test_apply_batch_skips_only_committed_ancestor_items_inner( context: Context, ) { let mut journal = create_empty_journal::(context.child("storage"), "skip-partial").await; // Build chain: A -> B -> C let a_batch = journal.new_batch().add(create_operation::(1)); let a = journal.merkle.with_mem(|mem| a_batch.merkleize(mem)); let b_batch = a.new_batch::().add(create_operation::(2)); let b = journal.merkle.with_mem(|mem| b_batch.merkleize(mem)); let c_batch = b.new_batch::().add(create_operation::(3)); let c = journal.merkle.with_mem(|mem| c_batch.merkleize(mem)); // Apply A, then apply C directly (skipping B's apply_batch). journal.apply_batch(&a).await.unwrap(); journal.apply_batch(&c).await.unwrap(); // All 3 items should be in the journal. assert_eq!(*journal.size().await, 3); // Build a reference that applies all three sequentially. let mut reference = create_empty_journal::(context.child("ref"), "skip-partial-ref").await; for i in 1..=3u8 { reference.append(&create_operation::(i)).await.unwrap(); } assert_eq!(journal_root(&journal), journal_root(&reference)); } #[test_traced("INFO")] fn test_apply_batch_skips_only_committed_ancestor_items_mmr() { let executor = deterministic::Runner::default(); executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::); } #[test_traced("INFO")] fn test_apply_batch_skips_only_committed_ancestor_items_mmb() { let executor = deterministic::Runner::default(); executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::); } }