//! Generic sync tests that work for both fixed and variable databases. //! //! This module provides a test harness trait and generic test functions that can be //! parameterized to run against either fixed-size or variable-size value databases. //! The shared functions are `pub(crate)` so that `current::sync::tests` can reuse them. use crate::{ journal::contiguous::Contiguous, merkle::{self, Location}, qmdb::{ self, any::traits::DbAny, operation::Operation as OperationTrait, sync::{ self, engine::{Config, NextStep}, resolver::{self, FetchResult, Resolver}, Engine, Target, }, }, Persistable, }; use commonware_codec::Encode; use commonware_cryptography::sha256::Digest; use commonware_macros::select; use commonware_runtime::{ deterministic, BufferPooler, Clock, Metrics as _, Runner as _, Supervisor as _, }; use commonware_utils::{ channel::{mpsc, oneshot}, non_empty_range, sync::{AsyncRwLock, Mutex}, NZU64, }; use futures::{pin_mut, FutureExt}; use rand::RngCore as _; use std::{ num::NonZeroU64, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, time::Duration, }; /// Type alias for the database type of a harness. pub(crate) type DbOf = ::Db; /// Type alias for the operation type of a harness. pub(crate) type OpOf = as qmdb::sync::Database>::Op; /// Type alias for the config type of a harness. pub(crate) type ConfigOf = as qmdb::sync::Database>::Config; /// Type alias for the journal type of a harness. pub(crate) type JournalOf = as qmdb::sync::Database>::Journal; /// Trait for cleanup operations in tests. pub(crate) trait Destructible { type Family: merkle::Family; fn destroy( self, ) -> impl std::future::Future>> + Send; } // Implement Destructible once for the generic full Merkle type used in tests. // This is here (rather than in fixed/variable modules) to avoid duplicate implementations. 
// NOTE(review): generic parameter lists (for example the `F` named below, and the harness
// parameter that `H::` implies in the test functions) look to be missing from this copy of
// the file. Code tokens are left exactly as found; confirm against the canonical source.
impl Destructible
    for crate::merkle::full::Merkle<
        F,
        deterministic::Context,
        Digest,
        commonware_parallel::Sequential,
    >
{
    type Family = F;

    // Forwards to `self.destroy()` and wraps any failure in `qmdb::Error::Merkle`.
    async fn destroy(self) -> Result<(), qmdb::Error> {
        self.destroy().await.map_err(qmdb::Error::Merkle)
    }
}

/// Trait providing internal access for from_sync_result tests.
pub(crate) trait FromSyncTestable: qmdb::sync::Database {
    /// The Merkle structure type returned by [`Self::into_log_components`].
    type Merkle: Destructible + Send;

    /// Get the Merkle structure and journal from the database.
    fn into_log_components(self) -> (Self::Merkle, Self::Journal);

    /// Get the pinned nodes at a given location
    fn pinned_nodes_at(
        &self,
        loc: Location,
    ) -> impl std::future::Future> + Send;
}

/// Harness for sync tests.
///
/// Every generic test in this module goes through these associated functions to build
/// configs, generate operations, and open or populate the database under test.
pub(crate) trait SyncTestHarness: Sized + 'static {
    /// The merkle family the database under test uses.
    type Family: merkle::Family;

    /// The database type being tested.
    type Db: qmdb::sync::Database<
            Family = Self::Family,
            Context = deterministic::Context,
            Digest = Digest,
            Config: Clone,
        > + DbAny;

    /// Return the root the sync engine targets.
    fn sync_target_root(db: &Self::Db) -> Digest;

    /// Create a config with unique partition names
    fn config(suffix: &str, pooler: &impl BufferPooler) -> ConfigOf;

    /// Generate n test operations using the default seed (0)
    fn create_ops(n: usize) -> Vec>;

    /// Generate n test operations using a specific seed.
    /// Use different seeds when you need non-overlapping keys in the same test.
    fn create_ops_seeded(n: usize, seed: u64) -> Vec>;

    /// Initialize a database
    fn init_db(ctx: deterministic::Context) -> impl std::future::Future + Send;

    /// Initialize a database with a config
    fn init_db_with_config(
        ctx: deterministic::Context,
        config: ConfigOf,
    ) -> impl std::future::Future + Send;

    /// Apply operations to a database and commit.
    fn apply_ops(
        db: Self::Db,
        ops: Vec>,
    ) -> impl std::future::Future + Send;
}

/// Test that empty operations arrays fetched do not cause panics when stored and applied
pub(crate) fn test_sync_empty_operations_no_panic()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Init target_db to satisfy engine configuration bounds
        let target_db = H::init_db(context.child("target")).await;

        // Use an arbitrary target
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let config = Config {
            db_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: Digest::from([1u8; 32]),
                range: non_empty_range!(Location::new(0), Location::new(10)),
            },
            context: context.child("client"),
            resolver: Arc::new(target_db),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };

        // Create the engine
        let mut client: Engine = Engine::new(config).await.unwrap();

        // Pass empty operations vectors which should not cause panics
        client.store_operations(Location::new(0), vec![]);
        client.store_operations(Location::new(5), vec![]);

        // Apply operations which also shouldn't panic
        client.apply_operations().await.unwrap();

        // It is considered a success simply if it didn't panic.
    });
}

/// Test that resolver failure is handled correctly
///
/// Runs `sync::sync` against a `FailResolver` and only asserts that the result is an `Err`.
pub(crate) fn test_sync_resolver_fails()
where
    resolver::tests::FailResolver, Digest>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let resolver = resolver::tests::FailResolver::, Digest>::new();
        let target_root = Digest::from([0; 32]);

        let db_config = H::config(&context.next_u64().to_string(), &context);
        let engine_config = Config {
            context: context.child("client"),
            target: Target {
                root: target_root,
                range: non_empty_range!(Location::new(0), Location::new(5)),
            },
            resolver,
            apply_batch_size: 2,
            max_outstanding_requests: 2,
            fetch_batch_size: NZU64!(2),
            db_config,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };

        let result: Result = sync::sync(engine_config).await;
        assert!(result.is_err());
    });
}

/// Test basic sync functionality with various batch sizes
///
/// `target_db_ops` is the number of operations applied to the target database before
/// syncing; `fetch_batch_size` is passed straight into the engine config.
pub(crate) fn test_sync(target_db_ops: usize, fetch_batch_size: NonZeroU64)
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(target_db_ops);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops
        target_db
            .prune(target_db.sync_boundary().await)
            .await
            .unwrap();

        // Snapshot the target state that the synced database is compared against.
        let target_op_count = target_db.bounds().await.end;
        let target_inactivity_floor = target_db.inactivity_floor_loc().await;
        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();
        let lower_bound = target_db.sync_boundary().await;

        // Configure sync
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let target_db = Arc::new(target_db);
        let client_context = context.child("client");
        let config = Config {
            // Cloned because the same config is used again below to reopen the database.
            db_config: db_config.clone(),
            fetch_batch_size,
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, target_op_count),
            },
            context: client_context.child("client"),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };

        // Perform sync
        let synced_db: H::Db = sync::sync(config).await.unwrap();

        // Verify database state (root hash is the key verification)
        assert_eq!(synced_db.bounds().await.end, target_op_count);
        assert_eq!(
            synced_db.inactivity_floor_loc().await,
            target_inactivity_floor
        );
        assert_eq!(synced_db.root(), verification_root);

        // Verify persistence
        let final_root = synced_db.root();
        let final_op_count = synced_db.bounds().await.end;
        let final_inactivity_floor = synced_db.inactivity_floor_loc().await;

        // Reopen and verify state persisted
        drop(synced_db);
        let reopened_db =
            H::init_db_with_config(client_context.child("reopened"), db_config).await;
        assert_eq!(reopened_db.bounds().await.end, final_op_count);
        assert_eq!(
            reopened_db.inactivity_floor_loc().await,
            final_inactivity_floor
        );
        assert_eq!(reopened_db.root(), final_root);

        // Cleanup
        reopened_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test syncing to a subset of the target database (target has additional ops beyond sync range)
///
/// `target_db_ops` must be at least 1: the body slices `target_ops[0..target_db_ops - 1]`
/// and indexes `target_ops[target_db_ops - 1]`.
pub(crate) fn test_sync_subset_of_target_database(target_db_ops: usize)
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode + Clone + OperationTrait,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(target_db_ops);
        // Apply all but the last operation
        target_db = H::apply_ops(target_db, target_ops[0..target_db_ops - 1].to_vec()).await;
        // commit already done in apply_ops

        // Capture the sync target before the extra operation is applied.
        let upper_bound = target_db.bounds().await.end;
        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();
        let lower_bound = target_db.sync_boundary().await;

        // Add another operation after the sync range
        let final_op = target_ops[target_db_ops - 1].clone();
        let final_key = final_op.key().cloned(); // Store the key before applying
        target_db = H::apply_ops(target_db, vec![final_op]).await;
        // commit already done in apply_ops

        // Sync to the original root (before final_op was added)
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let config = Config {
            db_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: context.child("client"),
            resolver: Arc::new(target_db),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };

        let synced_db: H::Db = sync::sync(config).await.unwrap();

        // Verify the synced database has the correct range of operations
        assert_eq!(synced_db.sync_boundary().await, lower_bound);
        assert_eq!(synced_db.bounds().await.end, upper_bound);

        // Verify the final root digest matches our target
        assert_eq!(synced_db.root(), verification_root);

        // Verify the synced database doesn't have any operations beyond the sync range.
        // (the final_op should not be present)
        if let Some(key) = final_key {
            assert!(synced_db.get(&key).await.unwrap().is_none());
        }

        synced_db.destroy().await.unwrap();
    });
}

