//! Generic sync tests that work for both fixed and variable databases. //! //! This module provides a test harness trait and generic test functions that can be //! parameterized to run against either fixed-size or variable-size value databases. //! The shared functions are `pub(crate)` so that `current::sync::tests` can reuse them. use crate::{ journal::contiguous::Contiguous, merkle::{mmr, mmr::Location}, qmdb::{ self, any::traits::DbAny, operation::Operation as OperationTrait, sync::{ self, engine::{Config, NextStep}, resolver::{self, FetchResult, Resolver}, Engine, Target, }, }, Persistable, }; use commonware_codec::Encode; use commonware_cryptography::sha256::Digest; use commonware_macros::select; use commonware_runtime::{deterministic, BufferPooler, Metrics, Runner as _}; use commonware_utils::{ channel::{mpsc, oneshot}, non_empty_range, sync::AsyncRwLock, NZU64, }; use futures::{pin_mut, FutureExt}; use rand::RngCore as _; use std::{num::NonZeroU64, sync::Arc}; /// Type alias for the database type of a harness. pub(crate) type DbOf = ::Db; /// Type alias for the operation type of a harness. pub(crate) type OpOf = as qmdb::sync::Database>::Op; /// Type alias for the config type of a harness. pub(crate) type ConfigOf = as qmdb::sync::Database>::Config; /// Type alias for the journal type of a harness. pub(crate) type JournalOf = as qmdb::sync::Database>::Journal; /// Trait for cleanup operations in tests. pub(crate) trait Destructible { fn destroy( self, ) -> impl std::future::Future>> + Send; } // Implement Destructible for the concrete MMR type used in tests. // This is here (rather than in fixed/variable modules) to avoid duplicate implementations. impl Destructible for crate::mmr::journaled::Mmr { async fn destroy(self) -> Result<(), qmdb::Error> { self.destroy().await.map_err(qmdb::Error::Merkle) } } /// Trait providing internal access for from_sync_result tests. 
pub(crate) trait FromSyncTestable: qmdb::sync::Database { type Mmr: Destructible + Send; /// Get the MMR and journal from the database fn into_log_components(self) -> (Self::Mmr, Self::Journal); /// Get the pinned nodes at a given location fn pinned_nodes_at( &self, loc: Location, ) -> impl std::future::Future> + Send; } /// Harness for sync tests. pub(crate) trait SyncTestHarness: Sized + 'static { /// The database type being tested. type Db: qmdb::sync::Database + DbAny; /// Return the root the sync engine targets. fn sync_target_root(db: &Self::Db) -> Digest; /// Create a config with unique partition names fn config(suffix: &str, pooler: &impl BufferPooler) -> ConfigOf; /// Generate n test operations using the default seed (0) fn create_ops(n: usize) -> Vec>; /// Generate n test operations using a specific seed. /// Use different seeds when you need non-overlapping keys in the same test. fn create_ops_seeded(n: usize, seed: u64) -> Vec>; /// Initialize a database fn init_db(ctx: deterministic::Context) -> impl std::future::Future + Send; /// Initialize a database with a config fn init_db_with_config( ctx: deterministic::Context, config: ConfigOf, ) -> impl std::future::Future + Send; /// Apply operations to a database and commit. 
fn apply_ops( db: Self::Db, ops: Vec>, ) -> impl std::future::Future + Send; } /// Test that empty operations arrays fetched do not cause panics when stored and applied pub(crate) fn test_sync_empty_operations_no_panic() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Init target_db to satisfy engine configuration bounds let target_db = H::init_db(context.with_label("target")).await; // Use an arbitrary target let db_config = H::config(&context.next_u64().to_string(), &context); let config = Config { db_config, fetch_batch_size: NZU64!(10), target: Target { root: Digest::from([1u8; 32]), range: non_empty_range!(Location::new(0), Location::new(10)), }, context: context.with_label("client"), resolver: Arc::new(target_db), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; // Create the engine let mut client: Engine = Engine::new(config).await.unwrap(); // Pass empty operations vectors which should not cause panics client.store_operations(Location::new(0), vec![]); client.store_operations(Location::new(5), vec![]); // Apply operations which also shouldn't panic client.apply_operations().await.unwrap(); // It is considered a success simply if it didn't panic. 
}); } /// Test that resolver failure is handled correctly pub(crate) fn test_sync_resolver_fails() where resolver::tests::FailResolver, Digest>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let resolver = resolver::tests::FailResolver::, Digest>::new(); let target_root = Digest::from([0; 32]); let db_config = H::config(&context.next_u64().to_string(), &context); let engine_config = Config { context: context.with_label("client"), target: Target { root: target_root, range: non_empty_range!(Location::new(0), Location::new(5)), }, resolver, apply_batch_size: 2, max_outstanding_requests: 2, fetch_batch_size: NZU64!(2), db_config, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let result: Result = sync::sync(engine_config).await; assert!(result.is_err()); }); } /// Test basic sync functionality with various batch sizes pub(crate) fn test_sync(target_db_ops: usize, fetch_batch_size: NonZeroU64) where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = H::init_db(context.with_label("target")).await; let target_ops = H::create_ops(target_db_ops); target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops target_db .prune(target_db.inactivity_floor_loc().await) .await .unwrap(); let target_op_count = target_db.bounds().await.end; let target_inactivity_floor = target_db.inactivity_floor_loc().await; let sync_root = H::sync_target_root(&target_db); let verification_root = target_db.root(); let lower_bound = target_db.inactivity_floor_loc().await; // Configure sync let db_config = H::config(&context.next_u64().to_string(), &context); let target_db = Arc::new(target_db); let client_context = context.with_label("client"); let config 
= Config { db_config: db_config.clone(), fetch_batch_size, target: Target { root: sync_root, range: non_empty_range!(lower_bound, target_op_count), }, context: client_context.clone(), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; // Perform sync let synced_db: H::Db = sync::sync(config).await.unwrap(); // Verify database state (root hash is the key verification) assert_eq!(synced_db.bounds().await.end, target_op_count); assert_eq!( synced_db.inactivity_floor_loc().await, target_inactivity_floor ); assert_eq!(synced_db.root(), verification_root); // Verify persistence let final_root = synced_db.root(); let final_op_count = synced_db.bounds().await.end; let final_inactivity_floor = synced_db.inactivity_floor_loc().await; // Reopen and verify state persisted drop(synced_db); let reopened_db = H::init_db_with_config(client_context.with_label("reopened"), db_config).await; assert_eq!(reopened_db.bounds().await.end, final_op_count); assert_eq!( reopened_db.inactivity_floor_loc().await, final_inactivity_floor ); assert_eq!(reopened_db.root(), final_root); // Cleanup reopened_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test syncing to a subset of the target database (target has additional ops beyond sync range) pub(crate) fn test_sync_subset_of_target_database(target_db_ops: usize) where Arc>: Resolver, Digest = Digest>, OpOf: Encode + Clone + OperationTrait, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = H::init_db(context.with_label("target")).await; let target_ops = H::create_ops(target_db_ops); // Apply all but the last operation target_db = H::apply_ops(target_db, target_ops[0..target_db_ops - 1].to_vec()).await; // commit already done in apply_ops let 
upper_bound = target_db.bounds().await.end; let sync_root = H::sync_target_root(&target_db); let verification_root = target_db.root(); let lower_bound = target_db.inactivity_floor_loc().await; // Add another operation after the sync range let final_op = target_ops[target_db_ops - 1].clone(); let final_key = final_op.key().cloned(); // Store the key before applying target_db = H::apply_ops(target_db, vec![final_op]).await; // commit already done in apply_ops // Sync to the original root (before final_op was added) let db_config = H::config(&context.next_u64().to_string(), &context); let config = Config { db_config, fetch_batch_size: NZU64!(10), target: Target { root: sync_root, range: non_empty_range!(lower_bound, upper_bound), }, context: context.with_label("client"), resolver: Arc::new(target_db), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); // Verify the synced database has the correct range of operations assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); assert_eq!(synced_db.bounds().await.end, upper_bound); // Verify the final root digest matches our target assert_eq!(synced_db.root(), verification_root); // Verify the synced database doesn't have any operations beyond the sync range. // (the final_op should not be present) if let Some(key) = final_key { assert!(synced_db.get(&key).await.unwrap().is_none()); } synced_db.destroy().await.unwrap(); }); } /// Test syncing where the sync client has some but not all of the operations in the target DB. /// Tests the scenario where sync_db already has partial data and needs to sync additional ops. 
pub(crate) fn test_sync_use_existing_db_partial_match(original_ops: usize) where Arc>: Resolver, Digest = Digest>, OpOf: Encode + Clone + OperationTrait, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let original_ops_data = H::create_ops(original_ops); // Create two databases let mut target_db = H::init_db(context.with_label("target")).await; let sync_db_config = H::config(&context.next_u64().to_string(), &context); let client_context = context.with_label("client"); let mut sync_db: H::Db = H::init_db_with_config(client_context.clone(), sync_db_config.clone()).await; // Apply the same operations to both databases target_db = H::apply_ops(target_db, original_ops_data.clone()).await; sync_db = H::apply_ops(sync_db, original_ops_data.clone()).await; // commit already done in apply_ops // commit already done in apply_ops drop(sync_db); // Add more operations and commit the target database // (use different seed to avoid key collisions) let more_ops = H::create_ops_seeded(1, 1); target_db = H::apply_ops(target_db, more_ops.clone()).await; // commit already done in apply_ops let sync_root = H::sync_target_root(&target_db); let verification_root = target_db.root(); let lower_bound = target_db.inactivity_floor_loc().await; let upper_bound = target_db.bounds().await.end; // Reopen the sync database and sync it to the target database let target_db = Arc::new(target_db); let config = Config { db_config: sync_db_config, fetch_batch_size: NZU64!(10), target: Target { root: sync_root, range: non_empty_range!(lower_bound, upper_bound), }, context: client_context.with_label("sync"), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); // Verify database state let bounds = synced_db.bounds().await; assert_eq!(bounds.end, upper_bound); 
assert_eq!( synced_db.inactivity_floor_loc().await, target_db.inactivity_floor_loc().await ); assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); assert_eq!(bounds.end, target_db.bounds().await.end); // Verify the root digest matches the target assert_eq!(synced_db.root(), verification_root); // Verify that original operations are present and correct (by key lookup) for target_op in &original_ops_data { if let Some(key) = target_op.key() { let target_value = target_db.get(key).await.unwrap(); let synced_value = synced_db.get(key).await.unwrap(); assert_eq!(target_value.is_some(), synced_value.is_some()); } } // Verify the last operation is present (if it's an update) if let Some(key) = more_ops[0].key() { let synced_value = synced_db.get(key).await.unwrap(); let target_value = target_db.get(key).await.unwrap(); assert_eq!(synced_value.is_some(), target_value.is_some()); } synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test case where existing database on disk exactly matches the sync target. /// Uses FailResolver to verify that no network requests are made since data already exists. 
pub(crate) fn test_sync_use_existing_db_exact_match(num_ops: usize) where resolver::tests::FailResolver, Digest>: Resolver, Digest = Digest>, OpOf: Encode + Clone + OperationTrait, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let target_ops = H::create_ops(num_ops); // Create two databases with their own configs let target_config = H::config(&context.next_u64().to_string(), &context); let mut target_db = H::init_db_with_config(context.with_label("target"), target_config).await; let sync_config = H::config(&context.next_u64().to_string(), &context); let client_context = context.with_label("client"); let mut sync_db = H::init_db_with_config(client_context.clone(), sync_config.clone()).await; // Apply the same operations to both databases target_db = H::apply_ops(target_db, target_ops.clone()).await; sync_db = H::apply_ops(sync_db, target_ops.clone()).await; // commit already done in apply_ops // commit already done in apply_ops target_db .prune(target_db.inactivity_floor_loc().await) .await .unwrap(); sync_db .prune(sync_db.inactivity_floor_loc().await) .await .unwrap(); sync_db.sync().await.unwrap(); drop(sync_db); // Capture target state let sync_root = H::sync_target_root(&target_db); let verification_root = target_db.root(); let lower_bound = target_db.inactivity_floor_loc().await; let upper_bound = target_db.bounds().await.end; // sync_db should never ask the resolver for operations // because it is already complete. Use a resolver that always fails // to ensure that it's not being used. 
let resolver = resolver::tests::FailResolver::, Digest>::new(); let config = Config { db_config: sync_config, // Use same config to access same partitions fetch_batch_size: NZU64!(10), target: Target { root: sync_root, range: non_empty_range!(lower_bound, upper_bound), }, context: client_context.with_label("sync"), resolver, apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); // Verify database state let bounds = synced_db.bounds().await; assert_eq!(bounds.end, upper_bound); assert_eq!(bounds.end, target_db.bounds().await.end); assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); // Verify the root digest matches the target assert_eq!(synced_db.root(), verification_root); // Verify state matches for sample operations (via key lookup) for target_op in &target_ops { if let Some(key) = target_op.key() { let target_value = target_db.get(key).await.unwrap(); let synced_value = synced_db.get(key).await.unwrap(); assert_eq!(target_value.is_some(), synced_value.is_some()); } } synced_db.destroy().await.unwrap(); target_db.destroy().await.unwrap(); }); } /// Test that the client fails to sync if the lower bound is decreased via target update. 
pub(crate) fn test_target_update_lower_bound_decrease() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = H::init_db(context.with_label("target")).await; let target_ops = H::create_ops(50); target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops // Capture initial target state let initial_lower_bound = target_db.inactivity_floor_loc().await; let initial_upper_bound = target_db.bounds().await.end; let initial_root = H::sync_target_root(&target_db); // Create client with initial target let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(5), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); // Send target update with decreased lower bound update_sender .send(Target { root: initial_root, range: non_empty_range!( initial_lower_bound.checked_sub(1).unwrap(), initial_upper_bound.checked_add(1).unwrap() ), }) .await .unwrap(); let result = client.step().await; assert!(matches!( result, Err(sync::Error::Engine( sync::EngineError::SyncTargetMovedBackward { .. } )) )); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that the client fails to sync if the upper bound is decreased via target update. 
pub(crate) fn test_target_update_upper_bound_decrease() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = H::init_db(context.with_label("target")).await; let target_ops = H::create_ops(50); target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops // Capture initial target state let initial_lower_bound = target_db.inactivity_floor_loc().await; let initial_upper_bound = target_db.bounds().await.end; let initial_root = H::sync_target_root(&target_db); // Create client with initial target let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(5), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); // Send target update with decreased upper bound update_sender .send(Target { root: initial_root, range: non_empty_range!( initial_lower_bound, initial_upper_bound.checked_sub(1).unwrap() ), }) .await .unwrap(); let result = client.step().await; assert!(matches!( result, Err(sync::Error::Engine( sync::EngineError::SyncTargetMovedBackward { .. } )) )); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that the client succeeds when bounds are updated (increased). 
pub(crate) fn test_target_update_bounds_increase() where Arc>: Resolver, Digest = Digest>, OpOf: Encode + Clone, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = H::init_db(context.with_label("target")).await; let target_ops = H::create_ops(100); target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops // Capture initial target state let initial_lower_bound = target_db.inactivity_floor_loc().await; let initial_upper_bound = target_db.bounds().await.end; let initial_root = H::sync_target_root(&target_db); // Apply more operations to the target database // (use different seed to avoid key collisions) let additional_ops = H::create_ops_seeded(1, 1); let new_verification_root = { target_db = H::apply_ops(target_db, additional_ops).await; // commit already done in apply_ops // Capture new target state let new_lower_bound = target_db.inactivity_floor_loc().await; let new_upper_bound = target_db.bounds().await.end; let new_sync_root = H::sync_target_root(&target_db); let new_verification_root = target_db.root(); // Create client with placeholder initial target (stale compared to final target) let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(1), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; // Send target update with increased bounds update_sender .send(Target { root: new_sync_root, range: non_empty_range!(new_lower_bound, new_upper_bound), }) .await .unwrap(); // Complete the sync 
let synced_db: H::Db = sync::sync(config).await.unwrap(); // Verify the synced database has the expected final state assert_eq!(synced_db.root(), new_verification_root); assert_eq!(synced_db.bounds().await.end, new_upper_bound); assert_eq!(synced_db.inactivity_floor_loc().await, new_lower_bound); synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); new_verification_root }; let _ = new_verification_root; // Silence unused variable warning }); } /// Test that target updates can be sent even after the client is done (no panic). pub(crate) fn test_target_update_on_done_client() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = H::init_db(context.with_label("target")).await; let target_ops = H::create_ops(10); target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops // Capture target state let lower_bound = target_db.inactivity_floor_loc().await; let upper_bound = target_db.bounds().await.end; let sync_root = H::sync_target_root(&target_db); let verification_root = target_db.root(); // Create client with target that will complete immediately let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(20), target: Target { root: sync_root, range: non_empty_range!(lower_bound, upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; // Complete the sync let synced_db: H::Db = sync::sync(config).await.unwrap(); // Attempt to apply a target update 
after sync is complete to verify // we don't panic let _ = update_sender .send(Target { // Dummy target update root: Digest::from([2u8; 32]), range: non_empty_range!(lower_bound + 1, upper_bound + 1), }) .await; // Verify the synced database has the expected state assert_eq!(synced_db.root(), verification_root); assert_eq!(synced_db.bounds().await.end, upper_bound); assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that explicit finish control waits for a finish signal even after reaching target. pub(crate) fn test_sync_waits_for_explicit_finish() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = H::init_db(context.with_label("target")).await; target_db = H::apply_ops(target_db, H::create_ops(10)).await; let initial_target = Target { root: H::sync_target_root(&target_db), range: non_empty_range!( target_db.inactivity_floor_loc().await, target_db.bounds().await.end ), }; target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 1)).await; let updated_lower_bound = target_db.inactivity_floor_loc().await; let updated_upper_bound = target_db.bounds().await.end; let updated_target = Target { root: H::sync_target_root(&target_db), range: non_empty_range!(updated_lower_bound, updated_upper_bound), }; let updated_verification_root = target_db.root(); let (update_sender, update_receiver) = mpsc::channel(1); let (finish_sender, finish_receiver) = mpsc::channel(1); let (reached_sender, mut reached_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(10), target: initial_target.clone(), resolver: 
target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: Some(update_receiver), finish_rx: Some(finish_receiver), reached_target_tx: Some(reached_sender), max_retained_roots: 0, }; let sync_handle = sync::sync(config); pin_mut!(sync_handle); select! { _ = sync_handle.as_mut() => { panic!("sync completed before explicit finish signal"); }, reached = reached_receiver.recv() => { let reached = reached.expect("engine should report reached-target before finish"); assert_eq!(reached, initial_target); }, } assert!( sync_handle.as_mut().now_or_never().is_none(), "sync must wait for explicit finish signal after reaching target" ); update_sender .send(updated_target.clone()) .await .expect("target update channel should be open"); select! { _ = sync_handle.as_mut() => { panic!("sync completed before explicit finish signal for updated target"); }, reached = reached_receiver.recv() => { let reached = reached.expect("engine should report updated target before finish"); assert_eq!(reached, updated_target); }, } assert!( sync_handle.as_mut().now_or_never().is_none(), "sync must still wait for explicit finish signal after updated target is reached" ); finish_sender .send(()) .await .expect("finish signal channel should be open"); let synced_db: H::Db = sync_handle .await .expect("sync should succeed after finish signal"); assert_eq!(synced_db.root(), updated_verification_root); assert_eq!(synced_db.bounds().await.end, updated_upper_bound); assert_eq!(synced_db.inactivity_floor_loc().await, updated_lower_bound); synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that a finish signal received before target completion still allows full sync. 
pub(crate) fn test_sync_handles_early_finish_signal() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = H::init_db(context.with_label("target")).await; target_db = H::apply_ops(target_db, H::create_ops(30)).await; let lower_bound = target_db.inactivity_floor_loc().await; let upper_bound = target_db.bounds().await.end; let target = Target { root: H::sync_target_root(&target_db), range: non_empty_range!(lower_bound, upper_bound), }; let verification_root = target_db.root(); let (finish_sender, finish_receiver) = mpsc::channel(1); let (reached_sender, mut reached_receiver) = mpsc::channel(1); finish_sender .send(()) .await .expect("finish signal channel should be open"); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(3), target: target.clone(), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: Some(finish_receiver), reached_target_tx: Some(reached_sender), max_retained_roots: 1, }; let synced_db: H::Db = sync::sync(config) .await .expect("sync should complete after early finish signal"); let reached = reached_receiver .recv() .await .expect("engine should report reached-target"); assert_eq!(reached, target); assert_eq!(synced_db.root(), verification_root); assert_eq!(synced_db.bounds().await.end, upper_bound); assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that dropping finish sender without sending is treated as an error. 
pub(crate) fn test_sync_fails_when_finish_sender_dropped() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = H::init_db(context.with_label("target")).await; target_db = H::apply_ops(target_db, H::create_ops(10)).await; let lower_bound = target_db.inactivity_floor_loc().await; let upper_bound = target_db.bounds().await.end; let (finish_sender, finish_receiver) = mpsc::channel(1); drop(finish_sender); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(5), target: Target { root: H::sync_target_root(&target_db), range: non_empty_range!(lower_bound, upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: Some(finish_receiver), reached_target_tx: None, max_retained_roots: 1, }; let result: Result = sync::sync(config).await; assert!(matches!( result, Err(sync::Error::Engine(sync::EngineError::FinishChannelClosed)) )); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that dropping reached-target receiver does not fail sync. 
pub(crate) fn test_sync_allows_dropped_reached_target_receiver() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = H::init_db(context.with_label("target")).await; target_db = H::apply_ops(target_db, H::create_ops(10)).await; let lower_bound = target_db.inactivity_floor_loc().await; let upper_bound = target_db.bounds().await.end; let verification_root = target_db.root(); let (reached_sender, reached_receiver) = mpsc::channel(1); drop(reached_receiver); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: H::config(&context.next_u64().to_string(), &context), fetch_batch_size: NZU64!(5), target: Target { root: H::sync_target_root(&target_db), range: non_empty_range!(lower_bound, upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: Some(reached_sender), max_retained_roots: 1, }; let synced_db: H::Db = sync::sync(config) .await .expect("sync should succeed when reached-target receiver is dropped"); assert_eq!(synced_db.root(), verification_root); assert_eq!(synced_db.bounds().await.end, upper_bound); assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } /// Test that the client can handle target updates during sync execution. 
// Exercises a target update delivered mid-sync: the engine is stepped manually
// until it has applied at least one batch, the target database is then extended
// and a new `Target` is pushed through `update_rx`, and the final `sync()` must
// converge on the *new* root (not the original one).
// NOTE(review): generic parameter lists appear garbled in this view (e.g.
// `Arc>>>`); token stream preserved verbatim.
pub(crate) fn test_target_update_during_sync( initial_ops: usize, additional_ops: usize, ) where Arc>>>: Resolver, Digest = Digest>, OpOf: Encode + Clone, JournalOf: Contiguous, {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate target database with initial operations
        let mut target_db = H::init_db(context.with_label("target")).await;
        let target_ops = H::create_ops(initial_ops);
        target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops
        // Capture initial target state
        let initial_lower_bound = target_db.inactivity_floor_loc().await;
        let initial_upper_bound = target_db.bounds().await.end;
        let initial_sync_root = H::sync_target_root(&target_db);
        // Wrap target database for shared mutable access (using Option so we can take ownership)
        let target_db = Arc::new(AsyncRwLock::new(Some(target_db)));
        // Create client with initial target and small batch size
        let (update_sender, update_receiver) = mpsc::channel(1);
        // Step the client to process a batch
        let client = {
            let config = Config {
                context: context.with_label("client"),
                db_config: H::config(&context.next_u64().to_string(), &context),
                target: Target {
                    root: initial_sync_root,
                    range: non_empty_range!(initial_lower_bound, initial_upper_bound),
                },
                resolver: target_db.clone(),
                fetch_batch_size: NZU64!(1), // Small batch size so we don't finish after one batch
                max_outstanding_requests: 10,
                apply_batch_size: 1024,
                update_rx: Some(update_receiver),
                finish_rx: None,
                reached_target_tx: None,
                max_retained_roots: 1,
            };
            let mut client: Engine = Engine::new(config).await.unwrap();
            loop {
                // Step the client until we have processed a batch of operations
                client = match client.step().await.unwrap() {
                    NextStep::Continue(new_client) => new_client,
                    NextStep::Complete(_) => panic!("client should not be complete"),
                };
                let log_size = client.journal().size().await;
                if log_size > initial_lower_bound {
                    break client;
                }
            }
        };
        // Modify the target database by adding more operations
        // (use different seed to avoid key collisions)
        let additional_ops_data = H::create_ops_seeded(additional_ops, 1);
        let new_verification_root = {
            // Mutate under the write lock; the resolver presumably reads
            // through the same lock, so it cannot observe a half-applied
            // target — NOTE(review): confirm the Resolver impl for the lock.
            let mut db_guard = target_db.write().await;
            let db = db_guard.take().unwrap();
            let db = H::apply_ops(db, additional_ops_data).await;
            // Capture new target state
            let new_lower_bound = db.inactivity_floor_loc().await;
            let new_upper_bound = db.bounds().await.end;
            let new_sync_root = H::sync_target_root(&db);
            let new_verification_root = db.root();
            *db_guard = Some(db);
            // Send target update with new target
            update_sender
                .send(Target {
                    root: new_sync_root,
                    range: non_empty_range!(new_lower_bound, new_upper_bound),
                })
                .await
                .unwrap();
            new_verification_root
        };
        // Complete the sync
        let synced_db = client.sync().await.unwrap();
        // Verify the synced database has the expected final state
        assert_eq!(synced_db.root(), new_verification_root);
        // Verify the target database matches the synced database
        let target_db = Arc::try_unwrap(target_db).map_or_else(
            |_| panic!("Failed to unwrap Arc - still has references"),
            |rw_lock| rw_lock.into_inner().expect("db should be present"),
        );
        {
            let synced_bounds = synced_db.bounds().await;
            let target_bounds = target_db.bounds().await;
            assert_eq!(synced_bounds.end, target_bounds.end);
            assert_eq!(
                synced_db.inactivity_floor_loc().await,
                target_db.inactivity_floor_loc().await
            );
            assert_eq!(synced_db.root(), target_db.root());
        }
        synced_db.destroy().await.unwrap();
        target_db.destroy().await.unwrap();
    });
}
/// Test demonstrating that a synced database can be reopened and retain its state.
// Verifies durability: a freshly synced database, once dropped and re-opened
// with the same config (i.e. same storage partitions), reports the same root,
// op count, and inactivity floor.
// NOTE(review): generic parameter lists appear garbled in this view; token
// stream preserved verbatim.
pub(crate) fn test_sync_database_persistence() where Arc>: Resolver, Digest = Digest>, OpOf: Encode + Clone, JournalOf: Contiguous, {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate a simple target database
        let mut target_db = H::init_db(context.with_label("target")).await;
        let target_ops = H::create_ops(10);
        target_db = H::apply_ops(target_db, target_ops).await; // commit already done in apply_ops
        // Capture target state
        let sync_root = H::sync_target_root(&target_db);
        let verification_root = target_db.root();
        let lower_bound = target_db.inactivity_floor_loc().await;
        let upper_bound = target_db.bounds().await.end;
        // Perform sync
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let client_context = context.with_label("client");
        let target_db = Arc::new(target_db);
        let config = Config {
            // db_config is cloned because it is reused below to re-open the db.
            db_config: db_config.clone(),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: client_context.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced_db: H::Db = sync::sync(config).await.unwrap();
        // Verify initial sync worked
        assert_eq!(synced_db.root(), verification_root);
        // Save state before dropping
        let expected_root = synced_db.root();
        let expected_op_count = synced_db.bounds().await.end;
        let expected_inactivity_floor_loc = synced_db.inactivity_floor_loc().await;
        // Re-open the database
        drop(synced_db);
        let reopened_db =
            H::init_db_with_config(client_context.with_label("reopened"), db_config).await;
        // Verify the state is unchanged
        assert_eq!(reopened_db.root(), expected_root);
        assert_eq!(reopened_db.bounds().await.end, expected_op_count);
        assert_eq!(
            reopened_db.inactivity_floor_loc().await,
            expected_inactivity_floor_loc
        );
        // Cleanup
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
        reopened_db.destroy().await.unwrap();
    });
}
/// Test post-sync usability: after syncing, the database supports normal operations.
// NOTE(review): generic parameter lists appear garbled in this view; token
// stream preserved verbatim.
pub(crate) fn test_sync_post_sync_usability() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let mut target_db = H::init_db(context.with_label("target")).await;
        let target_ops = H::create_ops(50);
        target_db = H::apply_ops(target_db, target_ops).await;
        let sync_root = H::sync_target_root(&target_db);
        let lower_bound = target_db.inactivity_floor_loc().await;
        let upper_bound = target_db.bounds().await.end;
        let target_db = Arc::new(target_db);
        let config = H::config(&context.next_u64().to_string(), &context);
        let config = Config {
            db_config: config,
            fetch_batch_size: NZU64!(100),
            target: Target {
                root: sync_root,
                range: non_empty_range!(lower_bound, upper_bound),
            },
            context: context.with_label("client"),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
            finish_rx: None,
            reached_target_tx: None,
            max_retained_roots: 8,
        };
        let synced_db: H::Db = sync::sync(config).await.unwrap();
        let root_after_sync = synced_db.root();
        // Apply additional operations after sync.
        let more_ops = H::create_ops_seeded(10, 1);
        let synced_db = H::apply_ops(synced_db, more_ops).await;
        // Root should change after applying more ops.
        assert_ne!(synced_db.root(), root_after_sync);
        assert!(synced_db.bounds().await.end > upper_bound);
        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(target_db)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .destroy()
            .await
            .unwrap();
    });
}
/// Test `from_sync_result` where the database has all operations in the target range.
// Verifies `from_sync_result` when the provided journal already contains the
// exact target range: the reconstructed database must report the same op count
// and inactivity floor as the source it was dismantled from.
// NOTE(review): generic parameter lists appear garbled in this view (e.g.
// `DbOf = as qmdb::sync::Database>`); token stream preserved verbatim.
pub(crate) fn test_from_sync_result_nonempty_to_nonempty_exact_match() where DbOf: FromSyncTestable, OpOf: Encode + Clone, JournalOf: Contiguous, {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let db_config = H::config(&context.next_u64().to_string(), &context);
        let mut db =
            H::init_db_with_config(context.with_label("source"), db_config.clone()).await;
        let ops = H::create_ops(100);
        db = H::apply_ops(db, ops).await; // commit already done in apply_ops
        // Capture expected state before dismantling the source database.
        let sync_lower_bound = db.inactivity_floor_loc().await;
        let bounds = db.bounds().await;
        let sync_upper_bound = bounds.end;
        let target_db_op_count = bounds.end;
        let target_db_inactivity_floor_loc = db.inactivity_floor_loc().await;
        let pinned_nodes = db.pinned_nodes_at(db.inactivity_floor_loc().await).await;
        // Keep only the journal; the MMR half is discarded here.
        let (_, journal) = db.into_log_components();
        let sync_db: DbOf = as qmdb::sync::Database>::from_sync_result(
            context.with_label("synced"),
            db_config,
            journal,
            Some(pinned_nodes),
            sync_lower_bound..sync_upper_bound,
            1024,
        )
        .await
        .unwrap();
        // Verify database state
        assert_eq!(sync_db.bounds().await.end, target_db_op_count);
        assert_eq!(
            sync_db.inactivity_floor_loc().await,
            target_db_inactivity_floor_loc
        );
        assert_eq!(sync_db.inactivity_floor_loc().await, sync_lower_bound);
        sync_db.destroy().await.unwrap();
    });
}
/// Test `from_sync_result` where the database has some but not all operations in the target range.
// NOTE(review): generic parameter lists appear garbled in this view; token
// stream preserved verbatim.
pub(crate) fn test_from_sync_result_nonempty_to_nonempty_partial_match() where DbOf: FromSyncTestable, OpOf: Encode + Clone, JournalOf: Contiguous, {
    const NUM_OPS: usize = 100;
    const NUM_ADDITIONAL_OPS: usize = 5;
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate two databases.
let mut target_db = H::init_db(context.with_label("target")).await;
let sync_db_config = H::config(&context.next_u64().to_string(), &context);
let client_context = context.with_label("client");
let mut sync_db =
    H::init_db_with_config(client_context.clone(), sync_db_config.clone()).await;
// Apply the same initial operations to both databases so the sync db is a
// strict prefix of the target once the target gains extra ops below.
let original_ops = H::create_ops(NUM_OPS);
target_db = H::apply_ops(target_db, original_ops.clone()).await; // commit already done in apply_ops
target_db
    .prune(target_db.inactivity_floor_loc().await)
    .await
    .unwrap();
sync_db = H::apply_ops(sync_db, original_ops.clone()).await; // commit already done in apply_ops
sync_db
    .prune(sync_db.inactivity_floor_loc().await)
    .await
    .unwrap();
// Flush the sync db to storage, then drop it so it can be re-opened later.
sync_db.sync().await.unwrap();
drop(sync_db);
// Add more operations to the target db
// (use different seed to avoid key collisions)
let more_ops = H::create_ops_seeded(NUM_ADDITIONAL_OPS, 1);
target_db = H::apply_ops(target_db, more_ops).await; // commit already done in apply_ops
// Capture target db state for comparison
let bounds = target_db.bounds().await;
let target_db_op_count = bounds.end;
let target_db_inactivity_floor_loc = target_db.inactivity_floor_loc().await;
let sync_lower_bound = target_db.inactivity_floor_loc().await;
let sync_upper_bound = bounds.end;
let target_hash = target_db.root();
// Get pinned nodes at the sync lower bound from the target db (which has all the data).
let pinned_nodes = target_db.pinned_nodes_at(sync_lower_bound).await;
let (mmr, journal) = target_db.into_log_components();
// Re-open `sync_db` using from_sync_result
let sync_db: DbOf = as qmdb::sync::Database>::from_sync_result(
    client_context.with_label("synced"),
    sync_db_config,
    journal,
    Some(pinned_nodes),
    sync_lower_bound..sync_upper_bound,
    1024,
)
.await
.unwrap();
// Verify database state
assert_eq!(sync_db.bounds().await.end, target_db_op_count);
assert_eq!(
    sync_db.inactivity_floor_loc().await,
    target_db_inactivity_floor_loc
);
assert_eq!(sync_db.inactivity_floor_loc().await, sync_lower_bound);
// Verify the root digest matches the target (verifies content integrity)
assert_eq!(sync_db.root(), target_hash);
sync_db.destroy().await.unwrap();
mmr.destroy().await.unwrap();
});
}
/// Test `from_sync_result` with an empty destination database syncing to a non-empty source.
/// This tests the scenario where a sync client starts fresh with no existing data.
// NOTE(review): generic parameter lists appear garbled in this view; token
// stream preserved verbatim.
pub(crate) fn test_from_sync_result_empty_to_nonempty() where DbOf: FromSyncTestable, OpOf: Encode + Clone, JournalOf: Contiguous, {
    const NUM_OPS: usize = 100;
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create and populate a source database
        let mut source_db = H::init_db(context.with_label("source")).await;
        let ops = H::create_ops(NUM_OPS);
        source_db = H::apply_ops(source_db, ops).await; // commit already done in apply_ops
        source_db
            .prune(source_db.inactivity_floor_loc().await)
            .await
            .unwrap();
        let lower_bound = source_db.inactivity_floor_loc().await;
        let upper_bound = source_db.bounds().await.end;
        // Get pinned nodes and target hash before deconstructing source_db
        let pinned_nodes = source_db.pinned_nodes_at(lower_bound).await;
        let target_hash = source_db.root();
        let target_op_count = source_db.bounds().await.end;
        let target_inactivity_floor = source_db.inactivity_floor_loc().await;
        let (mmr, journal) = source_db.into_log_components();
        // Use a different config (simulating a new empty database)
        let new_db_config = H::config(&context.next_u64().to_string(), &context);
        let db: DbOf = as qmdb::sync::Database>::from_sync_result(
            context.with_label("synced"),
            new_db_config,
            journal,
            Some(pinned_nodes),
            lower_bound..upper_bound,
            1024,
        )
        .await
        .unwrap();
        // Verify database state
        assert_eq!(db.bounds().await.end, target_op_count);
        assert_eq!(db.inactivity_floor_loc().await, target_inactivity_floor);
        assert_eq!(db.inactivity_floor_loc().await, lower_bound);
        // Verify the root digest matches the target
        assert_eq!(db.root(), target_hash);
        db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}
