#![no_main] use arbitrary::Arbitrary; use commonware_cryptography::Sha256; use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner}; use commonware_storage::{ qmdb::{ any::{ unordered::fixed::{Db, Operation as FixedOperation}, FixedConfig as Config, }, store::LogStore as _, sync, }, translator::TwoCap, }; use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64}; use libfuzzer_sys::fuzz_target; use std::{num::NonZeroU16, sync::Arc}; type Key = FixedBytes<32>; type Value = FixedBytes<32>; type FixedDb = Db; const MAX_OPERATIONS: usize = 50; #[derive(Debug)] enum Operation { // Basic ops to build source state Update { key: [u8; 32], value: [u8; 32] }, Delete { key: [u8; 32] }, Commit, Prune, // Sync scenarios SyncFull { fetch_batch_size: u64 }, // Failure simulation SimulateFailure, } impl<'a> Arbitrary<'a> for Operation { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let choice: u8 = u.arbitrary()?; match choice % 8 { 0 | 1 => { let key = u.arbitrary()?; let value = u.arbitrary()?; Ok(Operation::Update { key, value }) } 2 => { let key = u.arbitrary()?; Ok(Operation::Delete { key }) } 3 => Ok(Operation::Commit), 4 => Ok(Operation::Prune), 5 => { let fetch_batch_size = u.arbitrary()?; Ok(Operation::SyncFull { fetch_batch_size }) } 6 => Ok(Operation::SimulateFailure {}), 7 => { let key = u.arbitrary()?; Ok(Operation::Delete { key }) } _ => unreachable!(), } } } #[derive(Debug)] struct FuzzInput { ops: Vec, commit_counter: u64, } impl<'a> Arbitrary<'a> for FuzzInput { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let num_ops = u.int_in_range(1..=MAX_OPERATIONS)?; let ops = (0..num_ops) .map(|_| Operation::arbitrary(u)) .collect::, _>>()?; Ok(FuzzInput { ops, commit_counter: 0, }) } } const PAGE_SIZE: NonZeroU16 = NZU16!(129); fn test_config(test_name: &str, pooler: &impl BufferPooler) -> Config { Config { mmr_journal_partition: format!("{test_name}-mmr"), mmr_metadata_partition: format!("{test_name}-meta"), mmr_items_per_blob: NZU64!(3), mmr_write_buffer: NZUsize!(1024), log_journal_partition: format!("{test_name}-log"), log_items_per_blob: NZU64!(3), log_write_buffer: NZUsize!(1024), translator: TwoCap, thread_pool: None, page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, NZUsize!(1)), } } async fn test_sync< R: sync::resolver::Resolver< Digest = commonware_cryptography::sha256::Digest, Op = FixedOperation, >, >( context: deterministic::Context, resolver: R, target: sync::Target, fetch_batch_size: u64, test_name: &str, sync_id: usize, ) -> bool { let db_config = test_config(test_name, &context); let expected_root = target.root; let sync_config: sync::engine::Config = sync::engine::Config { context: context.with_label("sync").with_attribute("id", sync_id), update_rx: None, db_config, fetch_batch_size: NZU64!((fetch_batch_size % 100) + 1), target, resolver, apply_batch_size: 100, max_outstanding_requests: 10, }; if let Ok(synced) = sync::sync(sync_config).await { let actual_root = synced.root(); assert_eq!( actual_root, expected_root, "Synced root must match target root" ); synced.destroy().await.is_ok() } else { false } } const TEST_NAME: &str = "qmdb-any-fixed-fuzz-test"; fn fuzz(mut input: FuzzInput) { let runner = deterministic::Runner::default(); runner.start(|context| async move { let cfg = test_config(TEST_NAME, &context); let mut db = FixedDb::init(context.clone(), cfg) .await .expect("Failed to init source db"); let mut restarts = 0usize; let mut sync_id = 0; let mut pending_writes: Vec<(Key, Option)> = Vec::new(); for op in &input.ops { match op { Operation::Update { key, value } => { pending_writes.push((Key::new(*key), Some(Value::new(*value)))); } Operation::Delete { key } => { pending_writes.push((Key::new(*key), None)); } Operation::Commit => { let mut commit_id = [0u8; 32]; if input.commit_counter == 0 { assert!(db.get_metadata().await.unwrap().is_none()); } else { commit_id[..8].copy_from_slice(&input.commit_counter.to_be_bytes()); assert_eq!( db.get_metadata().await.unwrap().unwrap(), FixedBytes::new(commit_id), ); } input.commit_counter += 1; commit_id[..8].copy_from_slice(&input.commit_counter.to_be_bytes()); let finalized = { let mut batch = db.new_batch(); for (k, v) in pending_writes.drain(..) { batch.write(k, v); } batch .merkleize(Some(FixedBytes::new(commit_id))) .await .unwrap() .finalize() }; db.apply_batch(finalized) .await .expect("Commit should not fail"); } Operation::Prune => { db.prune(db.inactivity_floor_loc()) .await .expect("Prune should not fail"); } Operation::SyncFull { fetch_batch_size } => { if db.bounds().await.end == 0 { continue; } input.commit_counter += 1; let mut commit_id = [0u8; 32]; commit_id[..8].copy_from_slice(&input.commit_counter.to_be_bytes()); let finalized = { let mut batch = db.new_batch(); for (k, v) in pending_writes.drain(..) { batch.write(k, v); } batch .merkleize(Some(FixedBytes::new(commit_id))) .await .unwrap() .finalize() }; db.apply_batch(finalized) .await .expect("commit should not fail"); let target = sync::Target { root: db.root(), range: db.inactivity_floor_loc()..db.bounds().await.end, }; let wrapped_src = Arc::new(db); let _result = test_sync( context.clone(), wrapped_src.clone(), target, *fetch_batch_size, &format!("full_{sync_id}"), sync_id, ) .await; db = Arc::try_unwrap(wrapped_src) .unwrap_or_else(|_| panic!("Failed to unwrap src")); sync_id += 1; } Operation::SimulateFailure => { // Simulate unclean shutdown by dropping the db without committing pending_writes.clear(); drop(db); let cfg = test_config(TEST_NAME, &context); db = FixedDb::init( context .with_label("db") .with_attribute("instance", restarts), cfg, ) .await .expect("Failed to init source db"); restarts += 1; } } } let finalized = { let mut batch = db.new_batch(); for (k, v) in pending_writes.drain(..) { batch.write(k, v); } batch.merkleize(None).await.unwrap().finalize() }; db.apply_batch(finalized) .await .expect("commit should not fail"); db.destroy().await.expect("Destroy should not fail"); }); } fuzz_target!(|input: FuzzInput| { fuzz(input); });