/// Test syncing where the sync client has some but not all of the operations in the target DB.
/// Tests the scenario where sync_db already has partial data and needs to sync additional ops.
///
/// `original_ops` is the number of operations applied to both databases before the target
/// receives one additional operation.
// NOTE(review): generic parameter lists (the harness parameter that `H::` implies, and the
// arguments in the `where` clause) look to be missing from this copy of the file. Code
// tokens are left exactly as found; confirm against the canonical source.
pub(crate) fn test_sync_use_existing_db_partial_match(original_ops: usize)
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode + Clone + OperationTrait,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let original_ops_data = H::create_ops(original_ops);

        // Heap-pin large sub-futures so their state machines don't inflate this test's outer
        // state machine and overflow the test thread stack.
        let mut target_db = Box::pin(H::init_db(context.child("target"))).await;
        let sync_db_config = H::config(&context.next_u64().to_string(), &context);
        let client_context = context.child("client");
        let mut sync_db: H::Db = Box::pin(H::init_db_with_config(
            client_context.child("client"),
            sync_db_config.clone(),
        ))
        .await;

        // Apply the same operations to both databases
        target_db = Box::pin(H::apply_ops(target_db, original_ops_data.clone())).await;
        sync_db = Box::pin(H::apply_ops(sync_db, original_ops_data.clone())).await;
        // commit already done in apply_ops (for both databases)

        // Release the client database; the sync below uses the same `sync_db_config`.
        drop(sync_db);

        // Add more operations and commit the target database
        // (use different seed to avoid key collisions)
        let more_ops = H::create_ops_seeded(1, 1);
        target_db = Box::pin(H::apply_ops(target_db, more_ops.clone())).await;
        // commit already done in apply_ops

        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;

        // Reopen the sync database and sync it to the target database
        let target_db = Arc::new(target_db);
        let config = Config {
            db_config: sync_db_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: client_context.child("sync"),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced_db: H::Db = sync::sync(config).await.unwrap();

        // Verify database state
        let bounds = synced_db.bounds().await;
        assert_eq!(bounds.end, upper_bound);
        assert_eq!(
            synced_db.inactivity_floor_loc().await,
            target_db.inactivity_floor_loc().await
        );
        assert_eq!(bounds.end, target_db.bounds().await.end);

        // Verify the root digest matches the target
        assert_eq!(synced_db.root(), verification_root);

        // Verify that original operations are present and correct (by key lookup)
        // Only presence (`is_some`) is compared here, not the stored values.
        for target_op in &original_ops_data {
            if let Some(key) = target_op.key() {
                let target_value = target_db.get(key).await.unwrap();
                let synced_value = synced_db.get(key).await.unwrap();
                assert_eq!(target_value.is_some(), synced_value.is_some());
            }
        }

        // Verify the last operation is present (if it's an update)
        if let Some(key) = more_ops[0].key() {
            let synced_value = synced_db.get(key).await.unwrap();
            let target_value = target_db.get(key).await.unwrap();
            assert_eq!(synced_value.is_some(), target_value.is_some());
        }

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test case where existing database on disk exactly matches the sync target.
/// Uses FailResolver to verify that no network requests are made since data already exists.
///
/// `num_ops` is the number of operations applied to both the target and the client database.
// NOTE(review): generic parameter lists (the harness parameter that `H::` implies, and the
// arguments in the `where` clauses) look to be missing from this copy of the file. Code
// tokens are left exactly as found; confirm against the canonical source.
pub(crate) fn test_sync_use_existing_db_exact_match(num_ops: usize)
where
    resolver::tests::FailResolver, Digest>: Resolver, Digest = Digest>,
    OpOf: Encode + Clone + OperationTrait,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let target_ops = H::create_ops(num_ops);

        // Create two databases with their own configs
        let target_config = H::config(&context.next_u64().to_string(), &context);
        let mut target_db = H::init_db_with_config(context.child("target"), target_config).await;
        let sync_config = H::config(&context.next_u64().to_string(), &context);
        let client_context = context.child("client");
        let mut sync_db =
            H::init_db_with_config(client_context.child("client"), sync_config.clone()).await;

        // Apply the same operations to both databases
        target_db = H::apply_ops(target_db, target_ops.clone()).await;
        sync_db = H::apply_ops(sync_db, target_ops.clone()).await;
        // commit already done in apply_ops (for both databases)

        // Prune both databases to their own sync boundary, then persist and release the
        // client database before it is reopened through `sync::sync`.
        target_db
            .prune(target_db.sync_boundary().await)
            .await
            .unwrap();
        sync_db.prune(sync_db.sync_boundary().await).await.unwrap();
        sync_db.sync().await.unwrap();
        drop(sync_db);

        // Capture target state
        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;

        // sync_db should never ask the resolver for operations
        // because it is already complete. Use a resolver that always fails
        // to ensure that it's not being used.
        let resolver = resolver::tests::FailResolver::, Digest>::new();
        let config = Config {
            db_config: sync_config, // Use same config to access same partitions
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: client_context.child("sync"),
            resolver,
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced_db: H::Db = sync::sync(config).await.unwrap();

        // Verify database state
        let bounds = synced_db.bounds().await;
        assert_eq!(bounds.end, upper_bound);
        assert_eq!(bounds.end, target_db.bounds().await.end);
        assert_eq!(synced_db.sync_boundary().await, lower_bound);

        // Verify the root digest matches the target
        assert_eq!(synced_db.root(), verification_root);

        // Verify state matches for sample operations (via key lookup)
        // Only presence (`is_some`) is compared here, not the stored values.
        for target_op in &target_ops {
            if let Some(key) = target_op.key() {
                let target_value = target_db.get(key).await.unwrap();
                let synced_value = synced_db.get(key).await.unwrap();
                assert_eq!(target_value.is_some(), synced_value.is_some());
            }
        }

        synced_db.destroy().await.unwrap();
        target_db.destroy().await.unwrap();
    });
}

/// Test that the client fails to sync if the lower bound is decreased via target update.
///
/// The update also raises the upper bound by one, so the lower bound is the only part of
/// the target that moves backward.
pub(crate) fn test_target_update_lower_bound_decrease()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(50);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops

        // Use inactivity_floor as range.start so we have a non-zero bound to decrement.
        // The engine only checks that range.start does not decrease on updates; it doesn't
        // require range.start to equal sync_boundary here.
        let initial_lower_bound = target_db.inactivity_floor_loc().await;
        assert!(
            *initial_lower_bound > 0,
            "test setup requires non-zero inactivity floor"
        );
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_root = H::sync_target_root(&target_db);

        // Create client with initial target
        let (update_sender, update_receiver) = mpsc::channel(1);
        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: initial_root,
                range: non_empty_range!(initial_lower_bound, initial_upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_receiver),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        };
        let client: Engine = Engine::new(config).await.unwrap();

        // Send target update with decreased lower bound
        update_sender
            .send(Target {
                root: initial_root,
                range: non_empty_range!(
                    initial_lower_bound.checked_sub(1).unwrap(),
                    initial_upper_bound.checked_add(1).unwrap()
                ),
            })
            .await
            .unwrap();

        // A single engine step is expected to surface the rejection.
        let result = client.step().await;
        assert!(matches!(
            result,
            Err(sync::Error::Engine(
                sync::EngineError::SyncTargetMovedBackward { .. }
            ))
        ));

        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that the client fails to sync if the upper bound is decreased via target update.
///
/// The update keeps the same root and lower bound and only lowers the upper bound by one.
// NOTE(review): generic parameter lists (the harness parameter that `H::` implies, and the
// arguments in the `where` clause) look to be missing from this copy of the file. Code
// tokens are left exactly as found; confirm against the canonical source.
pub(crate) fn test_target_update_upper_bound_decrease()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(50);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops

        // Capture initial target state
        let initial_lower_bound = target_db.sync_boundary().await;
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_root = H::sync_target_root(&target_db);

        // Create client with initial target
        let (update_sender, update_receiver) = mpsc::channel(1);
        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: initial_root,
                range: non_empty_range!(initial_lower_bound, initial_upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_receiver),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        };
        let client: Engine = Engine::new(config).await.unwrap();

        // Send target update with decreased upper bound
        update_sender
            .send(Target {
                root: initial_root,
                range: non_empty_range!(
                    initial_lower_bound,
                    initial_upper_bound.checked_sub(1).unwrap()
                ),
            })
            .await
            .unwrap();

        // A single engine step is expected to surface the rejection.
        let result = client.step().await;
        assert!(matches!(
            result,
            Err(sync::Error::Engine(
                sync::EngineError::SyncTargetMovedBackward { .. }
            ))
        ));

        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that the client succeeds when bounds are updated (increased).
///
/// The client starts from a stale target and a newer target is queued on the update channel
/// before `sync::sync` is called; the synced database is checked against the newer target.
// NOTE(review): generic parameter lists (the harness parameter that `H::` implies, and the
// arguments in the `where` clauses) look to be missing from this copy of the file. Code
// tokens are left exactly as found; confirm against the canonical source.
pub(crate) fn test_target_update_bounds_increase()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(100);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops

        // Capture initial target state
        let initial_lower_bound = target_db.sync_boundary().await;
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_root = H::sync_target_root(&target_db);

        // Apply more operations to the target database
        // (use different seed to avoid key collisions)
        let additional_ops = H::create_ops_seeded(1, 1);
        // NOTE(review): the block expression below only exists to yield
        // `new_verification_root`, which is then discarded by `let _ =`; it looks like the
        // body could be flattened. Left untouched here.
        let new_verification_root = {
            target_db = H::apply_ops(target_db, additional_ops).await;
            // commit already done in apply_ops

            // Capture new target state
            let new_lower_bound = target_db.sync_boundary().await;
            let new_upper_bound = target_db.bounds().await.end;
            let new_sync_root = H::sync_target_root(&target_db);
            let new_verification_root = target_db.root();

            // Create client with placeholder initial target (stale compared to final target)
            let (update_sender, update_receiver) = mpsc::channel(1);
            let target_db = Arc::new(target_db);
            let config = Config {
                context: context.child("client"),
                db_config: H::config(&context.next_u64().to_string(), &context),
                fetch_batch_size: NZU64!(1),
                target: Target {
                    root: initial_root,
                    range: non_empty_range!(initial_lower_bound, initial_upper_bound),
                },
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: Some(update_receiver),
                finish_rx: None,
                reached_target_tx: None,
                max_retained_roots: 1,
            };

            // Send target update with increased bounds
            // (sent before the sync starts; the capacity-1 channel holds it until then)
            update_sender
                .send(Target {
                    root: new_sync_root,
                    range: non_empty_range!(new_lower_bound, new_upper_bound),
                })
                .await
                .unwrap();

            // Complete the sync
            let synced_db: H::Db = sync::sync(config).await.unwrap();

            // Verify the synced database has the expected final state
            assert_eq!(synced_db.root(), new_verification_root);
            assert_eq!(synced_db.bounds().await.end, new_upper_bound);
            assert_eq!(synced_db.sync_boundary().await, new_lower_bound);

            synced_db.destroy().await.unwrap();
            Arc::try_unwrap(target_db)
                .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
                .destroy()
                .await
                .unwrap();

            new_verification_root
        };
        let _ = new_verification_root; // Silence unused variable warning
    });
}

/// Test that target updates can be sent even after the client is done (no panic).
pub(crate) fn test_target_update_on_done_client()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(10);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops

        // Capture target state
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;
        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();

        // Create client with target that will complete immediately
        let (update_sender, update_receiver) = mpsc::channel(1);
        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(20),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_receiver),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        };

        // Complete the sync
        let synced_db: H::Db = sync::sync(config).await.unwrap();

        // Attempt to apply a target update after sync is complete to verify
        // we don't panic
        // The send result is deliberately discarded with `let _ =`.
        let _ = update_sender
            .send(Target {
                // Dummy target update
                root: Digest::from([2u8; 32]),
                range: non_empty_range!(lower_bound + 1, upper_bound + 1),
            })
            .await;

        // Verify the synced database has the expected state
        assert_eq!(synced_db.root(), verification_root);
        assert_eq!(synced_db.bounds().await.end, upper_bound);
        assert_eq!(synced_db.sync_boundary().await, lower_bound);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that prune-only target updates are rejected as backward target movement.