/// Test `from_sync_result` with an empty source database syncing to an empty target database.
// NOTE(review): generic parameter lists appear garbled in this view; token
// stream preserved verbatim.
pub(crate) fn test_from_sync_result_empty_to_empty() where DbOf: FromSyncTestable, OpOf: Encode + Clone, JournalOf: Contiguous, {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Create an empty database (initialized with a single CommitFloor operation)
        let source_db = H::init_db(context.with_label("source")).await;
        // An empty database has exactly 1 operation (the initial CommitFloor)
        assert_eq!(source_db.bounds().await.end, Location::new(1));
        let target_hash = source_db.root();
        let (mmr, journal) = source_db.into_log_components();
        // Use a different config (simulating a new empty database)
        let new_db_config = H::config(&context.next_u64().to_string(), &context);
        let mut synced_db: DbOf = as qmdb::sync::Database>::from_sync_result(
            context.with_label("synced"),
            new_db_config,
            journal,
            None,
            Location::new(0)..Location::new(1),
            1024,
        )
        .await
        .unwrap();
        // Verify database state
        assert_eq!(synced_db.bounds().await.end, Location::new(1));
        assert_eq!(synced_db.inactivity_floor_loc().await, Location::new(0));
        assert_eq!(synced_db.root(), target_hash);
        // Test that we can perform operations on the synced database
        let ops = H::create_ops(10);
        synced_db = H::apply_ops(synced_db, ops).await;
        // Verify the operations worked
        assert!(synced_db.bounds().await.end > Location::new(1));
        synced_db.destroy().await.unwrap();
        mmr.destroy().await.unwrap();
    });
}
/// A resolver wrapper that corrupts pinned nodes on the first request, then returns correct
/// data on subsequent requests.
// NOTE(review): generic parameter lists appear garbled in this view (e.g.
// `inner: R` with no declared `R`, `corrupted: Arc,`); token stream preserved
// verbatim.
#[derive(Clone)]
struct CorruptFirstPinnedNodesResolver {
    inner: R,
    // Flipped to `true` (via `swap`) once the single corruption has happened.
    corrupted: Arc,
}
impl> Resolver for CorruptFirstPinnedNodesResolver {
    type Digest = Digest;
    type Op = R::Op;
    type Error = R::Error;
    async fn get_operations(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
        include_pinned_nodes: bool,
        cancel_rx: oneshot::Receiver<()>,
    ) -> Result, Self::Error> {
        // Delegate to the wrapped resolver first.
        let mut result = self
            .inner
            .get_operations(
                op_count,
                start_loc,
                max_ops,
                include_pinned_nodes,
                cancel_rx,
            )
            .await?;
        // Corrupt pinned nodes only on the first request that includes them.
        // `swap` returns the previous value, so exactly one request is corrupted.
        if result.pinned_nodes.is_some()
            && !self
                .corrupted
                .swap(true, std::sync::atomic::Ordering::Relaxed)
        {
            if let Some(ref mut nodes) = result.pinned_nodes {
                if !nodes.is_empty() {
                    // Overwrite the first node with a constant bogus digest.
                    nodes[0] = Digest::from([0xFFu8; 32]);
                }
            }
        }
        Ok(result)
    }
}
/// Test that corrupted pinned nodes on the first attempt are rejected and the sync
/// succeeds on retry when the resolver returns correct data.
// NOTE(review): generic parameter lists appear garbled in this view; token
// stream preserved verbatim.
pub(crate) fn test_sync_retries_bad_pinned_nodes() where Arc>: Resolver, Digest = Digest>, OpOf: Encode, JournalOf: Contiguous, {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Build a target database with some operations and prune so that pinned nodes are needed.
let mut target_db = H::init_db(context.with_label("target")).await;
let ops = H::create_ops(20);
target_db = H::apply_ops(target_db, ops).await;
// Prune to the inactivity floor so the client must rely on pinned nodes.
target_db
    .prune(target_db.inactivity_floor_loc().await)
    .await
    .unwrap();
let sync_root = H::sync_target_root(&target_db);
let lower_bound = target_db.inactivity_floor_loc().await;
let upper_bound = target_db.bounds().await.end;
let db_config = H::config(&context.next_u64().to_string(), &context);
// Wrap the target in the corrupting resolver; the first pinned-nodes
// response will be tampered with, subsequent ones pass through untouched.
let resolver = CorruptFirstPinnedNodesResolver {
    inner: Arc::new(target_db),
    corrupted: Arc::new(std::sync::atomic::AtomicBool::new(false)),
};
let config = sync::engine::Config {
    db_config,
    fetch_batch_size: NZU64!(100),
    target: Target {
        root: sync_root,
        range: non_empty_range!(lower_bound, upper_bound),
    },
    context: context.with_label("client"),
    resolver,
    apply_batch_size: 1024,
    max_outstanding_requests: 1,
    update_rx: None,
    finish_rx: None,
    reached_target_tx: None,
    max_retained_roots: 8,
};
// Sync should succeed on the second attempt after the first corrupted pinned nodes
// are rejected.
let synced_db: H::Db = sync::sync(config).await.unwrap();
// A matching root proves the retry path recovered from the corruption.
assert_eq!(synced_db.root(), sync_root);
synced_db.destroy().await.unwrap();
});
}
// Concrete harness implementations that wire the generic tests above to each
// of the four database flavors (ordered/unordered x fixed/variable).
// NOTE(review): generic parameter lists appear garbled throughout this module
// in this view (e.g. `Vec>`, `None::`); token stream preserved verbatim.
mod harnesses {
    use super::SyncTestHarness;
    use crate::{qmdb::any::value::VariableEncoding, translator::TwoCap};
    use commonware_cryptography::sha256::Digest;
    use commonware_runtime::{deterministic::Context, BufferPooler};
    // ----- Ordered/Fixed -----
    pub struct OrderedFixedHarness;
    impl SyncTestHarness for OrderedFixedHarness {
        type Db = crate::qmdb::any::ordered::fixed::test::AnyTest;
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::FixedConfig {
            crate::qmdb::any::test::fixed_db_config(suffix, pooler)
        }
        fn create_ops(
            n: usize,
        ) -> Vec> {
            crate::qmdb::any::ordered::fixed::test::create_test_ops(n)
        }
        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec> {
            crate::qmdb::any::ordered::fixed::test::create_test_ops_seeded(n, seed)
        }
        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::ordered::fixed::test::create_test_db(ctx).await
        }
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::FixedConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::ordered::fixed::Operation,
            >,
        ) -> Self::Db {
            crate::qmdb::any::ordered::fixed::test::apply_ops(&mut db, ops).await;
            // Merkleize and commit the batch so callers always get a committed db.
            let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }
    // ----- Ordered/Variable -----
    pub struct OrderedVariableHarness;
    impl SyncTestHarness for OrderedVariableHarness {
        type Db = crate::qmdb::any::ordered::variable::test::AnyTest;
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::ordered::variable::test::VarConfig {
            crate::qmdb::any::ordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }
        fn
create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec>> {
            crate::qmdb::any::ordered::variable::test::create_test_ops_seeded(n, seed)
        }
        fn create_ops(
            n: usize,
        ) -> Vec>> {
            crate::qmdb::any::ordered::variable::test::create_test_ops(n)
        }
        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::ordered::variable::test::create_test_db(ctx).await
        }
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::ordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::ordered::variable::Operation>,
            >,
        ) -> Self::Db {
            crate::qmdb::any::ordered::variable::test::apply_ops(&mut db, ops).await;
            // Merkleize and commit the batch so callers always get a committed db.
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }
    // ----- Unordered/Fixed -----
    pub struct UnorderedFixedHarness;
    impl SyncTestHarness for UnorderedFixedHarness {
        type Db = crate::qmdb::any::unordered::fixed::test::AnyTest;
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::FixedConfig {
            crate::qmdb::any::test::fixed_db_config(suffix, pooler)
        }
        fn create_ops_seeded(
            n: usize,
            seed: u64,
        ) -> Vec> {
            crate::qmdb::any::unordered::fixed::test::create_test_ops_seeded(n, seed)
        }
        fn create_ops(
            n: usize,
        ) -> Vec> {
            crate::qmdb::any::unordered::fixed::test::create_test_ops(n)
        }
        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::unordered::fixed::test::create_test_db(ctx).await
        }
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::FixedConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::fixed::Operation,
            >,
        ) -> Self::Db {
            crate::qmdb::any::unordered::fixed::test::apply_ops(&mut db, ops).await;
            // Merkleize and commit the batch so callers always get a committed db.
            let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap();
db.apply_batch(merkleized).await.unwrap();
            db
        }
    }
    // ----- Unordered/Variable -----
    pub struct UnorderedVariableHarness;
    impl SyncTestHarness for UnorderedVariableHarness {
        type Db = crate::qmdb::any::unordered::variable::test::AnyTest;
        fn sync_target_root(db: &Self::Db) -> Digest {
            db.root()
        }
        fn config(
            suffix: &str,
            pooler: &impl BufferPooler,
        ) -> crate::qmdb::any::unordered::variable::test::VarConfig {
            crate::qmdb::any::unordered::variable::test::create_test_config(
                suffix.parse().unwrap_or(0),
                pooler,
            )
        }
        fn create_ops(
            n: usize,
        ) -> Vec<
            crate::qmdb::any::unordered::Operation<
                crate::mmr::Family,
                Digest,
                VariableEncoding>,
            >,
        > {
            crate::qmdb::any::unordered::variable::test::create_test_ops(n)
        }
        fn create_ops_seeded(n: usize, seed: u64) -> Vec> {
            crate::qmdb::any::unordered::variable::test::create_test_ops_seeded(n, seed)
        }
        async fn init_db(ctx: Context) -> Self::Db {
            crate::qmdb::any::unordered::variable::test::create_test_db(ctx).await
        }
        async fn init_db_with_config(
            ctx: Context,
            config: crate::qmdb::any::unordered::variable::test::VarConfig,
        ) -> Self::Db {
            Self::Db::init(ctx, config).await.unwrap()
        }
        async fn apply_ops(
            mut db: Self::Db,
            ops: Vec<
                crate::qmdb::any::unordered::Operation<
                    crate::mmr::Family,
                    Digest,
                    VariableEncoding>,
                >,
            >,
        ) -> Self::Db {
            crate::qmdb::any::unordered::variable::test::apply_ops(&mut db, ops).await;
            // Merkleize and commit the batch so callers always get a committed db.
            let merkleized = db
                .new_batch()
                .merkleize(&db, None::>)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db
        }
    }
}
// ===== Test Generation Macro =====
/// Macro to generate all standard sync tests for a given harness.
macro_rules!
sync_tests_for_harness {
    // Expands to a module `$mod_name` containing one #[test] wrapper per
    // generic test function, instantiated for the harness type `$harness`.
    ($harness:ty, $mod_name:ident) => {
        mod $mod_name {
            use super::harnesses;
            use commonware_macros::test_traced;
            use rstest::rstest;
            use std::num::NonZeroU64;
            #[test_traced]
            fn test_sync_empty_operations_no_panic() {
                super::test_sync_empty_operations_no_panic::<$harness>();
            }
            #[test_traced]
            fn test_sync_subset_of_target_database() {
                super::test_sync_subset_of_target_database::<$harness>(1000);
            }
            // Parameterized over (target db size, fetch batch size) to cover
            // batch sizes below, dividing, equal to, and above the db size.
            #[rstest]
            #[case::small_batch_size_one(10, 1)]
            #[case::small_batch_size_gt_db_size(10, 20)]
            #[case::batch_size_one(1000, 1)]
            #[case::floor_div_db_batch_size(1000, 3)]
            #[case::floor_div_db_batch_size_2(1000, 999)]
            #[case::div_db_batch_size(1000, 100)]
            #[case::db_size_eq_batch_size(1000, 1000)]
            #[case::batch_size_gt_db_size(1000, 1001)]
            fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: u64) {
                super::test_sync::<$harness>(
                    target_db_ops,
                    NonZeroU64::new(fetch_batch_size).unwrap(),
                );
            }
            #[test_traced]
            fn test_sync_use_existing_db_partial_match() {
                super::test_sync_use_existing_db_partial_match::<$harness>(1000);
            }
            #[test_traced]
            fn test_sync_use_existing_db_exact_match() {
                super::test_sync_use_existing_db_exact_match::<$harness>(1000);
            }
            #[test_traced("WARN")]
            fn test_target_update_lower_bound_decrease() {
                super::test_target_update_lower_bound_decrease::<$harness>();
            }
            #[test_traced("WARN")]
            fn test_target_update_upper_bound_decrease() {
                super::test_target_update_upper_bound_decrease::<$harness>();
            }
            #[test_traced("WARN")]
            fn test_target_update_bounds_increase() {
                super::test_target_update_bounds_increase::<$harness>();
            }
            #[test_traced("WARN")]
            fn test_target_update_on_done_client() {
                super::test_target_update_on_done_client::<$harness>();
            }
            #[test_traced]
            fn test_sync_waits_for_explicit_finish() {
                super::test_sync_waits_for_explicit_finish::<$harness>();
            }
            #[test_traced]
            fn test_sync_handles_early_finish_signal() {
                super::test_sync_handles_early_finish_signal::<$harness>();
            }
            #[test_traced]
            fn
test_sync_fails_when_finish_sender_dropped() {
                super::test_sync_fails_when_finish_sender_dropped::<$harness>();
            }
            #[test_traced]
            fn test_sync_allows_dropped_reached_target_receiver() {
                super::test_sync_allows_dropped_reached_target_receiver::<$harness>();
            }
            // Parameterized over (initial ops, additional ops added mid-sync).
            #[rstest]
            #[case(1, 1)]
            #[case(1, 2)]
            #[case(1, 100)]
            #[case(2, 1)]
            #[case(2, 2)]
            #[case(2, 100)]
            // Regression test: panicked when we didn't set pinned nodes after updating target
            #[case(20, 10)]
            #[case(100, 1)]
            #[case(100, 2)]
            #[case(100, 100)]
            #[case(100, 1000)]
            fn test_target_update_during_sync(
                #[case] initial_ops: usize,
                #[case] additional_ops: usize,
            ) {
                super::test_target_update_during_sync::<$harness>(initial_ops, additional_ops);
            }
            #[test_traced]
            fn test_sync_database_persistence() {
                super::test_sync_database_persistence::<$harness>();
            }
            #[test_traced]
            fn test_sync_post_sync_usability() {
                super::test_sync_post_sync_usability::<$harness>();
            }
            #[test_traced]
            fn test_sync_resolver_fails() {
                super::test_sync_resolver_fails::<$harness>();
            }
            #[test_traced]
            fn test_sync_retries_bad_pinned_nodes() {
                super::test_sync_retries_bad_pinned_nodes::<$harness>();
            }
            #[test_traced("WARN")]
            fn test_from_sync_result_empty_to_empty() {
                super::test_from_sync_result_empty_to_empty::<$harness>();
            }
            #[test_traced]
            fn test_from_sync_result_empty_to_nonempty() {
                super::test_from_sync_result_empty_to_nonempty::<$harness>();
            }
            #[test_traced]
            fn test_from_sync_result_nonempty_to_nonempty_partial_match() {
                super::test_from_sync_result_nonempty_to_nonempty_partial_match::<$harness>();
            }
            #[test_traced]
            fn test_from_sync_result_nonempty_to_nonempty_exact_match() {
                super::test_from_sync_result_nonempty_to_nonempty_exact_match::<$harness>();
            }
        }
    };
}
// Instantiate the full generic test suite for every database flavor.
sync_tests_for_harness!(harnesses::OrderedFixedHarness, ordered_fixed);
sync_tests_for_harness!(harnesses::OrderedVariableHarness, ordered_variable);
sync_tests_for_harness!(harnesses::UnorderedFixedHarness, unordered_fixed);
sync_tests_for_harness!(harnesses::UnorderedVariableHarness, unordered_variable);