// NOTE(review): this file was mangled by an extraction step that collapsed it
// onto a handful of physical lines and stripped most single-line generic
// argument lists (`<...>`). The generic parameters below are reconstructed
// from the where-clause and call sites — verify against the upstream source.

use crate::{
    index::unordered::Index,
    journal::{
        authenticated,
        contiguous::{Mutable, Reader as _},
        Error as JournalError,
    },
    merkle::{
        journaled::{self, Journaled},
        mmr, Location,
    },
    qmdb::{
        any::ValueEncoding, build_snapshot_from_log,
        immutable::{self, Operation},
        operation::Key,
        sync::{self},
        Error,
    },
    translator::Translator,
    Context, Persistable,
};
use commonware_codec::EncodeShared;
use commonware_cryptography::Hasher;
use std::ops::Range;

/// Hasher wrapper used while rebuilding the Merkle structure during sync.
// TODO(review): generic argument lost in extraction; presumably `Standard<H>`.
type StandardHasher<H> = crate::merkle::hasher::Standard<H>;

// TODO(review): the generic parameter lists on `impl` and `Immutable` were
// stripped in extraction; `<E, K, V, C, H, T>` is inferred from the
// where-clause below — confirm order and arguments against upstream.
impl<E, K, V, C, H, T> sync::Database for immutable::Immutable<E, K, V, C, H, T>
where
    E: Context,
    K: Key,
    V: ValueEncoding,
    // TODO(review): the associated-type arguments on `Mutable<..>` and
    // `sync::Journal<..>` were stripped; `Item = Operation<K, V>` is an
    // inference from `type Op` below — confirm.
    C: Mutable<Item = Operation<K, V>> + Persistable + sync::Journal<Item = Operation<K, V>>,
    C::Item: EncodeShared,
    C::Config: Clone + Send,
    H: Hasher,
    T: Translator,
{
    type Op = Operation<K, V>;
    type Journal = C;
    type Hasher = H;
    // TODO(review): `Config`'s generic arguments were stripped — confirm.
    type Config = immutable::Config<T>;
    type Digest = H::Digest;
    type Context = E;

    /// Returns an [Immutable](immutable::Immutable) initialized from data collected in the sync
    /// process.
    ///
    /// # Behavior
    ///
    /// This method handles different initialization scenarios based on existing data:
    /// - If the Merkle journal is empty or the last item is before the range start, it creates a
    ///   fresh Merkle structure from the provided `pinned_nodes`
    /// - If the Merkle journal has data but is incomplete (has length < range end), missing
    ///   operations from the log are applied to bring it up to the target state
    /// - If the Merkle journal has data beyond the range end, it is rewound to match the sync
    ///   target
    ///
    /// # Returns
    ///
    /// A [super::Immutable] db populated with the state from the given range.
    /// The pruning boundary is set to the range start.
async fn from_sync_result( context: Self::Context, db_config: Self::Config, log: Self::Journal, pinned_nodes: Option>, range: Range, apply_batch_size: usize, ) -> Result> { let hasher = StandardHasher::new(); // Initialize Merkle structure for sync let merkle = Journaled::init_sync( context.with_label("merkle"), journaled::SyncConfig { config: db_config.merkle_config.clone(), range, pinned_nodes, }, &hasher, ) .await?; let journal = authenticated::Journal::<_, _, _, _>::from_components( merkle, log, hasher, apply_batch_size as u64, ) .await?; let mut snapshot: Index = Index::new(context.with_label("snapshot"), db_config.translator.clone()); let last_commit_loc = { // Get the start of the log. let reader = journal.journal.reader().await; let bounds = reader.bounds(); let start_loc = mmr::Location::new(bounds.start); // Build snapshot from the log build_snapshot_from_log::( start_loc, &reader, &mut snapshot, |_, _| {}, ) .await?; Location::new(bounds.end.checked_sub(1).expect("commit should exist")) }; let db = Self { journal, snapshot, last_commit_loc, }; db.sync().await?; Ok(db) } fn root(&self) -> Self::Digest { self.root() } } #[cfg(test)] mod tests { use crate::{ merkle::mmr::Location, qmdb::{ immutable, immutable::variable::Operation, sync::{ self, engine::{Config, NextStep}, Engine, Target, }, }, translator::TwoCap, }; use commonware_cryptography::{sha256, Sha256}; use commonware_macros::test_traced; use commonware_math::algebra::Random; use commonware_runtime::{ buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner as _, }; use commonware_utils::{ channel::mpsc, non_empty_range, test_rng_seeded, NZUsize, NZU16, NZU64, }; use rand::RngCore as _; use rstest::rstest; use std::{ collections::HashMap, num::{NonZeroU16, NonZeroU64, NonZeroUsize}, sync::Arc, }; /// Type alias for sync tests with simple codec config type ImmutableSyncTest = immutable::variable::Db< crate::merkle::mmr::Family, deterministic::Context, sha256::Digest, sha256::Digest, 
Sha256, crate::translator::TwoCap, >; /// Create a simple config for sync tests fn create_sync_config( suffix: &str, pooler: &impl BufferPooler, ) -> immutable::variable::Config { const PAGE_SIZE: NonZeroU16 = NZU16!(77); const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9); const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5); let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE); immutable::Config { merkle_config: crate::merkle::journaled::Config { journal_partition: format!("journal-{suffix}"), metadata_partition: format!("metadata-{suffix}"), items_per_blob: NZU64!(11), write_buffer: NZUsize!(1024), thread_pool: None, page_cache: page_cache.clone(), }, log: crate::journal::contiguous::variable::Config { partition: format!("log-{suffix}"), items_per_section: ITEMS_PER_SECTION, compression: None, codec_config: ((), ()), page_cache, write_buffer: NZUsize!(1024), }, translator: TwoCap, } } /// Create a test database with unique partition names async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest { let seed = context.next_u64(); let config = create_sync_config(&format!("sync-test-{seed}"), &context); ImmutableSyncTest::init(context, config).await.unwrap() } /// Create n random Set operations using the default seed (0). /// create_test_ops(n) is a prefix of create_test_ops(n') for n < n'. fn create_test_ops(n: usize) -> Vec> { create_test_ops_seeded(n, 0) } /// Create n random Set operations using a specific seed. /// Use different seeds when you need non-overlapping keys in the same test. fn create_test_ops_seeded( n: usize, seed: u64, ) -> Vec> { let mut rng = test_rng_seeded(seed); let mut ops = Vec::new(); for _i in 0..n { let key = sha256::Digest::random(&mut rng); let value = sha256::Digest::random(&mut rng); ops.push(Operation::Set(key, value)); } ops } /// Applies the given operations and commits the database. 
async fn apply_ops( db: &mut ImmutableSyncTest, ops: Vec>, metadata: Option, ) { let mut batch = db.new_batch(); for op in ops { match op { Operation::Set(key, value) => { batch = batch.set(key, value); } Operation::Commit(_metadata) => { panic!("Commit operation not supported in apply_ops"); } } } let merkleized = batch.merkleize(db, metadata); db.apply_batch(merkleized).await.unwrap(); } #[rstest] #[case::singleton_batch_size_one(1, NZU64!(1))] #[case::singleton_batch_size_gt_db_size(1, NZU64!(2))] #[case::batch_size_one(1000, NZU64!(1))] #[case::floor_div_db_batch_size(1000, NZU64!(3))] #[case::floor_div_db_batch_size_2(1000, NZU64!(999))] #[case::div_db_batch_size(1000, NZU64!(100))] #[case::db_size_eq_batch_size(1000, NZU64!(1000))] #[case::batch_size_gt_db_size(1000, NZU64!(1001))] fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: NonZeroU64) { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = create_test_db(context.with_label("target")).await; let target_db_ops = create_test_ops(target_db_ops); apply_ops(&mut target_db, target_db_ops.clone(), Some(Sha256::fill(1))).await; let bounds = target_db.bounds().await; let target_op_count = bounds.end; let target_oldest_retained_loc = bounds.start; let target_root = target_db.root(); // Capture target database state before moving into config let mut expected_kvs: HashMap = HashMap::new(); for op in &target_db_ops { if let Operation::Set(key, value) = op { expected_kvs.insert(*key, *value); } } let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()), &context); let target_db = Arc::new(target_db); let config = Config { db_config: db_config.clone(), fetch_batch_size, target: Target { root: target_root, range: non_empty_range!(target_oldest_retained_loc, target_op_count), }, context: context.with_label("client"), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, 
finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Verify database state let bounds = got_db.bounds().await; assert_eq!(bounds.end, target_op_count); assert_eq!(bounds.start, target_oldest_retained_loc); // Verify the root digest matches the target assert_eq!(got_db.root(), target_root); // Verify that the synced database matches the target state for (key, expected_value) in &expected_kvs { let synced_value = got_db.get(key).await.unwrap(); assert_eq!(synced_value, Some(*expected_value)); } // Put more key-value pairs into both databases let mut new_ops = Vec::new(); let mut rng = test_rng_seeded(1); let mut new_kvs: HashMap = HashMap::new(); for _i in 0..expected_kvs.len() { let key = sha256::Digest::random(&mut rng); let value = sha256::Digest::random(&mut rng); new_ops.push(Operation::Set(key, value)); new_kvs.insert(key, value); } // Apply new operations to both databases. let mut got_db = got_db; apply_ops(&mut got_db, new_ops.clone(), None).await; let mut target_db = Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("target_db should have no other references")); apply_ops(&mut target_db, new_ops.clone(), None).await; // Verify both databases have the new values for (key, expected_value) in &new_kvs { let synced_value = got_db.get(key).await.unwrap(); assert_eq!(synced_value, Some(*expected_value)); let target_value = target_db.get(key).await.unwrap(); assert_eq!(target_value, Some(*expected_value)); } got_db.destroy().await.unwrap(); target_db.destroy().await.unwrap(); }); } /// Test that sync works when the target database is initially empty #[test_traced("WARN")] fn test_sync_empty_to_nonempty() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create an empty target database let mut target_db = create_test_db(context.with_label("target")).await; // Commit to establish a valid root apply_ops(&mut target_db, vec![], 
Some(Sha256::fill(1))).await; let bounds = target_db.bounds().await; let target_op_count = bounds.end; let target_oldest_retained_loc = bounds.start; let target_root = target_db.root(); let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()), &context); let target_db = Arc::new(target_db); let config = Config { db_config, fetch_batch_size: NZU64!(10), target: Target { root: target_root, range: non_empty_range!(target_oldest_retained_loc, target_op_count), }, context: context.with_label("client"), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Verify database state let bounds = got_db.bounds().await; assert_eq!(bounds.end, target_op_count); assert_eq!(bounds.start, target_oldest_retained_loc); assert_eq!(got_db.root(), target_root); assert_eq!(got_db.get_metadata().await.unwrap(), Some(Sha256::fill(1))); got_db.destroy().await.unwrap(); let target_db = Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references")); target_db.destroy().await.unwrap(); }); } /// Test demonstrating that a synced database can be reopened and retain its state. 
#[test_traced("WARN")] fn test_sync_database_persistence() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Create and populate a simple target database let mut target_db = create_test_db(context.with_label("target")).await; let target_ops = create_test_ops(10); apply_ops(&mut target_db, target_ops.clone(), Some(Sha256::fill(0))).await; // Capture target state let target_root = target_db.root(); let bounds = target_db.bounds().await; let lower_bound = bounds.start; let op_count = bounds.end; // Perform sync let db_config = create_sync_config("persistence-test", &context); let client_context = context.with_label("client"); let target_db = Arc::new(target_db); let config = Config { db_config: db_config.clone(), fetch_batch_size: NZU64!(5), target: Target { root: target_root, range: non_empty_range!(lower_bound, op_count), }, context: client_context.clone(), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Verify initial sync worked assert_eq!(synced_db.root(), target_root); // Save state before closing let expected_root = synced_db.root(); let bounds = synced_db.bounds().await; let expected_op_count = bounds.end; let expected_oldest_retained_loc = bounds.start; // Drop & reopen the database to test persistence synced_db.sync().await.unwrap(); drop(synced_db); let reopened_db = ImmutableSyncTest::init(context.with_label("reopened"), db_config) .await .unwrap(); // Verify state is preserved assert_eq!(reopened_db.root(), expected_root); let bounds = reopened_db.bounds().await; assert_eq!(bounds.end, expected_op_count); assert_eq!(bounds.start, expected_oldest_retained_loc); // Verify data integrity for op in &target_ops { if let Operation::Set(key, value) = op { let stored_value = reopened_db.get(key).await.unwrap(); 
assert_eq!(stored_value, Some(*value)); } } reopened_db.destroy().await.unwrap(); let target_db = Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references")); target_db.destroy().await.unwrap(); }); } /// Test that target updates work correctly during sync #[test_traced("WARN")] fn test_target_update_during_sync() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate initial target database let mut target_db = create_test_db(context.with_label("target")).await; let initial_ops = create_test_ops(50); apply_ops(&mut target_db, initial_ops.clone(), None).await; // Capture the state after first commit let bounds = target_db.bounds().await; let initial_lower_bound = bounds.start; let initial_upper_bound = bounds.end; let initial_root = target_db.root(); // Add more operations to create the extended target // (use different seed to avoid key collisions) let additional_ops = create_test_ops_seeded(25, 1); apply_ops(&mut target_db, additional_ops.clone(), None).await; let final_upper_bound = target_db.bounds().await.end; let final_root = target_db.root(); // Wrap target database for shared mutable access let target_db = Arc::new(target_db); // Create client with initial smaller target and very small batch size let (update_sender, update_receiver) = mpsc::channel(1); let client = { let config = Config { context: context.with_label("client"), db_config: create_sync_config( &format!("update_test_{}", context.next_u64()), &context, ), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), fetch_batch_size: NZU64!(2), // Very small batch size to ensure multiple batches needed max_outstanding_requests: 10, apply_batch_size: 1024, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; let mut client: Engine = Engine::new(config).await.unwrap(); loop { 
// Step the client until we have processed a batch of operations client = match client.step().await.unwrap() { NextStep::Continue(new_client) => new_client, NextStep::Complete(_) => panic!("client should not be complete"), }; let log_size = crate::journal::contiguous::Contiguous::size(client.journal()).await; if log_size > initial_lower_bound { break client; } } }; // Send target update with SAME lower bound but higher upper bound update_sender .send(Target { root: final_root, range: non_empty_range!(initial_lower_bound, final_upper_bound), }) .await .unwrap(); // Complete the sync let synced_db = client.sync().await.unwrap(); // Verify the synced database has the expected final state assert_eq!(synced_db.root(), final_root); // Verify the target database matches the synced database let target_db = Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references")); { let bounds = synced_db.bounds().await; let target_bounds = target_db.bounds().await; assert_eq!(bounds.end, target_bounds.end); assert_eq!(bounds.start, target_bounds.start); assert_eq!(synced_db.root(), target_db.root()); } // Verify all expected operations are present in the synced database let all_ops = [initial_ops, additional_ops].concat(); for op in &all_ops { if let Operation::Set(key, value) = op { let synced_value = synced_db.get(key).await.unwrap(); assert_eq!(synced_value, Some(*value)); } } synced_db.destroy().await.unwrap(); target_db.destroy().await.unwrap(); }); } /// Test that sync works when target database has operations beyond the requested range /// of operations to sync. 
#[test] fn test_sync_subset_of_target_database() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let mut target_db = create_test_db(context.with_label("target")).await; let target_ops = create_test_ops(30); // Apply all but the last operation apply_ops(&mut target_db, target_ops[..29].to_vec(), None).await; let target_root = target_db.root(); let bounds = target_db.bounds().await; let lower_bound = bounds.start; let op_count = bounds.end; // Add final op after capturing the range apply_ops(&mut target_db, target_ops[29..].to_vec(), None).await; let target_db = Arc::new(target_db); let config = Config { db_config: create_sync_config(&format!("subset_{}", context.next_u64()), &context), fetch_batch_size: NZU64!(10), target: Target { root: target_root, range: non_empty_range!(lower_bound, op_count), }, context: context.with_label("client"), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Verify state matches the specified range assert_eq!(synced_db.root(), target_root); assert_eq!(synced_db.bounds().await.end, op_count); synced_db.destroy().await.unwrap(); let target_db = Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc")); target_db.destroy().await.unwrap(); }); } // Test syncing where the sync client has some but not all of the operations in the target // database. 
#[test] fn test_sync_use_existing_db_partial_match() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let original_ops = create_test_ops(50); // Create two databases let mut target_db = create_test_db(context.with_label("target")).await; let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()), &context); let client_context = context.with_label("client"); let mut sync_db: ImmutableSyncTest = immutable::variable::Db::init(client_context.clone(), sync_db_config.clone()) .await .unwrap(); // Apply the same operations to both databases apply_ops(&mut target_db, original_ops.clone(), None).await; apply_ops(&mut sync_db, original_ops.clone(), None).await; drop(sync_db); // Add one more operation and commit the target database // (use different seed to avoid key collisions) let last_op = create_test_ops_seeded(1, 1); apply_ops(&mut target_db, last_op.clone(), None).await; let root = target_db.root(); let bounds = target_db.bounds().await; let lower_bound = bounds.start; let upper_bound = bounds.end; // Up to the last operation // Reopen the sync database and sync it to the target database let target_db = Arc::new(target_db); let config = Config { db_config: sync_db_config, // Use same config as before fetch_batch_size: NZU64!(10), target: Target { root, range: non_empty_range!(lower_bound, upper_bound), }, context: context.with_label("sync"), resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Verify database state assert_eq!(sync_db.bounds().await.end, upper_bound); assert_eq!(sync_db.root(), root); sync_db.destroy().await.unwrap(); let target_db = Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc")); target_db.destroy().await.unwrap(); }); } /// Test case where existing database on disk exactly 
matches the sync target #[test] fn test_sync_use_existing_db_exact_match() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { let target_ops = create_test_ops(40); // Create two databases let mut target_db = create_test_db(context.with_label("target")).await; let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()), &context); let client_context = context.with_label("client"); let mut sync_db: ImmutableSyncTest = immutable::variable::Db::init(client_context.clone(), sync_config.clone()) .await .unwrap(); // Apply the same operations to both databases apply_ops(&mut target_db, target_ops.clone(), None).await; apply_ops(&mut sync_db, target_ops.clone(), None).await; drop(sync_db); // Prepare target let root = target_db.root(); let bounds = target_db.bounds().await; let lower_bound = bounds.start; let upper_bound = bounds.end; // Sync should complete immediately without fetching let resolver = Arc::new(target_db); let config = Config { db_config: sync_config, fetch_batch_size: NZU64!(10), target: Target { root, range: non_empty_range!(lower_bound, upper_bound), }, context: context.with_label("sync"), resolver: resolver.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, finish_rx: None, reached_target_tx: None, max_retained_roots: 8, }; let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); assert_eq!(sync_db.bounds().await.end, upper_bound); assert_eq!(sync_db.root(), root); sync_db.destroy().await.unwrap(); let target_db = Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc")); target_db.destroy().await.unwrap(); }); } /// Test that the client fails to sync if the lower bound is decreased #[test_traced("WARN")] fn test_target_update_lower_bound_decrease() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = 
create_test_db(context.with_label("target")).await; let target_ops = create_test_ops(100); apply_ops(&mut target_db, target_ops, None).await; target_db.prune(Location::new(10)).await.unwrap(); // Capture initial target state let bounds = target_db.bounds().await; let initial_lower_bound = bounds.start; let initial_upper_bound = bounds.end; let initial_root = target_db.root(); // Create client with initial target let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: create_sync_config(&format!("lb-dec-{}", context.next_u64()), &context), fetch_batch_size: NZU64!(5), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); // Send target update with decreased lower bound update_sender .send(Target { root: initial_root, range: non_empty_range!( initial_lower_bound.checked_sub(1).unwrap(), initial_upper_bound ), }) .await .unwrap(); let result = client.step().await; assert!(matches!( result, Err(sync::Error::Engine( sync::EngineError::SyncTargetMovedBackward { .. 
} )) )); let target_db = Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc")); target_db.destroy().await.unwrap(); }); } /// Test that the client fails to sync if the upper bound is decreased #[test_traced("WARN")] fn test_target_update_upper_bound_decrease() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = create_test_db(context.with_label("target")).await; let target_ops = create_test_ops(50); apply_ops(&mut target_db, target_ops, None).await; // Capture initial target state let bounds = target_db.bounds().await; let initial_lower_bound = bounds.start; let initial_upper_bound = bounds.end; let initial_root = target_db.root(); // Create client with initial target let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: create_sync_config(&format!("ub-dec-{}", context.next_u64()), &context), fetch_batch_size: NZU64!(5), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); // Send target update with decreased upper bound update_sender .send(Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound - 1), }) .await .unwrap(); let result = client.step().await; assert!(matches!( result, Err(sync::Error::Engine( sync::EngineError::SyncTargetMovedBackward { .. 
} )) )); let target_db = Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc")); target_db.destroy().await.unwrap(); }); } /// Test that the client succeeds when bounds are updated #[test_traced("WARN")] fn test_target_update_bounds_increase() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = create_test_db(context.with_label("target")).await; let target_ops = create_test_ops(100); apply_ops(&mut target_db, target_ops.clone(), None).await; // Capture initial target state let bounds = target_db.bounds().await; let initial_lower_bound = bounds.start; let initial_upper_bound = bounds.end; let initial_root = target_db.root(); // Apply more operations to the target database // (use different seed to avoid key collisions) let more_ops = create_test_ops_seeded(5, 1); apply_ops(&mut target_db, more_ops, None).await; target_db.prune(Location::new(10)).await.unwrap(); apply_ops(&mut target_db, vec![], None).await; // Capture final target state let bounds = target_db.bounds().await; let final_lower_bound = bounds.start; let final_upper_bound = bounds.end; let final_root = target_db.root(); // Assert we're actually updating the bounds assert_ne!(final_lower_bound, initial_lower_bound); assert_ne!(final_upper_bound, initial_upper_bound); // Create client with initial target let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: create_sync_config( &format!("bounds_inc_{}", context.next_u64()), &context, ), fetch_batch_size: NZU64!(1), target: Target { root: initial_root, range: non_empty_range!(initial_lower_bound, initial_upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; // Send target update with 
increased upper bound update_sender .send(Target { root: final_root, range: non_empty_range!(final_lower_bound, final_upper_bound), }) .await .unwrap(); // Complete the sync let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Verify the synced database has the expected state assert_eq!(synced_db.root(), final_root); let bounds = synced_db.bounds().await; assert_eq!(bounds.end, final_upper_bound); assert_eq!(bounds.start, final_lower_bound); synced_db.destroy().await.unwrap(); let target_db = Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("Failed to unwrap Arc - still has references")); target_db.destroy().await.unwrap(); }); } /// Test that target updates can be sent even after the client is done #[test_traced("WARN")] fn test_target_update_on_done_client() { let executor = deterministic::Runner::default(); executor.start(|mut context| async move { // Create and populate target database let mut target_db = create_test_db(context.with_label("target")).await; let target_ops = create_test_ops(10); apply_ops(&mut target_db, target_ops, None).await; // Capture target state let bounds = target_db.bounds().await; let lower_bound = bounds.start; let upper_bound = bounds.end; let root = target_db.root(); // Create client with target that will complete immediately let (update_sender, update_receiver) = mpsc::channel(1); let target_db = Arc::new(target_db); let config = Config { context: context.with_label("client"), db_config: create_sync_config(&format!("done_{}", context.next_u64()), &context), fetch_batch_size: NZU64!(20), target: Target { root, range: non_empty_range!(lower_bound, upper_bound), }, resolver: target_db.clone(), apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), finish_rx: None, reached_target_tx: None, max_retained_roots: 1, }; // Complete the sync let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); // Attempt to apply a target update after sync is complete to verify we 
don't panic let _ = update_sender .send(Target { root: sha256::Digest::from([2u8; 32]), range: non_empty_range!(lower_bound + 1, upper_bound + 1), }) .await; // Verify the synced database has the expected state assert_eq!(synced_db.root(), root); let bounds = synced_db.bounds().await; assert_eq!(bounds.end, upper_bound); assert_eq!(bounds.start, lower_bound); synced_db.destroy().await.unwrap(); Arc::try_unwrap(target_db) .unwrap_or_else(|_| panic!("failed to unwrap Arc")) .destroy() .await .unwrap(); }); } }