///
/// Both queued updates reuse the initial root and upper bound and only advance the lower
/// bound (by one and by two respectively).
pub(crate) fn test_target_update_prune_only_rejected()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(50)).await;

        let initial_lower_bound = target_db.inactivity_floor_loc().await;
        assert!(
            *initial_lower_bound > 1,
            "test setup requires lower bound that can advance twice"
        );
        let upper_bound = target_db.bounds().await.end;
        let root = H::sync_target_root(&target_db);

        // Capacity 2 so both updates can be queued before the engine takes a step.
        let (update_sender, update_receiver) = mpsc::channel(2);
        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root,
                range: non_empty_range!(initial_lower_bound, upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_receiver),
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 1,
        };
        let client: Engine = Engine::new(config).await.unwrap();

        let first_target = Target {
            root,
            range: non_empty_range!(initial_lower_bound.checked_add(1).unwrap(), upper_bound),
        };
        let second_target = Target {
            root,
            range: non_empty_range!(initial_lower_bound.checked_add(2).unwrap(), upper_bound),
        };
        update_sender.send(first_target).await.unwrap();
        update_sender.send(second_target).await.unwrap();

        let result = client.step().await;
        assert!(matches!(
            result,
            Err(sync::Error::Engine(
                sync::EngineError::SyncTargetMovedBackward { .. }
            ))
        ));

        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that explicit finish control waits for a finish signal even after reaching target.
///
/// Sequence: reach the initial target, send an updated target, reach it, and only then send
/// the finish signal. The sync future must stay pending until that signal.
pub(crate) fn test_sync_waits_for_explicit_finish()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(10)).await;

        let initial_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.sync_boundary().await,
                target_db.bounds().await.end
            ),
        };

        // Grow the target database so there is a newer target to switch to mid-sync.
        target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 1)).await;
        let updated_lower_bound = target_db.sync_boundary().await;
        let updated_upper_bound = target_db.bounds().await.end;
        let updated_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(updated_lower_bound, updated_upper_bound),
        };
        let updated_verification_root = target_db.root();

        let (update_sender, update_receiver) = mpsc::channel(1);
        let (finish_sender, finish_receiver) = mpsc::channel(1);
        let (reached_sender, mut reached_receiver) = mpsc::channel(1);

        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(10),
            target: initial_target.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: Some(update_receiver),
            finish_rx: Some(finish_receiver),
            reached_target_tx: Some(reached_sender),
            // NOTE(review): every other test in this file that passes an `update_rx` uses
            // `max_retained_roots: 1`; confirm that 0 is intentional here.
            max_retained_roots: 0,
        };

        // Not awaited yet: the future is pinned so it can be polled alongside other events.
        let sync_handle = sync::sync(config);
        pin_mut!(sync_handle);

        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal");
            },
            reached = reached_receiver.recv() => {
                let reached = reached.expect("engine should report reached-target before finish");
                assert_eq!(reached, initial_target);
            },
        }
        // One extra poll: the future must still be pending without a finish signal.
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must wait for explicit finish signal after reaching target"
        );

        update_sender
            .send(updated_target.clone())
            .await
            .expect("target update channel should be open");

        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal for updated target");
            },
            reached = reached_receiver.recv() => {
                let reached = reached.expect("engine should report updated target before finish");
                assert_eq!(reached, updated_target);
            },
        }
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must still wait for explicit finish signal after updated target is reached"
        );

        finish_sender
            .send(())
            .await
            .expect("finish signal channel should be open");

        let synced_db: H::Db = sync_handle
            .await
            .expect("sync should succeed after finish signal");
        assert_eq!(synced_db.root(), updated_verification_root);
        assert_eq!(synced_db.bounds().await.end, updated_upper_bound);
        assert_eq!(synced_db.sync_boundary().await, updated_lower_bound);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Poll the metrics encoded by `context` until they report progress for `target`.
///
/// Returns once the encoded metrics text contains both `client_sync_journal_size <end>` and
/// `client_sync_target_end <end>`, where `<end>` is `*target.range.end()`. Between checks it
/// sleeps for one millisecond; there is no timeout, so it loops until both strings appear.
// NOTE(review): `contains` is a substring match, so a metric value that merely begins with
// the same digits (e.g. `8` vs `80`) would also match. Confirm this cannot happen here.
async fn wait_for_reached_progress(
    context: deterministic::Context,
    target: &Target,
) {
    let target_end = *target.range.end();
    let journal_size = format!("client_sync_journal_size {target_end}");
    // Shadows the numeric `target_end` with the metric line being searched for.
    let target_end = format!("client_sync_target_end {target_end}");
    loop {
        let metrics = context.encode();
        if metrics.contains(&journal_size) && metrics.contains(&target_end) {
            return;
        }
        context.sleep(Duration::from_millis(1)).await;
    }
}

/// Test progress metrics for reached targets across target updates and explicit finish.
///
/// Unlike `test_sync_waits_for_explicit_finish`, no `reached_target_tx` is configured;
/// reaching each target is observed through `wait_for_reached_progress` instead.
pub(crate) fn test_sync_reports_progress_for_reached_targets_before_explicit_finish<
    H: SyncTestHarness,
>()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(8)).await;

        let initial_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.sync_boundary().await,
                target_db.bounds().await.end
            ),
        };

        // Each extra batch uses its own seed and yields the next target to sync to.
        target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 1)).await;
        let first_update = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.sync_boundary().await,
                target_db.bounds().await.end
            ),
        };

        target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 2)).await;
        let second_update = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.sync_boundary().await,
                target_db.bounds().await.end
            ),
        };
        let final_root = target_db.root();

        let (update_sender, update_receiver) = mpsc::channel(1);
        let (finish_sender, finish_receiver) = mpsc::channel(1);

        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(2),
            target: initial_target.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: Some(update_receiver),
            finish_rx: Some(finish_receiver),
            reached_target_tx: None,
            max_retained_roots: 1,
        };

        // Not awaited yet: the future is pinned so it can be polled alongside the waits.
        let sync_handle = sync::sync(config);
        pin_mut!(sync_handle);

        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal");
            },
            _ = wait_for_reached_progress(context.child("storage"), &initial_target) => {},
        }
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must wait for a target update or explicit finish after reaching the initial target"
        );

        update_sender
            .send(first_update.clone())
            .await
            .expect("target update channel should be open");

        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal after first update");
            },
            _ = wait_for_reached_progress(context.child("storage"), &first_update) => {},
        }
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must wait for another update or explicit finish after reaching the first update"
        );

        update_sender
            .send(second_update.clone())
            .await
            .expect("target update channel should be open");

        select! {
            _ = sync_handle.as_mut() => {
                panic!("sync completed before explicit finish signal after second update");
            },
            _ = wait_for_reached_progress(context.child("storage"), &second_update) => {},
        }
        assert!(
            sync_handle.as_mut().now_or_never().is_none(),
            "sync must wait for explicit finish after reporting final progress"
        );

        finish_sender
            .send(())
            .await
            .expect("finish signal channel should be open");

        let synced_db: H::Db = sync_handle
            .await
            .expect("sync should succeed after finish signal");
        assert_eq!(synced_db.root(), final_root);
        assert_eq!(synced_db.bounds().await.end, *second_update.range.end());
        assert_eq!(synced_db.sync_boundary().await, *second_update.range.start());

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that a finish signal received before target completion still allows full sync.
///
/// The finish signal is placed on the channel before the engine configuration is even
/// built. The sync is still expected to return a database whose root, upper bound and sync
/// boundary match the target, and to have sent the reached-target notification.
pub(crate) fn test_sync_handles_early_finish_signal()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(30)).await;
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;
        let target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(lower_bound, upper_bound),
        };
        // Kept separately from the sync target root; used to verify the synced database.
        let verification_root = target_db.root();

        let (finish_sender, finish_receiver) = mpsc::channel(1);
        let (reached_sender, mut reached_receiver) = mpsc::channel(1);
        // Queue the finish signal up front, before any sync work has started.
        finish_sender
            .send(())
            .await
            .expect("finish signal channel should be open");

        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(3),
            target: target.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: Some(finish_receiver),
            reached_target_tx: Some(reached_sender),
            max_retained_roots: 1,
        };

        let synced_db: H::Db = sync::sync(config)
            .await
            .expect("sync should complete after early finish signal");
        // The notification is read only after the sync returned, so it must already be
        // buffered in the channel.
        let reached = reached_receiver
            .recv()
            .await
            .expect("engine should report reached-target");
        assert_eq!(reached, target);
        assert_eq!(synced_db.root(), verification_root);
        assert_eq!(synced_db.bounds().await.end, upper_bound);
        assert_eq!(synced_db.sync_boundary().await, lower_bound);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that dropping finish sender without sending is treated as an error.
///
/// The sender half of the finish channel is dropped before the engine starts while the
/// receiver half is still handed to the engine. The sync is expected to fail with
/// `sync::Error::Engine(sync::EngineError::FinishChannelClosed)`.
pub(crate) fn test_sync_fails_when_finish_sender_dropped()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(10)).await;
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;

        // Close the channel from the sending side without ever sending a finish signal.
        let (finish_sender, finish_receiver) = mpsc::channel(1);
        drop(finish_sender);

        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: H::sync_target_root(&target_db),
                range: non_empty_range!(lower_bound, upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: Some(finish_receiver),
            reached_target_tx: None,
            max_retained_roots: 1,
        };

        let result: Result = sync::sync(config).await;
        assert!(matches!(
            result,
            Err(sync::Error::Engine(sync::EngineError::FinishChannelClosed))
        ));

        // No synced database exists on the error path; only the target needs cleanup.
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that dropping reached-target receiver does not fail sync.
///
/// The receiver half of the reached-target channel is dropped before the engine starts
/// while the sender half is still handed to the engine. No finish channel is configured.
/// The sync is expected to succeed and to produce a database matching the target.
pub(crate) fn test_sync_allows_dropped_reached_target_receiver()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        target_db = H::apply_ops(target_db, H::create_ops(10)).await;
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;
        let verification_root = target_db.root();

        // Nobody will ever read the reached-target notification.
        let (reached_sender, reached_receiver) = mpsc::channel(1);
        drop(reached_receiver);

        let target_db = Arc::new(target_db);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: H::sync_target_root(&target_db),
                range: non_empty_range!(lower_bound, upper_bound),
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: Some(reached_sender),
            max_retained_roots: 1,
        };

        let synced_db: H::Db = sync::sync(config)
            .await
            .expect("sync should succeed when reached-target receiver is dropped");
        assert_eq!(synced_db.root(), verification_root);
        assert_eq!(synced_db.bounds().await.end, upper_bound);
        assert_eq!(synced_db.sync_boundary().await, lower_bound);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test that the client can handle target updates during sync execution.
