use crate::{append_fixed_random_data, get_fixed_journal}; use commonware_runtime::{ benchmarks::{context, tokio}, tokio::{Config, Context, Runner}, Runner as _, Supervisor as _, }; use commonware_storage::journal::contiguous::{fixed::Journal, Reader as _}; use commonware_utils::{sequence::FixedBytes, NZU64}; use criterion::{criterion_group, Criterion}; use futures::future::try_join_all; use rand::{rngs::StdRng, Rng, SeedableRng}; use std::{ hint::black_box, num::NonZeroU64, time::{Duration, Instant}, }; /// Partition name to use in the journal config. const PARTITION: &str = "test-partition"; /// Value of items_per_blob to use in the journal config. const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10_000); /// Number of items to write to the journal we will be reading from. const ITEMS_TO_WRITE: u64 = 5_000_000; /// Size of each journal item in bytes. const ITEM_SIZE: usize = 32; /// Read `items_to_read` random items from the given `journal`, awaiting each /// result before continuing. async fn bench_run_serial(journal: &Journal>, items_to_read: usize) { let reader = journal.reader().await; let mut rng = StdRng::seed_from_u64(0); for _ in 0..items_to_read { let pos = rng.gen_range(0..ITEMS_TO_WRITE); black_box(reader.read(pos).await.expect("failed to read data")); } } /// Concurrently read (via try_join_all) `items_to_read` random items from the given `journal`. async fn bench_run_concurrent( journal: &Journal>, items_to_read: usize, ) { let reader = journal.reader().await; let mut rng = StdRng::seed_from_u64(0); let mut futures = Vec::with_capacity(items_to_read); for _ in 0..items_to_read { let pos = rng.gen_range(0..ITEMS_TO_WRITE); futures.push(reader.read(pos)); } try_join_all(futures).await.expect("failed to read data"); } /// Batch-read `items_to_read` random items via `read_many`. async fn bench_run_read_many( journal: &Journal>, items_to_read: usize, ) { let reader = journal.reader().await; let mut rng = StdRng::seed_from_u64(0); let mut positions: Vec = (0..items_to_read) .map(|_| rng.gen_range(0..ITEMS_TO_WRITE)) .collect(); positions.sort_unstable(); positions.dedup(); black_box( reader .read_many(&positions) .await .expect("failed to read data"), ); } fn bench_fixed_read_random(c: &mut Criterion) { let cfg = Config::default(); let mut initialized = false; let runner = tokio::Runner::new(cfg.clone()); for mode in ["serial", "concurrent", "read_many"] { for items_to_read in [100, 1_000, 10_000, 100_000] { c.bench_function( &format!( "{}/mode={} items={} size={}", module_path!(), mode, items_to_read, ITEM_SIZE ), |b| { // Setup: populate journal (once, on first sample). if !initialized { Runner::new(cfg.clone()).start(|ctx| async move { let mut j = get_fixed_journal(ctx, PARTITION, ITEMS_PER_BLOB).await; append_fixed_random_data::<_, ITEM_SIZE>(&mut j, ITEMS_TO_WRITE).await; j.sync().await.unwrap(); }); initialized = true; } // Benchmark: measure read time. b.to_async(&runner).iter_custom(|iters| async move { let ctx = context::get::(); let j = get_fixed_journal(ctx.child("storage"), PARTITION, ITEMS_PER_BLOB) .await; let mut duration = Duration::ZERO; for _ in 0..iters { let start = Instant::now(); match mode { "serial" => bench_run_serial(&j, items_to_read).await, "concurrent" => bench_run_concurrent(&j, items_to_read).await, "read_many" => bench_run_read_many(&j, items_to_read).await, _ => unreachable!(), } duration += start.elapsed(); } duration }); }, ); } } // Cleanup: destroy journal. if initialized { Runner::new(cfg).start(|context| async move { let j = get_fixed_journal::(context, PARTITION, ITEMS_PER_BLOB).await; j.destroy().await.unwrap(); }); } } criterion_group! { name = benches; config = Criterion::default().sample_size(10); targets = bench_fixed_read_random }