///
/// The engine is stepped by hand until its journal has grown past the initial lower bound.
/// The target database is then extended, the new target is sent over the update channel,
/// and the engine is run to completion. The result must match the extended target.
///
/// * `initial_ops` - number of operations applied to the target before the sync starts.
/// * `additional_ops` - number of operations (generated with seed 1) applied to the target
///   while the sync is in progress.
pub(crate) fn test_target_update_during_sync(
    initial_ops: usize,
    additional_ops: usize,
)
where
    Arc>>>: Resolver, Digest = Digest>,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database with initial operations
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(initial_ops);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops

        // Capture initial target state
        let initial_lower_bound = target_db.sync_boundary().await;
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_sync_root = H::sync_target_root(&target_db);

        // Wrap target database for shared mutable access (using Option so we can take ownership)
        let target_db = Arc::new(AsyncRwLock::new(Some(target_db)));

        // Create client with initial target and small batch size
        let (update_sender, update_receiver) = mpsc::channel(1);
        // Step the client to process a batch
        let client = {
            let config = Config {
                context: context.child("client"),
                db_config: H::config(&context.next_u64().to_string(), &context),
                target: Target {
                    root: initial_sync_root,
                    range: non_empty_range!(initial_lower_bound, initial_upper_bound),
                },
                resolver: target_db.clone(),
                fetch_batch_size: NZU64!(1), // Small batch size so we don't finish after one batch
                max_outstanding_requests: 10,
                apply_batch_size: 1024,
                update_rx: Some(update_receiver),
                finish_rx: None,
                reached_target_tx: None,
                max_retained_roots: 1,
            };
            let mut client: Engine = Engine::new(config).await.unwrap();
            loop {
                // Step the client until we have processed a batch of operations
                client = match client.step().await.unwrap() {
                    NextStep::Continue(new_client) => new_client,
                    NextStep::Complete(_) => panic!("client should not be complete"),
                };
                // Stop as soon as the journal size exceeds the initial lower bound; the
                // engine is handed out of the block in that partially synced state.
                let log_size = client.journal().size().await;
                if log_size > initial_lower_bound {
                    break client;
                }
            }
        };

        // Modify the target database by adding more operations
        // (use different seed to avoid key collisions)
        let additional_ops_data = H::create_ops_seeded(additional_ops, 1);
        let new_verification_root = {
            // The write guard is held for this whole block, including the send below.
            let mut db_guard = target_db.write().await;
            // `apply_ops` takes the database by value, so move it out of the Option and
            // put it back once the operations are applied.
            let db = db_guard.take().unwrap();
            let db = H::apply_ops(db, additional_ops_data).await;

            // Capture new target state
            let new_lower_bound = db.sync_boundary().await;
            let new_upper_bound = db.bounds().await.end;
            let new_sync_root = H::sync_target_root(&db);
            let new_verification_root = db.root();
            *db_guard = Some(db);

            // Send target update with new target
            update_sender
                .send(Target {
                    root: new_sync_root,
                    range: non_empty_range!(new_lower_bound, new_upper_bound),
                })
                .await
                .unwrap();

            new_verification_root
        };

        // Complete the sync
        let synced_db = client.sync().await.unwrap();

        // Verify the synced database has the expected final state
        assert_eq!(synced_db.root(), new_verification_root);

        // Verify the target database matches the synced database
        let target_db = Arc::try_unwrap(target_db).map_or_else(
            |_| panic!("Failed to unwrap Arc - still has references"),
            |rw_lock| rw_lock.into_inner().expect("db should be present"),
        );
        {
            let synced_bounds = synced_db.bounds().await;
            let target_bounds = target_db.bounds().await;
            assert_eq!(synced_bounds.end, target_bounds.end);
            assert_eq!(
                synced_db.inactivity_floor_loc().await,
                target_db.inactivity_floor_loc().await
            );
            assert_eq!(synced_db.root(), target_db.root());
        }

        synced_db.destroy().await.unwrap();
        target_db.destroy().await.unwrap();
    });
}

/// Test demonstrating that a synced database can be reopened and retain its state.
///
/// After a successful sync the database handle is dropped (not destroyed) and a new one is
/// opened with the same `db_config`. Root, operation count and inactivity floor of the
/// reopened database must equal the values recorded before the drop.
pub(crate) fn test_sync_database_persistence()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate a simple target database
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(10);
        target_db = H::apply_ops(target_db, target_ops).await;
        // commit already done in apply_ops

        // Capture target state
        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;

        // Perform sync
        // `db_config` is cloned into the engine so the original can be reused for the reopen.
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let client_context = context.child("client");
        let target_db = Arc::new(target_db);
        let config = Config {
            db_config: db_config.clone(),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: client_context.child("client"),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced_db: H::Db = sync::sync(config).await.unwrap();

        // Verify initial sync worked
        assert_eq!(synced_db.root(), verification_root);

        // Save state before dropping
        let expected_root = synced_db.root();
        let expected_op_count = synced_db.bounds().await.end;
        let expected_inactivity_floor_loc = synced_db.inactivity_floor_loc().await;

        // Re-open the database
        drop(synced_db);
        let reopened_db =
            H::init_db_with_config(client_context.child("reopened"), db_config).await;

        // Verify the state is unchanged
        assert_eq!(reopened_db.root(), expected_root);
        assert_eq!(reopened_db.bounds().await.end, expected_op_count);
        assert_eq!(
            reopened_db.inactivity_floor_loc().await,
            expected_inactivity_floor_loc
        );

        // Cleanup
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
        reopened_db.destroy().await.unwrap();
    });
}

/// Test post-sync usability: after syncing, the database supports normal operations.
///
/// A database synced from a 50-operation target receives 10 further operations (seed 1).
/// The test only checks that the root changes and that the upper bound grows past the
/// synced target's upper bound.
pub(crate) fn test_sync_post_sync_usability()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        let target_ops = H::create_ops(50);
        target_db = H::apply_ops(target_db, target_ops).await;

        let sync_root = H::sync_target_root(&target_db);
        let lower_bound = target_db.sync_boundary().await;
        let upper_bound = target_db.bounds().await.end;

        let target_db = Arc::new(target_db);
        // `config` is first the database config and is then shadowed by the engine config
        // that embeds it.
        let config = H::config(&context.next_u64().to_string(), &context);
        let config = Config {
            db_config: config,
            fetch_batch_size: NZU64!(100),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: context.child("client"),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };

        let synced_db: H::Db = sync::sync(config).await.unwrap();
        let root_after_sync = synced_db.root();

        // Apply additional operations after sync.
        let more_ops = H::create_ops_seeded(10, 1);
        let synced_db = H::apply_ops(synced_db, more_ops).await;

        // Root should change after applying more ops.
        assert_ne!(synced_db.root(), root_after_sync);
        assert!(synced_db.bounds().await.end > upper_bound);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

/// Test `from_sync_result` where the database has all operations in the target range.
///
/// A single database is populated with 100 operations and taken apart with
/// `into_log_components`. Its own journal and its own `db_config` are then fed back into
/// `from_sync_result`, so the journal already holds exactly the requested range.
pub(crate) fn test_from_sync_result_nonempty_to_nonempty_exact_match()
where
    DbOf: FromSyncTestable,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let mut db = H::init_db_with_config(context.child("source"), db_config.clone()).await;
        let ops = H::create_ops(100);
        db = H::apply_ops(db, ops).await;
        // commit already done in apply_ops

        // Record everything needed for the later comparison before the database is consumed.
        let sync_lower_bound = db.sync_boundary().await;
        let bounds = db.bounds().await;
        let sync_upper_bound = bounds.end;
        let target_db_op_count = bounds.end;
        let target_db_inactivity_floor_loc = db.inactivity_floor_loc().await;
        let pinned_nodes = db.pinned_nodes_at(sync_lower_bound).await;

        // Only the journal is kept; the Merkle half of the pair is discarded here.
        let (_, journal) = db.into_log_components();

        let sync_db: DbOf = as qmdb::sync::Database>::from_sync_result(
            context.child("synced"),
            db_config,
            journal,
            Some(pinned_nodes),
            non_empty_range!(sync_lower_bound, sync_upper_bound),
            1024,
        )
        .await
        .unwrap();

        // Verify database state
        assert_eq!(sync_db.bounds().await.end, target_db_op_count);
        assert_eq!(
            sync_db.inactivity_floor_loc().await,
            target_db_inactivity_floor_loc
        );
        assert_eq!(sync_db.sync_boundary().await, sync_lower_bound);

        sync_db.destroy().await.unwrap();
    });
}

/// Test `from_sync_result` where the database has some but not all operations in the target range.
///
/// Two databases receive the same `NUM_OPS` operations and are pruned. The client one is
/// synced to disk and dropped; the target one then receives `NUM_ADDITIONAL_OPS` more. The
/// client is rebuilt through `from_sync_result` from the target's journal, reusing the
/// client's original config.
pub(crate) fn test_from_sync_result_nonempty_to_nonempty_partial_match()
where
    DbOf: FromSyncTestable,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    const NUM_OPS: usize = 100;
    const NUM_ADDITIONAL_OPS: usize = 5;
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate two databases.
        let mut target_db = H::init_db(context.child("target")).await;
        let sync_db_config = H::config(&context.next_u64().to_string(), &context);
        let client_context = context.child("client");
        let mut sync_db =
            H::init_db_with_config(client_context.child("client"), sync_db_config.clone()).await;
        let original_ops = H::create_ops(NUM_OPS);
        target_db = H::apply_ops(target_db, original_ops.clone()).await;
        // commit already done in apply_ops
        target_db
            .prune(target_db.sync_boundary().await)
            .await
            .unwrap();
        sync_db = H::apply_ops(sync_db, original_ops.clone()).await;
        // commit already done in apply_ops
        sync_db.prune(sync_db.sync_boundary().await).await.unwrap();
        // Flush the client database and release the handle; it is rebuilt below from the
        // same config.
        sync_db.sync().await.unwrap();
        drop(sync_db);

        // Add more operations to the target db
        // (use different seed to avoid key collisions)
        let more_ops = H::create_ops_seeded(NUM_ADDITIONAL_OPS, 1);
        target_db = H::apply_ops(target_db, more_ops).await;
        // commit already done in apply_ops

        // Capture target db state for comparison
        let bounds = target_db.bounds().await;
        let target_db_op_count = bounds.end;
        let target_db_inactivity_floor_loc = target_db.inactivity_floor_loc().await;
        let sync_lower_bound = target_db.sync_boundary().await;
        let sync_upper_bound = bounds.end;
        let target_hash = target_db.root();

        // Get pinned nodes at the sync lower bound from the target db (which has all the data).
        let pinned_nodes = target_db.pinned_nodes_at(sync_lower_bound).await;

        let (mmr, journal) = target_db.into_log_components();

        // Re-open `sync_db` using from_sync_result
        let sync_db: DbOf = as qmdb::sync::Database>::from_sync_result(
            client_context.child("synced"),
            sync_db_config,
            journal,
            Some(pinned_nodes),
            non_empty_range!(sync_lower_bound, sync_upper_bound),
            1024,
        )
        .await
        .unwrap();

        // Verify database state
        assert_eq!(sync_db.bounds().await.end, target_db_op_count);
        assert_eq!(
            sync_db.inactivity_floor_loc().await,
            target_db_inactivity_floor_loc
        );
        assert_eq!(sync_db.sync_boundary().await, sync_lower_bound);

        // Verify the root digest matches the target (verifies content integrity)
        assert_eq!(sync_db.root(), target_hash);

        sync_db.destroy().await.unwrap();
        // The target's Merkle half was split off above and is cleaned up on its own.
        mmr.destroy().await.unwrap();
    });
}

/// Test `from_sync_result` with an empty destination database syncing to a non-empty source.
/// This tests the scenario where a sync client starts fresh with no existing data.
pub(crate) fn test_from_sync_result_empty_to_nonempty()
where
    DbOf: FromSyncTestable,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    const NUM_OPS: usize = 100;
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate a source database
        let mut source_db = H::init_db(context.child("source")).await;
        let ops = H::create_ops(NUM_OPS);
        source_db = H::apply_ops(source_db, ops).await;
        // commit already done in apply_ops
        source_db
            .prune(source_db.sync_boundary().await)
            .await
            .unwrap();

        let lower_bound = source_db.sync_boundary().await;
        let upper_bound = source_db.bounds().await.end;

        // Get pinned nodes and target hash before deconstructing source_db
        let pinned_nodes = source_db.pinned_nodes_at(lower_bound).await;
        let target_hash = source_db.root();
        let target_op_count = source_db.bounds().await.end;
        let target_inactivity_floor = source_db.inactivity_floor_loc().await;

        let (mmr, journal) = source_db.into_log_components();

        // Use a different config (simulating a new empty database)
        let new_db_config = H::config(&context.next_u64().to_string(), &context);

        let db: DbOf = as qmdb::sync::Database>::from_sync_result(
            context.child("synced"),
            new_db_config,
            journal,
            Some(pinned_nodes),
            non_empty_range!(lower_bound, upper_bound),
            1024,
        )
        .await
        .unwrap();

        // Verify database state
        assert_eq!(db.bounds().await.end, target_op_count);
        assert_eq!(db.inactivity_floor_loc().await, target_inactivity_floor);
        assert_eq!(db.sync_boundary().await, lower_bound);

        // Verify the root digest matches the target
        assert_eq!(db.root(), target_hash);

        db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}

/// Test `from_sync_result` with an empty source database syncing to an empty target database.
///
/// No pinned nodes are supplied and the requested range is `[0, 1)`. After the state checks
/// the test applies 10 operations to confirm the rebuilt database is still writable.
pub(crate) fn test_from_sync_result_empty_to_empty()
where
    DbOf: FromSyncTestable,
    OpOf: Encode + Clone,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create an empty database (initialized with a single CommitFloor operation)
        let source_db = H::init_db(context.child("source")).await;

        // An empty database has exactly 1 operation (the initial CommitFloor)
        assert_eq!(source_db.bounds().await.end, Location::new(1));

        let target_hash = source_db.root();
        let (mmr, journal) = source_db.into_log_components();

        // Use a different config (simulating a new empty database)
        let new_db_config = H::config(&context.next_u64().to_string(), &context);

        let mut synced_db: DbOf = as qmdb::sync::Database>::from_sync_result(
            context.child("synced"),
            new_db_config,
            journal,
            None,
            non_empty_range!(Location::new(0), Location::new(1)),
            1024,
        )
        .await
        .unwrap();

        // Verify database state
        assert_eq!(synced_db.bounds().await.end, Location::new(1));
        assert_eq!(synced_db.inactivity_floor_loc().await, Location::new(0));
        assert_eq!(synced_db.root(), target_hash);

        // Test that we can perform operations on the synced database
        let ops = H::create_ops(10);
        synced_db = H::apply_ops(synced_db, ops).await;
// Verify the operations worked assert!(synced_db.bounds().await.end > Location::new(1)); synced_db.destroy().await.unwrap(); mmr.destroy().await.unwrap(); }); } /// A resolver wrapper that corrupts pinned nodes on the first request, then returns correct /// data on subsequent requests. #[derive(Clone)] struct CorruptFirstPinnedNodesResolver { inner: R, corrupted: Arc, } impl Resolver for CorruptFirstPinnedNodesResolver where R: Resolver, { type Family = R::Family; type Digest = Digest; type Op = R::Op; type Error = R::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let mut result = self .inner .get_operations( op_count, start_loc, max_ops, include_pinned_nodes, cancel_rx, ) .await?; // Corrupt pinned nodes only on the first request that includes them. if result.pinned_nodes.is_some() && !self .corrupted .swap(true, std::sync::atomic::Ordering::Relaxed) { if let Some(ref mut nodes) = result.pinned_nodes { if !nodes.is_empty() { nodes[0] = Digest::from([0xFFu8; 32]); } } } Ok(result) } } /// Test that corrupted pinned nodes on the first attempt are rejected and the sync /// succeeds on retry when the resolver returns correct data. pub(crate) fn test_sync_retries_bad_pinned_nodes() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Build a target database with some operations and prune so that pinned nodes are needed. 
let mut target_db = H::init_db(context.child("target")).await; let ops = H::create_ops(20); target_db = H::apply_ops(target_db, ops).await; target_db .prune(target_db.sync_boundary().await) .await .unwrap(); let sync_root = H::sync_target_root(&target_db); let lower_bound = target_db.sync_boundary().await; let upper_bound = target_db.bounds().await.end; let db_config = H::config(&context.next_u64().to_string(), &context); let resolver = CorruptFirstPinnedNodesResolver { inner: Arc::new(target_db), corrupted: Arc::new(std::sync::atomic::AtomicBool::new(false)), }; let config = sync::engine::Config { db_config, fetch_batch_size: NZU64!(100), target: Target { root: sync_root, range: non_empty_range!(lower_bound, upper_bound), }, context: context.child("client"), resolver, apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; // Sync should succeed on the second attempt after the first corrupted pinned nodes // are rejected. let synced_db: H::Db = sync::sync(config).await.unwrap(); assert_eq!(synced_db.root(), sync_root); synced_db.destroy().await.unwrap(); }); } /// A resolver wrapper that replays the first fresh boundary request against the retained /// historical root, then blocks the retry until the test releases it. 
///
/// Fields:
/// * `inner` - the resolver that actually serves the data.
/// * `historical_target_size` - `op_count` value that identifies requests issued for the
///   old target.
/// * `boundary_start` - `start_loc` value that identifies boundary requests for the new
///   target.
/// * `release_historical_gap` / `release_boundary_retry` - one-shot gates; each is taken out
///   of its `Option` by the first request that reaches it, which then waits on it.
/// * `boundary_attempts` - number of boundary requests seen so far, shared between clones.
#[derive(Clone)]
struct ReplayFreshBoundaryResolver> {
    inner: R,
    historical_target_size: Location,
    boundary_start: Location,
    release_historical_gap: Arc>>>,
    release_boundary_retry: Arc>>>,
    boundary_attempts: Arc,
}

impl> Resolver for ReplayFreshBoundaryResolver {
    type Family = R::Family;
    type Digest = Digest;
    type Op = R::Op;
    type Error = R::Error;

    /// Serve a request from `inner`, delaying or rewriting it in three situations.
    ///
    /// 1. `op_count == historical_target_size` with pinned nodes requested: wait until
    ///    `cancel_rx` resolves, then answer from `inner` anyway.
    /// 2. `op_count == historical_target_size` without pinned nodes: the first such request
    ///    waits on `release_historical_gap`; afterwards the request proceeds normally.
    /// 3. Pinned nodes requested at `start_loc == boundary_start`: the first attempt is
    ///    answered with data fetched for `historical_target_size` and with `pinned_nodes`
    ///    cleared; the first later attempt waits on `release_boundary_retry` before being
    ///    forwarded.
    ///
    /// Everything else is forwarded to `inner` unchanged.
    async fn get_operations(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
        include_pinned_nodes: bool,
        cancel_rx: oneshot::Receiver<()>,
    ) -> Result, Self::Error> {
        if op_count == self.historical_target_size {
            if include_pinned_nodes {
                // Hold the response back until the caller's cancel channel resolves; the
                // outcome of that wait is deliberately ignored.
                let _ = cancel_rx.await;
                // `cancel_rx` has been consumed, so `inner` gets the receiver of a fresh
                // channel whose sender is dropped immediately.
                return self
                    .inner
                    .get_operations(
                        op_count,
                        start_loc,
                        max_ops,
                        include_pinned_nodes,
                        oneshot::channel().1,
                    )
                    .await;
            }
            // The lock guard is a temporary that ends with this statement, so it is not
            // held across the await below. `take()` makes the gate single-use.
            let release = self.release_historical_gap.lock().take();
            if let Some(release) = release {
                let _ = release.await;
            }
        }

        if include_pinned_nodes && start_loc == self.boundary_start {
            // `fetch_add` returns the previous count, so 0 identifies the first attempt.
            let attempt = self.boundary_attempts.fetch_add(1, Ordering::Relaxed);
            if attempt == 0 {
                // Replay: answer with data for the historical size, fetched without pinned
                // nodes, and make sure none are reported.
                let mut result = self
                    .inner
                    .get_operations(
                        self.historical_target_size,
                        start_loc,
                        max_ops,
                        false,
                        oneshot::channel().1,
                    )
                    .await?;
                result.pinned_nodes = None;
                return Ok(result);
            }
            // Same single-use gate pattern as above, for the retry.
            let release = self.release_boundary_retry.lock().take();
            if let Some(release) = release {
                let _ = release.await;
            }
        }

        self.inner
            .get_operations(
                op_count,
                start_loc,
                max_ops,
                include_pinned_nodes,
                cancel_rx,
            )
            .await
    }
}

/// Test that reaching the journal target does not report completion while the pruned
/// boundary retry is still outstanding.
///
/// The engine starts on an old target and immediately receives both a target update and a
/// finish signal. `ReplayFreshBoundaryResolver` answers the first boundary request for the
/// new target with stale data and parks the retry. While the retry is parked the engine
/// must keep stepping without advancing the journal, without completing, and without
/// reporting reached-target. Once the retry is released the sync completes and reports the
/// new target.
pub(crate) fn test_sync_waits_for_boundary_retry_after_target_update()
where
    Arc>: Resolver, Digest = Digest>,
    OpOf: Encode,
    JournalOf: Contiguous,
{
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.child("target")).await;
        // Keep adding batches of 32 operations (a new seed each round) and pruning until
        // the inactivity floor has moved off zero; give up after 8 rounds.
        let mut seed = 0;
        loop {
            target_db = H::apply_ops(target_db, H::create_ops_seeded(32, seed)).await;
            target_db
                .prune(target_db.sync_boundary().await)
                .await
                .unwrap();
            if target_db.inactivity_floor_loc().await > Location::new(0) {
                break;
            }
            seed += 1;
            assert!(seed < 8, "expected prune floor to advance");
        }

        // Unlike most sibling tests, both targets start at the inactivity floor.
        let old_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.inactivity_floor_loc().await,
                target_db.bounds().await.end
            ),
        };

        target_db = H::apply_ops(target_db, H::create_ops_seeded(3, seed + 1)).await;
        let new_target = Target {
            root: H::sync_target_root(&target_db),
            range: non_empty_range!(
                target_db.inactivity_floor_loc().await,
                target_db.bounds().await.end
            ),
        };
        let verification_root = target_db.root();
        // Preconditions of the scenario: a pruned start and a strictly larger new target.
        assert!(old_target.range.start() > Location::new(0));
        assert!(new_target.range.end() > old_target.range.end());

        // The test keeps the sending halves; the resolver waits on the receiving halves.
        let (release_historical_gap_tx, release_historical_gap_rx) = oneshot::channel();
        let (release_boundary_retry_tx, release_boundary_retry_rx) = oneshot::channel();
        let target_db = Arc::new(target_db);
        let resolver = ReplayFreshBoundaryResolver {
            inner: target_db.clone(),
            historical_target_size: old_target.range.end(),
            boundary_start: new_target.range.start(),
            release_historical_gap: Arc::new(Mutex::new(Some(release_historical_gap_rx))),
            release_boundary_retry: Arc::new(Mutex::new(Some(release_boundary_retry_rx))),
            boundary_attempts: Arc::new(AtomicUsize::new(0)),
        };
        let (update_sender, update_receiver) = mpsc::channel(1);
        let (finish_sender, finish_receiver) = mpsc::channel(1);
        let (reached_sender, mut reached_receiver) = mpsc::channel(1);
        let config = Config {
            context: context.child("client"),
            db_config: H::config(&context.next_u64().to_string(), &context),
            fetch_batch_size: NZU64!(1),
            target: old_target.clone(),
            resolver,
            apply_batch_size: 1024,
            max_outstanding_requests: 2,
            update_rx: Some(update_receiver),
            finish_rx: Some(finish_receiver),
            reached_target_tx: Some(reached_sender),
            max_retained_roots: 1,
        };
        let mut engine: Engine = Engine::new(config).await.unwrap();

        // Queue both the target update and the finish signal before the first step.
        update_sender.send(new_target.clone()).await.unwrap();
        finish_sender.send(()).await.unwrap();
        engine = match engine.step().await.unwrap() {
            NextStep::Continue(engine) => engine,
            NextStep::Complete(_) => panic!("target update should not complete sync"),
        };

        // Open the first gate; the send result is ignored on purpose.
        let _ = release_historical_gap_tx.send(());
        let journal_start = engine.journal().size().await;
        // Four further steps: each must finish within 100ms, must ask to continue, and
        // must leave the journal size where it was.
        for step_idx in 0..4 {
            let next_step = engine.step();
            pin_mut!(next_step);
            select! {
                result = next_step.as_mut() => {
                    engine = match result.unwrap() {
                        NextStep::Continue(engine) => engine,
                        NextStep::Complete(_) => panic!("boundary retry should still be required"),
                    };
                    assert_eq!(
                        engine.journal().size().await,
                        journal_start,
                        "replayed fresh boundary responses must not advance the journal"
                    );
                },
                _ = context.sleep(Duration::from_millis(100)) => {
                    panic!(
                        "engine should keep processing fetch results while the boundary retry is blocked: step={step_idx}"
                    );
                },
            }
        }
        // One non-blocking poll: no reached-target notification may be waiting yet.
        assert!(
            reached_receiver.recv().now_or_never().is_none(),
            "engine should not report reached-target while boundary state is missing"
        );

        // Open the second gate and let the engine run to completion.
        let _ = release_boundary_retry_tx.send(());
        let synced_db = engine.sync().await.unwrap();
        let reached = reached_receiver.recv().await.unwrap();
        assert_eq!(reached, new_target);
        assert_eq!(synced_db.root(), verification_root);

        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}

mod harnesses {
    use super::SyncTestHarness;
    use crate::{
        merkle::{self, mmb},
        qmdb::any::value::VariableEncoding,
        translator::TwoCap,
    };
    use commonware_cryptography::sha256::Digest;
    use
commonware_math::algebra::Random; use commonware_runtime::{deterministic::Context, BufferPooler}; use commonware_utils::test_rng_seeded; use rand::RngCore; // ===== Family-generic op creation helpers ===== // // `Operation` is phantom in F for Update/Delete variants, so ops // are structurally identical across families. fn create_ordered_fixed_ops( n: usize, seed: u64, ) -> Vec> { use crate::qmdb::any::operation::{update::Ordered as Update, Operation}; let mut rng = test_rng_seeded(seed); let mut prev_key = Digest::random(&mut rng); let mut ops = Vec::new(); for i in 0..n { if i % 10 == 0 && i > 0 { ops.push(Operation::Delete(prev_key)); } else { let key = Digest::random(&mut rng); let next_key = Digest::random(&mut rng); let value = Digest::random(&mut rng); ops.push(Operation::Update(Update { key, value, next_key, })); prev_key = key; } } ops } fn create_unordered_fixed_ops( n: usize, seed: u64, ) -> Vec> { use crate::qmdb::any::operation::{update::Unordered as Update, Operation}; let mut rng = test_rng_seeded(seed); let mut prev_key = Digest::random(&mut rng); let mut ops = Vec::new(); for i in 0..n { if i % 10 == 0 && i > 0 { ops.push(Operation::Delete(prev_key)); } else { let key = Digest::random(&mut rng); let value = Digest::random(&mut rng); ops.push(Operation::Update(Update(key, value))); prev_key = key; } } ops } fn create_ordered_variable_ops( n: usize, seed: u64, ) -> Vec>> { use crate::qmdb::any::operation::{update::Ordered as Update, Operation}; let mut rng = test_rng_seeded(seed); let mut prev_key = Digest::random(&mut rng); let mut ops = Vec::new(); for i in 0..n { if i % 10 == 0 && i > 0 { ops.push(Operation::Delete(prev_key)); } else { let key = Digest::random(&mut rng); let next_key = Digest::random(&mut rng); let len = ((rng.next_u64() % 13) + 7) as usize; let value = vec![(rng.next_u64() % 255) as u8; len]; ops.push(Operation::Update(Update { key, value, next_key, })); prev_key = key; } } ops } fn create_unordered_variable_ops( n: usize, seed: 
u64, ) -> Vec>> { use crate::qmdb::any::operation::{update::Unordered as Update, Operation}; let mut rng = test_rng_seeded(seed); let mut prev_key = Digest::random(&mut rng); let mut ops = Vec::new(); for i in 0..n { if i % 10 == 0 && i > 0 { ops.push(Operation::Delete(prev_key)); } else { let key = Digest::random(&mut rng); let len = ((rng.next_u64() % 13) + 7) as usize; let value = vec![(rng.next_u64() % 255) as u8; len]; ops.push(Operation::Update(Update(key, value))); prev_key = key; } } ops } // ===== MMR harnesses (existing, unchanged) ===== // ----- Ordered/Fixed ----- pub struct OrderedFixedHarness; impl SyncTestHarness for OrderedFixedHarness { type Family = crate::mmr::Family; type Db = crate::qmdb::any::ordered::fixed::test::AnyTest; fn sync_target_root(db: &Self::Db) -> Digest { db.root() } fn config( suffix: &str, pooler: &impl BufferPooler, ) -> crate::qmdb::any::FixedConfig { crate::qmdb::any::test::fixed_db_config::<_>(suffix, pooler) } fn create_ops( n: usize, ) -> Vec> { crate::qmdb::any::ordered::fixed::test::create_test_ops(n) } fn create_ops_seeded( n: usize, seed: u64, ) -> Vec> { crate::qmdb::any::ordered::fixed::test::create_test_ops_seeded(n, seed) } async fn init_db(ctx: Context) -> Self::Db { crate::qmdb::any::ordered::fixed::test::create_test_db(ctx).await } async fn init_db_with_config( ctx: Context, config: crate::qmdb::any::FixedConfig, ) -> Self::Db { Self::Db::init(ctx, config).await.unwrap() } async fn apply_ops( mut db: Self::Db, ops: Vec< crate::qmdb::any::ordered::fixed::Operation, >, ) -> Self::Db { crate::qmdb::any::ordered::fixed::test::apply_ops(&mut db, ops).await; let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap(); db.apply_batch(merkleized).await.unwrap(); db } } // ----- Ordered/Variable ----- pub struct OrderedVariableHarness; impl SyncTestHarness for OrderedVariableHarness { type Family = crate::mmr::Family; type Db = crate::qmdb::any::ordered::variable::test::AnyTest; fn sync_target_root(db: 
&Self::Db) -> Digest { db.root() } fn config( suffix: &str, pooler: &impl BufferPooler, ) -> crate::qmdb::any::ordered::variable::test::VarConfig { crate::qmdb::any::ordered::variable::test::create_test_config( suffix.parse().unwrap_or(0), pooler, ) } fn create_ops_seeded( n: usize, seed: u64, ) -> Vec>> { crate::qmdb::any::ordered::variable::test::create_test_ops_seeded(n, seed) } fn create_ops( n: usize, ) -> Vec>> { crate::qmdb::any::ordered::variable::test::create_test_ops(n) } async fn init_db(ctx: Context) -> Self::Db { crate::qmdb::any::ordered::variable::test::create_test_db(ctx).await } async fn init_db_with_config( ctx: Context, config: crate::qmdb::any::ordered::variable::test::VarConfig, ) -> Self::Db { Self::Db::init(ctx, config).await.unwrap() } async fn apply_ops( mut db: Self::Db, ops: Vec< crate::qmdb::any::ordered::variable::Operation>, >, ) -> Self::Db { crate::qmdb::any::ordered::variable::test::apply_ops(&mut db, ops).await; let merkleized = db .new_batch() .merkleize(&db, None::>) .await .unwrap(); db.apply_batch(merkleized).await.unwrap(); db } } // ----- Unordered/Fixed ----- pub struct UnorderedFixedHarness; impl SyncTestHarness for UnorderedFixedHarness { type Family = crate::mmr::Family; type Db = crate::qmdb::any::unordered::fixed::test::AnyTest; fn sync_target_root(db: &Self::Db) -> Digest { db.root() } fn config( suffix: &str, pooler: &impl BufferPooler, ) -> crate::qmdb::any::FixedConfig { crate::qmdb::any::test::fixed_db_config::<_>(suffix, pooler) } fn create_ops_seeded( n: usize, seed: u64, ) -> Vec> { crate::qmdb::any::unordered::fixed::test::create_test_ops_seeded(n, seed) } fn create_ops( n: usize, ) -> Vec> { crate::qmdb::any::unordered::fixed::test::create_test_ops(n) } async fn init_db(ctx: Context) -> Self::Db { crate::qmdb::any::unordered::fixed::test::create_test_db(ctx).await } async fn init_db_with_config( ctx: Context, config: crate::qmdb::any::FixedConfig, ) -> Self::Db { Self::Db::init(ctx, config).await.unwrap() } 
async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::fixed::Operation,
            >,
        ) -> Self::Db {
            // Apply `ops` through the `test` module helper, then merkleize and apply a new
            // batch with no writes; any failure panics via `unwrap`.
            crate::qmdb::any::unordered::fixed::test::apply_ops(&mut db, ops).await;
            let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    // ----- Unordered/Variable -----

    /// Sync test harness whose methods delegate to the helpers in
    /// `crate::qmdb::any::unordered::variable::test`.
    pub struct UnorderedVariableHarness;

    impl SyncTestHarness for UnorderedVariableHarness {
        type Family = crate::mmr::Family;
        type Db = crate::qmdb::any::unordered::variable::test::AnyTest;

        /// The sync target root is the database's own root.
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        /// Builds the variable-size config via the `test` module's `create_test_config`.
        ///
        /// `suffix` is parsed and passed as the first argument; a suffix that fails to parse
        /// falls back to `0`.
        // NOTE(review): distinct non-numeric suffixes all collapse to 0 here — confirm callers
        // only pass numeric suffixes when they rely on distinct configs.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::unordered::variable::test::VarConfig {
            crate::qmdb::any::unordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }

        /// Forwards to the `test` module's `create_test_ops`.
        fn create_ops(
            n: usize,
        ) -> Vec<
            crate::qmdb::any::unordered::Operation<
                crate::mmr::Family,
                Digest,
                VariableEncoding>,
            >,
        > {
            crate::qmdb::any::unordered::variable::test::create_test_ops(n)
        }

        /// Forwards to the `test` module's `create_test_ops_seeded`.
        fn create_ops_seeded(n: usize, seed: u64) -> Vec> {
            crate::qmdb::any::unordered::variable::test::create_test_ops_seeded(n, seed)
        }

        /// Forwards to the `test` module's `create_test_db`.
        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::unordered::variable::test::create_test_db(ctx).await
        }

        /// Initializes the database from `config`; panics (via `unwrap`) if `init` fails.
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::unordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Applies `ops` through the `test` module's `apply_ops`, then merkleizes and applies a
        /// fresh batch with no writes before handing the database back.
        ///
        /// Panics (via `unwrap`) if merkleizing or applying that batch fails.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::Operation<
                    crate::mmr::Family,
                    Digest,
                    VariableEncoding>,
                >,
            >,
        ) -> Self::Db {
            crate::qmdb::any::unordered::variable::test::apply_ops(&mut db, ops).await;
            // A new batch with no writes is merkleized and applied after the ops.
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    // ===== MMB harnesses =====

    // ----- Ordered/Fixed MMB -----

    /// Sync test harness for the ordered fixed-size `Db` instantiated with `mmb::Family`.
    pub struct OrderedFixedMmbHarness;

    impl SyncTestHarness for OrderedFixedMmbHarness {
        type Family = mmb::Family;
        type Db = crate::qmdb::any::ordered::fixed::Db<
            mmb::Family,
            Context,
            Digest,
            Digest,
            commonware_cryptography::Sha256,
            TwoCap,
            commonware_parallel::Sequential,
        >;

        /// The sync target root is the database's own root.
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        /// Builds the fixed-size config by forwarding `suffix` and `pooler` to `fixed_db_config`.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::FixedConfig {
            crate::qmdb::any::test::fixed_db_config::<_>(suffix, pooler)
        }

        /// Generates `n` operations with seed `0`.
        fn create_ops(
            n: usize,
        ) -> Vec> {
            create_ordered_fixed_ops(n, 0)
        }

        /// Generates `n` operations with the given `seed`.
        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec> {
            create_ordered_fixed_ops(n, seed)
        }

        /// Initializes a database whose config suffix is a `u64` drawn from `ctx`, rendered as
        /// a decimal string. Panics (via `unwrap`) if `init` fails.
        async fn init_db(mut ctx: Context) -> Self::Db {
            let seed = ctx.next_u64();
            let cfg =
                crate::qmdb::any::test::fixed_db_config::(&seed.to_string(), &ctx);
            Self::Db::init(ctx, cfg).await.unwrap()
        }

        /// Initializes the database from `config`; panics (via `unwrap`) if `init` fails.
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::FixedConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Writes `ops` into a single batch and applies it, then applies a second batch that
        /// carries no writes.
        ///
        /// Panics (via `unwrap`) if merkleizing or applying either batch fails.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec>,
        ) -> Self::Db {
            use crate::qmdb::any::operation::Operation;
            let mut batch = db.new_batch();
            for op in ops {
                match op {
                    // An update becomes a write of `Some(value)` under its key.
                    Operation::Update(data) => {
                        batch = batch.write(data.key, Some(data.value));
                    }
                    // A delete becomes a write of `None` under the key.
                    Operation::Delete(key) => {
                        batch = batch.write(key, None);
                    }
                    // Commit-floor operations in the input are skipped.
                    Operation::CommitFloor(_, _) => {}
                }
            }
            let merkleized = batch.merkleize(&db, None::).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            // Second batch: created fresh and applied without any writes.
            let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    // ----- Ordered/Variable MMB -----

    /// Sync test harness for the ordered variable-size `Db` instantiated with `mmb::Family`.
    pub struct OrderedVariableMmbHarness;

    impl SyncTestHarness for OrderedVariableMmbHarness {
        type Family = mmb::Family;
        type Db = crate::qmdb::any::ordered::variable::Db<
            mmb::Family,
            Context,
            Digest,
            Vec,
            commonware_cryptography::Sha256,
            TwoCap,
            commonware_parallel::Sequential,
        >;

        /// The sync target root is the database's own root.
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        /// Builds the variable-size config via the `test` module's `create_test_config`.
        ///
        /// `suffix` is parsed and passed as the first argument; a suffix that fails to parse
        /// falls back to `0`.
        // NOTE(review): distinct non-numeric suffixes all collapse to 0 here — confirm callers
        // only pass numeric suffixes when they rely on distinct configs.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::ordered::variable::test::VarConfig {
            crate::qmdb::any::ordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }

        /// Generates `n` operations with seed `0`.
        fn create_ops(
            n: usize,
        ) -> Vec>> {
            create_ordered_variable_ops(n, 0)
        }

        /// Generates `n` operations with the given `seed`.
        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec>> {
            create_ordered_variable_ops(n, seed)
        }

        /// Initializes a database whose config is built from a `u64` drawn from `ctx`.
        /// Panics (via `unwrap`) if `init` fails.
        async fn init_db(mut ctx: Context) -> Self::Db {
            let seed = ctx.next_u64();
            let config = crate::qmdb::any::ordered::variable::test::create_test_config(seed, &ctx);
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Initializes the database from `config`; panics (via `unwrap`) if `init` fails.
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::ordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Writes `ops` into a single batch and applies it, then applies a second batch that
        /// carries no writes.
        ///
        /// Panics (via `unwrap`) if merkleizing or applying either batch fails.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec>>,
        ) -> Self::Db {
            use crate::qmdb::any::operation::Operation;
            let mut batch = db.new_batch();
            for op in ops {
                match op {
                    // An update becomes a write of `Some(value)` under its key.
                    Operation::Update(data) => {
                        batch = batch.write(data.key, Some(data.value));
                    }
                    // A delete becomes a write of `None` under the key.
                    Operation::Delete(key) => {
                        batch = batch.write(key, None);
                    }
                    // Commit-floor operations in the input are skipped.
                    Operation::CommitFloor(_, _) => {}
                }
            }
            let merkleized = batch.merkleize(&db, None::>).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            // Second batch: created fresh and applied without any writes.
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    // ----- Unordered/Fixed MMB -----

    /// Sync test harness for the unordered fixed-size `Db` instantiated with `mmb::Family`.
    pub struct UnorderedFixedMmbHarness;

    impl SyncTestHarness for UnorderedFixedMmbHarness {
        type Family = mmb::Family;
        type Db = crate::qmdb::any::unordered::fixed::Db<
            mmb::Family,
            Context,
            Digest,
            Digest,
            commonware_cryptography::Sha256,
            TwoCap,
            commonware_parallel::Sequential,
        >;

        /// The sync target root is the database's own root.
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        /// Builds the fixed-size config by forwarding `suffix` and `pooler` to `fixed_db_config`.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::FixedConfig {
            crate::qmdb::any::test::fixed_db_config::<_>(suffix, pooler)
        }

        /// Generates `n` operations with seed `0`.
        fn create_ops(
            n: usize,
        ) -> Vec> {
            create_unordered_fixed_ops(n, 0)
        }

        /// Generates `n` operations with the given `seed`.
        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec> {
            create_unordered_fixed_ops(n, seed)
        }

        /// Initializes a database whose config suffix is a `u64` drawn from `ctx`, rendered as
        /// a decimal string. Panics (via `unwrap`) if `init` fails.
        async fn init_db(mut ctx: Context) -> Self::Db {
            let seed = ctx.next_u64();
            let cfg =
                crate::qmdb::any::test::fixed_db_config::(&seed.to_string(), &ctx);
            Self::Db::init(ctx, cfg).await.unwrap()
        }

        /// Initializes the database from `config`; panics (via `unwrap`) if `init` fails.
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::FixedConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Writes `ops` into a single batch and applies it, then applies a second batch that
        /// carries no writes.
        ///
        /// Panics (via `unwrap`) if merkleizing or applying either batch fails.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec>,
        ) -> Self::Db {
            use crate::qmdb::any::operation::Operation;
            let mut batch = db.new_batch();
            for op in ops {
                match op {
                    // An update is a tuple here: field 0 is the key, field 1 the value.
                    Operation::Update(data) => {
                        batch = batch.write(data.0, Some(data.1));
                    }
                    // A delete becomes a write of `None` under the key.
                    Operation::Delete(key) => {
                        batch = batch.write(key, None);
                    }
                    // Commit-floor operations in the input are skipped.
                    Operation::CommitFloor(_, _) => {}
                }
            }
            let merkleized = batch.merkleize(&db, None::).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            // Second batch: created fresh and applied without any writes.
            let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }

    // ----- Unordered/Variable MMB -----

    /// Sync test harness for the unordered variable-size `Db` instantiated with `mmb::Family`.
    pub struct UnorderedVariableMmbHarness;

    impl SyncTestHarness for UnorderedVariableMmbHarness {
        type Family = mmb::Family;
        type Db = crate::qmdb::any::unordered::variable::Db<
            mmb::Family,
            Context,
            Digest,
            Vec,
            commonware_cryptography::Sha256,
            TwoCap,
            commonware_parallel::Sequential,
        >;

        /// The sync target root is the database's own root.
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }

        /// Builds the variable-size config via the `test` module's `create_test_config`.
        ///
        /// `suffix` is parsed and passed as the first argument; a suffix that fails to parse
        /// falls back to `0`.
        // NOTE(review): distinct non-numeric suffixes all collapse to 0 here — confirm callers
        // only pass numeric suffixes when they rely on distinct configs.
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::unordered::variable::test::VarConfig {
            crate::qmdb::any::unordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }

        /// Generates `n` operations with seed `0`.
        fn create_ops(
            n: usize,
        ) -> Vec>> {
            create_unordered_variable_ops(n, 0)
        }

        /// Generates `n` operations with the given `seed`.
        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec>> {
            create_unordered_variable_ops(n, seed)
        }

        /// Initializes a database whose config is built from a `u64` drawn from `ctx`.
        /// Panics (via `unwrap`) if `init` fails.
        async fn init_db(mut ctx: Context) -> Self::Db {
            let seed = ctx.next_u64();
            let config =
                crate::qmdb::any::unordered::variable::test::create_test_config(seed, &ctx);
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Initializes the database from `config`; panics (via `unwrap`) if `init` fails.
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::unordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }

        /// Writes `ops` into a single batch and applies it, then applies a second batch that
        /// carries no writes.
        ///
        /// Panics (via `unwrap`) if merkleizing or applying either batch fails.
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::variable::Operation>,
            >,
        ) -> Self::Db {
            use crate::qmdb::any::operation::Operation;
            let mut batch = db.new_batch();
            for op in ops {
                match op {
                    // An update is a tuple here: field 0 is the key, field 1 the value.
                    Operation::Update(data) => {
                        batch = batch.write(data.0, Some(data.1));
                    }
                    // A delete becomes a write of `None` under the key.
                    Operation::Delete(key) => {
                        batch = batch.write(key, None);
                    }
                    // Commit-floor operations in the input are skipped.
                    Operation::CommitFloor(_, _) => {}
                }
            }
            let merkleized = batch.merkleize(&db, None::>).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            // Second batch: created fresh and applied without any writes.
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }
}

// ===== Test Generation Macro =====

/// Macro to generate all standard sync tests for a given harness.
///
/// Expands to a module named `$mod_name`. Each test in it forwards to the generic
/// `super::test_*` function of the same name, instantiated with `$harness`.
macro_rules! sync_tests_for_harness {
    ($harness:ty, $mod_name:ident) => {
        mod $mod_name {
            // The invocations in this file spell `$harness` as `harnesses::...`, so the
            // `harnesses` module has to be nameable from inside the generated module.
            use super::harnesses;
            use commonware_macros::test_traced;
            use rstest::rstest;
            use std::num::NonZeroU64;

            #[test_traced]
            fn test_sync_empty_operations_no_panic() {
                super::test_sync_empty_operations_no_panic::<$harness>();
            }

            #[test_traced]
            fn test_sync_subset_of_target_database() {
                super::test_sync_subset_of_target_database::<$harness>(1000);
            }

            // Each case is (target_db_ops, fetch_batch_size).
            #[rstest]
            #[case::small_batch_size_one(10, 1)]
            #[case::small_batch_size_gt_db_size(10, 20)]
            #[case::batch_size_one(1000, 1)]
            #[case::floor_div_db_batch_size(1000, 3)]
            #[case::floor_div_db_batch_size_2(1000, 999)]
            #[case::div_db_batch_size(1000, 100)]
            #[case::db_size_eq_batch_size(1000, 1000)]
            #[case::batch_size_gt_db_size(1000, 1001)]
            fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: u64) {
                super::test_sync::<$harness>(
                    target_db_ops,
                    // Every case above uses a non-zero batch size, so this `unwrap` holds.
                    NonZeroU64::new(fetch_batch_size).unwrap(),
                );
            }

            #[test_traced]
            fn test_sync_use_existing_db_partial_match() {
                super::test_sync_use_existing_db_partial_match::<$harness>(1000);
            }

            #[test_traced]
            fn test_sync_use_existing_db_exact_match() {
                super::test_sync_use_existing_db_exact_match::<$harness>(1000);
            }

            #[test_traced("WARN")]
            fn test_target_update_lower_bound_decrease() {
                super::test_target_update_lower_bound_decrease::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_target_update_upper_bound_decrease() {
                super::test_target_update_upper_bound_decrease::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_target_update_bounds_increase() {
                super::test_target_update_bounds_increase::<$harness>();
            }

            // NOTE(review): plain `#[test]` here while the neighbouring tests use
            // `#[test_traced]` — confirm this is intentional.
            #[test]
            fn test_target_update_prune_only_rejected() {
                super::test_target_update_prune_only_rejected::<$harness>();
            }

            #[test_traced("WARN")]
            fn test_target_update_on_done_client() {
                super::test_target_update_on_done_client::<$harness>();
            }

            #[test_traced]
            fn test_sync_waits_for_explicit_finish() {
                super::test_sync_waits_for_explicit_finish::<$harness>();
            }

            #[test_traced]
            fn test_sync_reports_progress_for_reached_targets_before_explicit_finish() {
                super::test_sync_reports_progress_for_reached_targets_before_explicit_finish::<
                    $harness,
                >();
            }

            #[test_traced]
            fn test_sync_handles_early_finish_signal() {
                super::test_sync_handles_early_finish_signal::<$harness>();
            }

            #[test_traced]
            fn test_sync_fails_when_finish_sender_dropped() {
                super::test_sync_fails_when_finish_sender_dropped::<$harness>();
            }

            #[test_traced]
            fn test_sync_allows_dropped_reached_target_receiver() {
                super::test_sync_allows_dropped_reached_target_receiver::<$harness>();
            }

            // Each case is (initial_ops, additional_ops).
            #[rstest]
            #[case(1, 1)]
            #[case(1, 2)]
            #[case(1, 100)]
            #[case(2, 1)]
            #[case(2, 2)]
            #[case(2, 100)]
            // Regression test: panicked when we didn't set pinned nodes after updating target
            #[case(20, 10)]
            #[case(100, 1)]
            #[case(100, 2)]
            #[case(100, 100)]
            #[case(100, 1000)]
            fn test_target_update_during_sync(
                #[case] initial_ops: usize,
                #[case] additional_ops: usize,
            ) {
                super::test_target_update_during_sync::<$harness>(initial_ops, additional_ops);
            }

            #[test_traced]
            fn test_sync_database_persistence() {
                super::test_sync_database_persistence::<$harness>();
            }

            #[test_traced]
            fn test_sync_post_sync_usability() {
                super::test_sync_post_sync_usability::<$harness>();
            }

            #[test_traced]
            fn test_sync_resolver_fails() {
                super::test_sync_resolver_fails::<$harness>();
            }

            #[test_traced]
            fn test_sync_retries_bad_pinned_nodes() {
                super::test_sync_retries_bad_pinned_nodes::<$harness>();
            }

            #[test_traced]
            fn test_sync_waits_for_boundary_retry_after_target_update() {
                super::test_sync_waits_for_boundary_retry_after_target_update::<$harness>();
            }
        }
    };
}

/// Additional from_sync_result tests that require `FromSyncTestable`.
/// Only the MMR harnesses have `FromSyncTestable` impls.
///
/// Expands to a module named `$mod_name`. Each test in it forwards to the generic
/// `super::test_from_sync_result_*` function of the same name, instantiated with `$harness`.
macro_rules! from_sync_result_tests_for_harness {
    ($harness:ty, $mod_name:ident) => {
        mod $mod_name {
            // The invocations in this file spell `$harness` as `harnesses::...`, so the
            // `harnesses` module has to be nameable from inside the generated module.
            use super::harnesses;
            use commonware_macros::test_traced;

            #[test_traced("WARN")]
            fn test_from_sync_result_empty_to_empty() {
                super::test_from_sync_result_empty_to_empty::<$harness>();
            }

            #[test_traced]
            fn test_from_sync_result_empty_to_nonempty() {
                super::test_from_sync_result_empty_to_nonempty::<$harness>();
            }

            #[test_traced]
            fn test_from_sync_result_nonempty_to_nonempty_partial_match() {
                super::test_from_sync_result_nonempty_to_nonempty_partial_match::<$harness>();
            }

            #[test_traced]
            fn test_from_sync_result_nonempty_to_nonempty_exact_match() {
                super::test_from_sync_result_nonempty_to_nonempty_exact_match::<$harness>();
            }
        }
    };
}

// MMR harnesses (all tests including from_sync_result)
sync_tests_for_harness!(harnesses::OrderedFixedHarness, ordered_fixed);
sync_tests_for_harness!(harnesses::OrderedVariableHarness, ordered_variable);
sync_tests_for_harness!(harnesses::UnorderedFixedHarness, unordered_fixed);
sync_tests_for_harness!(harnesses::UnorderedVariableHarness, unordered_variable);
from_sync_result_tests_for_harness!(harnesses::OrderedFixedHarness, ordered_fixed_from_sync);
from_sync_result_tests_for_harness!(
    harnesses::OrderedVariableHarness,
    ordered_variable_from_sync
);
from_sync_result_tests_for_harness!(harnesses::UnorderedFixedHarness, unordered_fixed_from_sync);
from_sync_result_tests_for_harness!(
    harnesses::UnorderedVariableHarness,
    unordered_variable_from_sync
);

// MMB harnesses (sync tests only, no from_sync_result)
sync_tests_for_harness!(harnesses::OrderedFixedMmbHarness, ordered_fixed_mmb);
sync_tests_for_harness!(harnesses::OrderedVariableMmbHarness, ordered_variable_mmb);
sync_tests_for_harness!(harnesses::UnorderedFixedMmbHarness, unordered_fixed_mmb);
sync_tests_for_harness!(
    harnesses::UnorderedVariableMmbHarness,
    unordered_variable_mmb
);