//! Position-based journal for variable-length items. //! //! This journal enforces section fullness: all non-final sections are full and synced. //! On init, only the last section needs to be replayed to determine the exact size. use super::Reader as _; use crate::{ journal::{ contiguous::{fixed, Contiguous, Mutable}, segmented::variable, Error, }, Persistable, }; use commonware_codec::{Codec, CodecShared}; use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage}; use commonware_utils::{ sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock}, NZUsize, }; #[commonware_macros::stability(ALPHA)] use core::ops::Range; use futures::{stream, Stream, StreamExt as _}; use std::num::{NonZeroU64, NonZeroUsize}; #[commonware_macros::stability(ALPHA)] use tracing::debug; use tracing::warn; const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024); /// Suffix appended to the base partition name for the data journal. const DATA_SUFFIX: &str = "_data"; /// Suffix appended to the base partition name for the offsets journal. const OFFSETS_SUFFIX: &str = "_offsets"; /// Calculate the section number for a given position. /// /// # Arguments /// /// * `position` - The absolute position in the journal /// * `items_per_section` - The number of items stored in each section /// /// # Returns /// /// The section number where the item at `position` should be stored. /// /// # Examples /// /// ```ignore /// // With 10 items per section: /// assert_eq!(position_to_section(0, 10), 0); // position 0 -> section 0 /// assert_eq!(position_to_section(9, 10), 0); // position 9 -> section 0 /// assert_eq!(position_to_section(10, 10), 1); // position 10 -> section 1 /// assert_eq!(position_to_section(25, 10), 2); // position 25 -> section 2 /// assert_eq!(position_to_section(30, 10), 3); // position 30 -> section 3 /// ``` const fn position_to_section(position: u64, items_per_section: u64) -> u64 { position / items_per_section } /// Configuration for a [Journal]. 
#[derive(Clone)] pub struct Config { /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX and OFFSETS_SUFFIX. pub partition: String, /// The number of items to store in each section. /// /// Once set, this value cannot be changed across restarts. /// All non-final sections will be full and persisted. pub items_per_section: NonZeroU64, /// Optional compression level for stored items. pub compression: Option, /// [Codec] configuration for encoding/decoding items. pub codec_config: C, /// Page cache for buffering reads from the underlying storage. pub page_cache: CacheRef, /// Write buffer size for each section. pub write_buffer: NonZeroUsize, } impl Config { /// Returns the partition name for the data journal. fn data_partition(&self) -> String { format!("{}{}", self.partition, DATA_SUFFIX) } /// Returns the partition name for the offsets journal. fn offsets_partition(&self) -> String { format!("{}{}", self.partition, OFFSETS_SUFFIX) } } /// Inner journal state protected by a lock for interior mutability. struct Inner { /// The underlying variable-length data journal. data: variable::Journal, /// The next position to be assigned on append (total items appended). /// /// # Invariant /// /// Always >= `pruning_boundary`. Equal when data journal is empty or fully pruned. size: u64, /// The position before which all items have been pruned. /// /// After normal operation and pruning, the value is section-aligned. /// After `init_at_size(N)`, the value may be mid-section. /// /// # Invariant /// /// Never decreases (pruning only moves forward). pruning_boundary: u64, } impl Inner { /// Read the item at the given position using the provided offsets reader. /// /// # Errors /// /// - Returns [Error::ItemPruned] if the item at `position` has been pruned. /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size. /// - Returns other errors if storage or decoding fails. 
async fn read( &self, position: u64, items_per_section: u64, offsets: &impl super::Reader, ) -> Result { if position >= self.size { return Err(Error::ItemOutOfRange(position)); } if position < self.pruning_boundary { return Err(Error::ItemPruned(position)); } let offset = offsets.read(position).await?; let section = position_to_section(position, items_per_section); self.data.get(section, offset).await } } /// A contiguous journal with variable-size entries. /// /// This journal manages section assignment automatically, allowing callers to append items /// sequentially without manually tracking section numbers. /// /// # Repair /// /// Like /// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505) /// and /// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445), /// the first invalid data read will be considered the new end of the journal (and the /// underlying [Blob](commonware_runtime::Blob) will be truncated to the last valid item). Repair occurs during /// init via the underlying segmented journals. /// /// # Invariants /// /// ## 1. Section Fullness /// /// All non-final sections are full (`items_per_section` items) and persisted. This ensures /// that on `init()`, we only need to replay the last section to determine the exact size. /// /// ## 2. Data Journal is Source of Truth /// /// The data journal is always the source of truth. The offsets journal is an index /// that may temporarily diverge during crashes. Divergences are automatically /// aligned during init(): /// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data. /// (This can happen if we crash after writing data journal but before writing offsets journal) /// * If offsets.size() > data.size(): Rewind offsets to match data size. 
///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
/// * If offsets.bounds().start < data.bounds().start: Prune offsets to match
///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
///
/// Note that we don't recover from the case where offsets.bounds().start >
/// data.bounds().start. This should never occur because we always prune the data journal
/// before the offsets journal.
// NOTE(review): generic arguments on this struct (and on `impl` blocks below) appear to have
// been stripped during extraction — e.g. `UpgradableAsyncRwLock>` is missing its `Inner<...>`
// argument and `fixed::Journal` its parameters. Restore before compiling.
pub struct Journal {
    /// Inner state for data journal metadata.
    ///
    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
    /// race conditions while allowing concurrent reads during sync.
    inner: UpgradableAsyncRwLock>,

    /// Index mapping positions to byte offsets within the data journal.
    /// The section can be calculated from the position using items_per_section.
    offsets: fixed::Journal,

    /// The number of items per section.
    ///
    /// # Invariant
    ///
    /// This value is immutable after initialization and must remain consistent
    /// across restarts. Changing this value will result in data loss or corruption.
    items_per_section: u64,
}

/// A reader guard that holds a consistent snapshot of the variable journal's bounds.
pub struct Reader<'a, E: Clock + Storage + Metrics, V: Codec> {
    // Read guard over the inner state; holding it pins `size`/`pruning_boundary`.
    guard: AsyncRwLockReadGuard<'a, Inner>,
    // Snapshot reader over the offsets index.
    offsets: fixed::Reader<'a, E, u64>,
    // Copied from the parent journal so section math needs no lock.
    items_per_section: u64,
}

impl super::Reader for Reader<'_, E, V> {
    type Item = V;

    /// Live bounds of the snapshot: `[pruning_boundary, size)`.
    fn bounds(&self) -> std::ops::Range {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at `position` (see [Inner::read] for error cases).
    async fn read(&self, position: u64) -> Result {
        self.guard
            .read(position, self.items_per_section, &self.offsets)
            .await
    }

    /// Stream `(position, item)` pairs starting from `start_pos`.
    async fn replay(
        &self,
        buffer_size: NonZeroUsize,
        start_pos: u64,
    ) -> Result> + Send, Error> {
        // Validate bounds.
        if start_pos < self.guard.pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        // Get the starting offset and section. For empty range (start_pos == size),
        // use a section beyond existing data so data.replay returns empty naturally.
        let (start_section, start_offset) = if start_pos < self.guard.size {
            let offset = self.offsets.read(start_pos).await?;
            let section = position_to_section(start_pos, self.items_per_section);
            (section, offset)
        } else {
            (u64::MAX, 0)
        };
        let inner_stream = self
            .guard
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;
        // Map the stream to add positions: positions are consecutive from start_pos,
        // so zipping with an unbounded counter is sufficient.
        let stream = inner_stream
            .zip(stream::iter(start_pos..))
            .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));
        Ok(stream)
    }
}

impl Journal {
    /// Initialize a contiguous variable journal.
    ///
    /// # Crash Recovery
    ///
    /// The data journal is the source of truth. If the offsets journal is inconsistent
    /// it will be updated to match the data journal.
    pub async fn init(context: E, cfg: Config) -> Result {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        // Initialize underlying variable data journal
        let mut data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize offsets journal
        let mut offsets = fixed::Journal::init(
            context.with_label("offsets"),
            fixed::Config {
                partition: offsets_partition,
                // One offsets blob per data section keeps the two journals aligned.
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Validate and align offsets journal to match data journal
        let (pruning_boundary, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                pruning_boundary,
            }),
            offsets,
            items_per_section,
        })
    }

    /// Initialize an empty [Journal] at the given logical `size`.
    ///
    /// Returns a journal with journal.bounds() == Range{start: size, end: size}
    /// and next append at position `size`.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result {
        // Initialize empty data journal (no sections exist yet; the first append will
        // create the section implied by `size`).
        let data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize offsets journal at the target size
        let offsets = fixed::Journal::init_at_size(
            context.with_label("offsets"),
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                // Everything before `size` is treated as pruned (may be mid-section).
                pruning_boundary: size,
            }),
            offsets,
            items_per_section: cfg.items_per_section.get(),
        })
    }

    /// Initialize a [Journal] for use in state sync.
    ///
    /// The bounds are item locations (not section numbers). This function prepares the
    /// on-disk journal so that subsequent appends go to the correct physical location for the
    /// requested range.
    ///
    /// Behavior by existing on-disk state:
    /// - Fresh (no data): returns an empty journal.
    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
    ///   empty journal.
    /// - Overlap within [`range.start`, `range.end`]:
    ///   - Prunes toward `range.start` (section-aligned, so some items before
    ///     `range.start` may be retained)
    /// - Unexpected data beyond `range.end`: returns [crate::qmdb::Error::UnexpectedData].
    ///
    /// # Arguments
    /// - `context`: storage context
    /// - `cfg`: journal configuration
    /// - `range`: range of item locations to retain
    ///
    /// # Returns
    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
    ///
    /// # Errors
    /// Returns [crate::qmdb::Error::UnexpectedData] if existing data extends beyond `range.end`.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config,
        range: Range,
    ) -> Result {
        assert!(!range.is_empty(), "range must not be empty");
        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        // Initialize contiguous journal (runs full crash-recovery alignment).
        let journal = Self::init(context.with_label("journal"), cfg.clone()).await?;
        let size = journal.size().await;

        // No existing data - initialize at the start of the sync range if needed
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Self::init_at_size(context, cfg, range.start).await;
            }
        }

        // Check if data exceeds the sync range
        // NOTE(review): the doc comment above promises `crate::qmdb::Error::UnexpectedData`
        // for this case, but the code returns `Error::ItemOutOfRange` — confirm which is
        // intended and make the doc and code agree.
        if size > range.end {
            return Err(Error::ItemOutOfRange(size));
        }

        // If all existing data is before our sync range, destroy and recreate fresh
        if size <= range.start {
            // All data is stale (ends at or before range.start)
            debug!(
                size,
                range.start,
                "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }

        // Prune to lower bound if needed (section-aligned, so items just before
        // range.start may survive).
        let bounds = journal.reader().await.bounds();
        if !bounds.is_empty() && bounds.start < range.start {
            debug!(
                oldest_pos = bounds.start,
                range.start,
                "pruning journal to sync range start"
            );
            journal.prune(range.start).await?;
        }
        Ok(journal)
    }

    /// Rewind the journal to the given size, discarding items from the end.
    ///
    /// After rewinding to size N, the journal will contain exactly N items, and the next append
    /// will receive position N.
    ///
    /// # Errors
    ///
    /// Returns [Error::InvalidRewind] if `size` is larger than current size.
    /// Returns [Error::ItemPruned] if `size` is smaller than the pruning boundary.
    ///
    /// # Warning
    ///
    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
        // Exclusive access: rewind mutates both journals and the cached size.
        let mut inner = self.inner.write().await;

        // Validate rewind target
        match size.cmp(&inner.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()), // No-op
            std::cmp::Ordering::Less => {}
        }

        // Rewind never updates the pruning boundary.
        if size < inner.pruning_boundary {
            return Err(Error::ItemPruned(size));
        }

        // Read the offset of the first item to discard (at position 'size').
        let discard_offset = {
            let offsets_reader = self.offsets.reader().await;
            offsets_reader.read(size).await?
        };
        let discard_section = position_to_section(size, self.items_per_section);

        // Truncate data first (source of truth), then the offsets index.
        inner
            .data
            .rewind_to_offset(discard_section, discard_offset)
            .await?;
        self.offsets.rewind(size).await?;

        // Update our size
        inner.size = size;
        Ok(())
    }

    /// Append a new item to the journal, returning its position.
    ///
    /// The position returned is a stable, consecutively increasing value starting from 0.
    /// This position remains constant after pruning.
    ///
    /// When a section becomes full, both the data journal and offsets journal are persisted
    /// to maintain the invariant that all non-final sections are full and consistent.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails or if the item cannot
    /// be encoded.
    ///
    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
    /// reopened to trigger alignment in [Journal::init].
    pub async fn append(&self, item: &V) -> Result {
        // Mutating operations are serialized by taking the write guard.
        let mut inner = self.inner.write().await;

        // Calculate which section this position belongs to
        let section = position_to_section(inner.size, self.items_per_section);

        // Append to data journal, get offset
        let (offset, _size) = inner.data.append(section, item).await?;

        // Append offset to offsets journal
        let offsets_pos = self.offsets.append(&offset).await?;
        // Invariant: the offsets journal stays in lockstep with logical positions.
        assert_eq!(offsets_pos, inner.size);

        // Return the current position
        let position = inner.size;
        inner.size += 1;

        // Return early if no sync is needed (section not full).
        if !inner.size.is_multiple_of(self.items_per_section) {
            return Ok(position);
        }

        // The section was filled and must be synced. Downgrade so readers can continue during the
        // sync while mutators remain blocked.
        let inner = inner.downgrade_to_upgradable();
        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
        Ok(position)
    }

    /// Acquire a reader guard that holds a consistent view of the journal.
    pub async fn reader(&self) -> Reader<'_, E, V> {
        Reader {
            guard: self.inner.read().await,
            offsets: self.offsets.reader().await,
            items_per_section: self.items_per_section,
        }
    }

    /// Return the total number of items in the journal, irrespective of pruning. The next value
    /// appended to the journal will be at this position.
    pub async fn size(&self) -> u64 {
        self.inner.read().await.size
    }

    /// Prune items at positions strictly less than `min_position`.
    ///
    /// Returns `true` if any data was pruned, `false` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails.
    ///
    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
    /// reopened to trigger alignment in [Journal::init].
    pub async fn prune(&self, min_position: u64) -> Result {
        let mut inner = self.inner.write().await;
        // Already pruned at least this far: nothing to do.
        if min_position <= inner.pruning_boundary {
            return Ok(false);
        }
        // Cap min_position to size to maintain the invariant pruning_boundary <= size
        let min_position = min_position.min(inner.size);
        // Calculate section number
        let min_section = position_to_section(min_position, self.items_per_section);
        // Prune data first (source of truth), then mirror into the offsets index.
        let pruned = inner.data.prune(min_section).await?;
        if pruned {
            // Section-aligned boundary; never move backwards (e.g. past an
            // init_at_size mid-section boundary).
            let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
            inner.pruning_boundary = new_oldest;
            self.offsets.prune(new_oldest).await?;
        }
        Ok(pruned)
    }

    /// Durably persist the journal.
    ///
    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
    /// before the next call to `sync()`.
    pub async fn commit(&self) -> Result<(), Error> {
        // Serialize with append/prune/rewind so section selection is stable, while still allowing
        // concurrent readers.
        let inner = self.inner.upgradable_read().await;
        let section = position_to_section(inner.size, self.items_per_section);
        // Only the data journal is persisted; offsets may lag and will be rebuilt on init.
        inner.data.sync(section).await
    }

    /// Durably persist the journal and ensure recovery is not required on startup.
    ///
    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
    pub async fn sync(&self) -> Result<(), Error> {
        // Serialize with append/prune/rewind so section selection is stable, while still allowing
        // concurrent readers.
        let inner = self.inner.upgradable_read().await;
        // Persist only the current (final) section of the data journal.
        // All non-final sections are already persisted per Invariant #1.
        let section = position_to_section(inner.size, self.items_per_section);
        // Persist both journals concurrently. These journals may not exist yet if the
        // previous section was just filled. This is checked internally.
        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
        Ok(())
    }

    /// Remove any underlying blobs created by the journal.
    ///
    /// This destroys both the data journal and the offsets journal.
    pub async fn destroy(self) -> Result<(), Error> {
        let inner = self.inner.into_inner();
        inner.data.destroy().await?;
        self.offsets.destroy().await
    }

    /// Clear all data and reset the journal to a new starting position.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
        let mut inner = self.inner.write().await;
        inner.data.clear().await?;
        self.offsets.clear_to_size(new_size).await?;
        inner.size = new_size;
        inner.pruning_boundary = new_size;
        Ok(())
    }

    /// Align the offsets journal and data journal to be consistent in case a crash occurred
    /// on a previous run and left the journals in an inconsistent state.
    ///
    /// The data journal is the source of truth. This function scans it to determine
    /// what SHOULD be in the offsets journal, then fixes any mismatches.
    ///
    /// # Returns
    ///
    /// Returns `(pruning_boundary, size)` for the contiguous journal.
    async fn align_journals(
        data: &mut variable::Journal,
        offsets: &mut fixed::Journal,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // === Handle empty data journal case ===
        // Count the items in the newest section by replaying it (cheap: at most one
        // section thanks to the section-fullness invariant).
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; // Propagate replay errors (corruption, etc.)
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // Data journal is empty if there are no sections or if there is one section and it has no items.
        // The latter should only occur if a crash occurred after opening a data journal blob but
        // before writing to it.
        let data_empty = data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            let size = offsets_bounds.end;
            if !data.is_empty() {
                // A section exists but contains 0 items. This can happen in two cases:
                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
                // 2. First append crash: we opened the first section blob but crashed before writing to it
                // In both cases, calculate target position from the first remaining section
                // SAFETY: data is non-empty (checked above)
                let data_first_section = data.oldest_section().unwrap();
                let data_section_start = data_first_section * items_per_section;
                let target_pos = data_section_start.max(offsets_bounds.start);
                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
                offsets.clear_to_size(target_pos).await?;
                return Ok((target_pos, target_pos));
            }
            // data.blobs is empty. This can happen in two cases:
            // 1. We completely pruned the data journal but crashed before pruning
            //    the offsets journal.
            // 2. The data journal was never opened.
            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
                // Offsets has unpruned entries but data is gone - clear to match empty state.
                // We use clear_to_size (not prune) to ensure bounds.start == bounds.end,
                // even when size is mid-section.
                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
                offsets.clear_to_size(size).await?;
            }
            return Ok((size, size));
        }

        // === Handle non-empty data journal case ===
        let data_first_section = data.oldest_section().unwrap();
        let data_last_section = data.newest_section().unwrap();
        // data_oldest_pos is ALWAYS section-aligned because it's computed from the section index.
        // This differs from offsets bounds start which can be mid-section after init_at_size.
        let data_oldest_pos = data_first_section * items_per_section;

        // Align pruning state
        // We always prune data before offsets, so offsets should never be "ahead" by a section.
        {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            match (
                offsets_bounds.is_empty(),
                offsets_bounds.start.cmp(&data_oldest_pos),
            ) {
                (true, _) => {
                    // Offsets journal is empty but data journal isn't.
                    // It should always be in the same section as the data journal, though.
                    let offsets_first_section = offsets_bounds.start / items_per_section;
                    if offsets_first_section != data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                        )));
                    }
                    warn!(
                        "crash repair: offsets journal empty at {}, will rebuild from data",
                        offsets_bounds.start
                    );
                }
                (false, std::cmp::Ordering::Less) => {
                    // Offsets behind on pruning -- prune to catch up
                    warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                    offsets.prune(data_oldest_pos).await?;
                }
                (false, std::cmp::Ordering::Greater) => {
                    // Compare sections: same section = valid, different section = corruption.
                    if offsets_bounds.start / items_per_section > data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                            offsets_bounds.start
                        )));
                    }
                }
                (false, std::cmp::Ordering::Equal) => {
                    // Both journals are pruned to the same position.
                }
            }
        }

        // Compute the correct logical size
        // Uses bounds.start from offsets as the anchor because it tracks the exact starting
        // position, which may be mid-section after init_at_size.
        //
        // Note: Corruption checks above ensure bounds.start is in data_first_section,
        // so the subtraction in oldest_items cannot underflow.
        // Re-fetch bounds since prune may have been called above.
        let (offsets_bounds, data_size) = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            let data_size = if data_first_section == data_last_section {
                offsets_bounds.start + items_in_last_section
            } else {
                // Remaining items in the (possibly partially pruned) oldest section.
                let oldest_items =
                    (data_first_section + 1) * items_per_section - offsets_bounds.start;
                // Full sections strictly between oldest and newest.
                let middle_items =
                    (data_last_section - data_first_section - 1) * items_per_section;
                offsets_bounds.start + oldest_items + middle_items + items_in_last_section
            };
            (offsets_bounds, data_size)
        };

        // Align sizes
        let offsets_size = offsets_bounds.end;
        if offsets_size > data_size {
            // Crashed after writing offsets but before writing data.
            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // Crashed after writing data but before writing offsets.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        // Final invariant checks
        let pruning_boundary = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            assert_eq!(offsets_bounds.end, data_size);
            // After alignment, offsets and data must be in the same section.
            // We return bounds.start from offsets as the true boundary.
            assert!(
                !offsets_bounds.is_empty(),
                "offsets should have data after alignment"
            );
            assert_eq!(
                offsets_bounds.start / items_per_section,
                data_first_section,
                "offsets and data should be in same oldest section"
            );
            offsets_bounds.start
        };
        // Persist the repaired index so alignment isn't redone on next init.
        offsets.sync().await?;
        Ok((pruning_boundary, data_size))
    }

    /// Rebuild missing offset entries by replaying the data journal and
    /// appending the missing entries to the offsets journal.
    ///
    /// The data journal is the source of truth. This function brings the offsets
    /// journal up to date by replaying data items and indexing their positions.
///
/// # Warning
///
/// - Panics if data journal is empty
/// - Panics if `offsets_size` >= `data.size()`
async fn add_missing_offsets(
    data: &variable::Journal,
    offsets: &mut fixed::Journal,
    offsets_size: u64,
    items_per_section: u64,
) -> Result<(), Error> {
    assert!(
        !data.is_empty(),
        "rebuild_offsets called with empty data journal"
    );

    // Find where to start replaying.
    // Three cases, all resolved by inspecting the offsets journal's bounds:
    //   1. offsets empty          -> replay from the oldest data section, offset 0
    //   2. offsets has live items -> resume from the last indexed item's offset
    //   3. offsets fully pruned   -> replay from the oldest data section, offset 0
    // `skip_first` is true only in case 2, because replaying from the last indexed
    // item's offset re-yields that already-indexed item first.
    let (start_section, resume_offset, skip_first) = {
        let offsets_reader = offsets.reader().await;
        let offsets_bounds = offsets_reader.bounds();
        if offsets_bounds.is_empty() {
            // Offsets empty -- start from first data section
            // SAFETY: data is non-empty (checked above)
            let first_section = data.oldest_section().unwrap();
            (first_section, 0, false)
        } else if offsets_bounds.start < offsets_size {
            // Offsets has items -- resume from last indexed position
            // NOTE(review): `last_offset` is presumably the byte offset of the last
            // indexed item within its data section -- confirm against variable::Journal::replay.
            let last_offset = offsets_reader.read(offsets_size - 1).await?;
            let last_section = position_to_section(offsets_size - 1, items_per_section);
            (last_section, last_offset, true)
        } else {
            // Offsets fully pruned but data has items -- start from first data section
            // SAFETY: data is non-empty (checked above)
            let first_section = data.oldest_section().unwrap();
            (first_section, 0, false)
        }
    };

    // Replay data journal from start position through the end and index all items.
    // The data journal is the source of truth, so we consume the entire stream.
    // (replay streams from start_section onwards through all subsequent sections)
    let stream = data
        .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
        .await?;
    futures::pin_mut!(stream);
    let mut skipped_first = false;
    while let Some(result) = stream.next().await {
        let (_section, offset, _size, _item) = result?;
        // Skip first item if resuming from last indexed offset
        if skip_first && !skipped_first {
            skipped_first = true;
            continue;
        }
        offsets.append(&offset).await?;
    }
    Ok(())
}
}

// Implement Contiguous trait for variable-length items.
// All methods delegate to the inherent implementations of the same name.
impl Contiguous for Journal {
    type Item = V;

    async fn reader(&self) -> impl super::Reader + '_ {
        Self::reader(self).await
    }

    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}

impl Mutable for Journal {
    async fn append(&mut self, item: &Self::Item) -> Result {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}

impl Persistable for Journal {
    type Error = Error;

    async fn commit(&self) -> Result<(), Error> {
        self.commit().await
    }

    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

// Test-only helpers. Several intentionally bypass the journal's coupled
// data+offsets write path to simulate crash states between the two journals.
#[cfg(test)]
impl Journal {
    /// Test helper: Read the item at the given position.
    pub(crate) async fn read(&self, position: u64) -> Result {
        self.reader().await.read(position).await
    }

    /// Test helper: Return the bounds of the journal.
    pub(crate) async fn bounds(&self) -> std::ops::Range {
        self.reader().await.bounds()
    }

    /// Test helper: Prune the internal data journal directly (simulates crash scenario).
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result {
        let mut inner = self.inner.write().await;
        inner.data.prune(section).await
    }

    /// Test helper: Prune the internal offsets journal directly (simulates crash scenario).
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result {
        self.offsets.prune(position).await
    }

    /// Test helper: Rewind the internal offsets journal directly (simulates crash scenario).
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        self.offsets.rewind(position).await
    }

    /// Test helper: Get the size of the internal offsets journal.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        self.offsets.size().await
    }

    /// Test helper: Append directly to the internal data journal (simulates crash scenario).
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut inner = self.inner.write().await;
        inner.data.append(section, &item).await
    }

    /// Test helper: Sync the internal data journal.
    // Syncs only the newest data section (defaulting to 0 when empty),
    // leaving the offsets journal untouched.
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let inner = self.inner.read().await;
        inner
            .data
            .sync(inner.data.newest_section().unwrap_or(0))
            .await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::contiguous::tests::run_contiguous_tests;
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use futures::FutureExt as _;
    use std::num::NonZeroU16;

    // Use some jank sizes to exercise boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: usize = 2;

    // Larger page sizes for tests that need more buffer space.
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);

    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
    ///
    /// When the offsets partition is completely lost and the data has been pruned, we cannot
    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
    /// This is a genuine external failure that should be detected and reported clearly.
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets-loss-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with data and prune ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections (0-3)
            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
            journal.prune(20).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);
            journal.sync().await.unwrap();
            drop(journal);

            // === Phase 2: Simulate complete offsets partition loss ===
            // Remove both the offsets data partition and its metadata partition
            // NOTE(review): the "-blobs"/"-metadata" suffixes mirror fixed::Journal's
            // internal partition layout -- confirm against that module if this drifts.
            context
                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets blobs partition");
            context
                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets metadata partition");

            // === Phase 3: Verify this is detected as unrecoverable ===
            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    /// Test that init aligns state when data is pruned/lost but offsets survives.
    ///
    /// This handles both:
    /// 1. Crash during prune-all (data pruned, offsets not yet)
    /// 2. External data partition loss
    ///
    /// In both cases, we align by pruning offsets to match.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data-loss-test".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Setup: Create journal with data ===
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 20 items across 2 sections
            for i in 0..20u64 {
                variable.append(&(i * 100)).await.unwrap();
            }
            variable.sync().await.unwrap();
            drop(variable);

            // === Simulate data loss: Delete data partition but keep offsets ===
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            // === Verify init aligns the mismatch ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // Size should be preserved
            assert_eq!(journal.size().await, 20);
            // But no items remain (both journals pruned)
            assert!(journal.bounds().await.is_empty());

            // All reads should fail with ItemPruned
            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Can append new data starting at position 20
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }

    /// Test replay behavior for variable-length items.
    #[test_traced]
    fn test_variable_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "replay".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize journal
            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            // Append 40 items across 4 sections (0-3)
            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Test 1: Full replay
            // (first replay arg is a buffer size, second is the start position)
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
                futures::pin_mut!(stream);
                for i in 0..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 2: Partial replay from middle of section
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
                futures::pin_mut!(stream);
                for i in 15..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 3: Partial replay from section boundary
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 4: Prune and verify replay from pruned
            journal.prune(20).await.unwrap();
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 0).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 19).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }

            // Test 5: Replay from exactly at pruning boundary after prune
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 6: Replay from the end
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
                futures::pin_mut!(stream);
                assert!(stream.next().await.is_none());
            }

            // Test 7: Replay beyond the end (should error)
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 41).await;
                assert!(matches!(
                    res,
                    Err(crate::journal::Error::ItemOutOfRange(41))
                ));
            }

            journal.destroy().await.unwrap();
        });
    }

    // Runs the shared Contiguous-trait conformance suite against this journal,
    // creating a fresh journal (unique partition + label) per sub-test.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String, idx: usize| {
                let label = test_name.replace('-', "_");
                let context = context.with_label(&format!("{label}_{idx}"));
                async move {
                    let cfg = Config {
                        partition: format!("generic-test-{test_name}"),
                        items_per_section: NZU64!(10),
                        compression: None,
                        codec_config: (),
                        page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                        write_buffer: NZUsize!(1024),
                    };
                    Journal::<_, u64>::init(context, cfg).await
                }
                .boxed()
            })
            .await;
        });
    }

    /// Test multiple sequential prunes with Variable-specific guarantees.
    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Initial state: all items accessible
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 40);

            // First prune: remove section 0 (positions 0-9)
            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);
            // Variable-specific guarantee: oldest is EXACTLY at section boundary
            assert_eq!(journal.bounds().await.start, 10);

            // Items 0-9 should be pruned, 10+ should be accessible
            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Second prune: remove section 1 (positions 10-19)
            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);
            // Variable-specific guarantee: oldest is EXACTLY at section boundary
            assert_eq!(journal.bounds().await.start, 20);

            // Items 0-19 should be pruned, 20+ should be accessible
            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            // Third prune: remove section 2 (positions 20-29)
            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);
            // Variable-specific guarantee: oldest is EXACTLY at section boundary
            assert_eq!(journal.bounds().await.start, 30);

            // Items 0-29 should be pruned, 30+ should be accessible
            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            // Size should still be 40 (pruning doesn't affect size)
            assert_eq!(journal.size().await, 40);

            journal.destroy().await.unwrap();
        });
    }

    /// Test that pruning all data and re-initializing preserves positions.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune-all-reinit".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal and append data ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert_eq!(bounds.start, 0);

            // === Phase 2: Prune all data ===
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            // All data is pruned - no items remain
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            // All reads should fail with ItemPruned
            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // === Phase 3: Re-init and verify position preserved ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Size should be preserved, but no items remain
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            // All reads should still fail
            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // === Phase 4: Append new data ===
            // Next append should get position 100
            journal.append(&10000).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 101);
            // Now we have one item at position 100
            assert_eq!(bounds.start, 100);

            // Can read the new item
            assert_eq!(journal.read(100).await.unwrap(), 10000);
            // Old positions still fail
            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }

    /// Test recovery from crash after data journal pruned but before offsets journal.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-prune-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections to both journals
            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // === Simulate crash: Prune data journal but not offsets journal ===
            // Manually prune data journal to section 2 (position 20)
            variable.test_prune_data(2).await.unwrap();
            // Offsets journal still has data from position 10-19
            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            // Reads before position 20 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 20+ should succeed
            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }

    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
    ///
    /// Simulates an impossible state (offsets journal pruned more than data journal) which
    /// should never happen due to write ordering. Verifies that init() returns corruption error.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-offsets-ahead".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections to both journals
            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune offsets journal ahead of data journal (impossible state)
            variable.test_prune_offsets(20).await.unwrap(); // Prune to position 20
            variable.test_prune_data(1).await.unwrap(); // Only prune data journal to section 1 (position 10)
            variable.sync().await.unwrap();
            drop(variable);

            // === Verify corruption detected ===
            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with partial data ===
            let cfg = Config {
                partition: "recovery-append-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 15 items to both journals (fills section 0, partial section 1)
            for i in 0..15u64 {
                variable.append(&(i * 100)).await.unwrap();
            }
            assert_eq!(variable.size().await, 15);

            // Manually append 5 more items directly to data journal only
            for i in 15..20u64 {
                variable.test_append_data(1, i * 100).await.unwrap();
            }

            // Offsets journal still has only 15 entries
            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets journal from data journal replay
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 0);

            // All items should be readable from both journals
            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Offsets journal should be fully rebuilt to match data journal
            assert_eq!(variable.test_offsets_size().await, 20);

            variable.destroy().await.unwrap();
        });
    }

    /// Test recovery from multiple prune operations with crash.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-multiple-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 50 items across 5 sections to both journals
            for i in 0..50u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
            // Manually prune data journal to section 3 (position 30)
            variable.test_prune_data(3).await.unwrap();
            // Offsets journal still thinks oldest is position 10
            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 30);
            assert_eq!(bounds.end, 50);

            // Reads before position 30 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 30+ should succeed
            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }

    /// Test recovery from crash during rewind operation.
    ///
    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
    /// This creates a situation where offsets journal has been rewound but data journal still
    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
    /// offsets index across all sections to match the data journal.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data across multiple sections ===
            let cfg = Config {
                partition: "recovery-rewind-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
            for i in 0..25u64 {
                variable.append(&(i * 100)).await.unwrap();
            }
            assert_eq!(variable.size().await, 25);

            // === Simulate crash during rewind(5) ===
            // Rewind offsets journal to size 5 (keeps positions 0-4)
            variable.test_rewind_offsets(5).await.unwrap();
            // CRASH before data.rewind() completes - data still has all 3 sections
            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets[5-24] from data journal across all 3 sections
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 25);
            assert_eq!(bounds.start, 0);

            // All items should be readable - offsets rebuilt correctly across all sections
            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Verify offsets journal fully rebuilt
            assert_eq!(variable.test_offsets_size().await, 25);

            // Verify next append gets position 25
            let pos = variable.append(&2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }

    /// Test recovery from crash after data sync but before offsets sync when journal was
    /// previously emptied by pruning.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-empty-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with one full section ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9), fills section 0
            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 0);

            // === Phase 2: Prune to create empty journal ===
            journal.prune(10).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert!(bounds.is_empty()); // Empty!

            // === Phase 3: Append directly to data journal to simulate crash ===
            // Manually append to data journal only (bypassing Variable's append logic)
            // This simulates the case where data was synced but offsets wasn't
            for i in 10..20u64 {
                journal.test_append_data(1, i * 100).await.unwrap();
            }

            // Sync the data journal (section 1)
            journal.test_sync_data().await.unwrap();

            // Do NOT sync offsets journal - simulates crash before offsets.sync()
            // Close without syncing offsets
            drop(journal);

            // === Phase 4: Verify recovery succeeds ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // All data should be recovered
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 10);

            // All items from position 10-19 should be readable
            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Items 0-9 should be pruned
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }

    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
    #[test_traced]
    fn test_variable_concurrent_sync_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "concurrent-sync-recovery".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append items across a section boundary
            for i in 0..15u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Manually sync only data to simulate crash during concurrent sync
            // NOTE(review): this calls commit(), not a data-only sync helper -- confirm
            // commit() persists data before offsets so the scenario matches the comment.
            journal.commit().await.unwrap();

            // Simulate a crash (offsets not synced)
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Data should be intact and offsets rebuilt
            assert_eq!(journal.size().await, 15);
            for i in 0..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }

    // init_at_size(0) behaves like a fresh journal: empty bounds, appends start at 0.
    #[test_traced]
    fn test_init_at_size_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-zero".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
                .await
                .unwrap();

            // Size should be 0
            assert_eq!(journal.size().await, 0);
            // No oldest retained position (empty journal)
            assert!(journal.bounds().await.is_empty());

            // Next append should get position 0
            let pos = journal.append(&100).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size().await, 1);
            assert_eq!(journal.read(0).await.unwrap(), 100);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_section_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-boundary".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
                .await
                .unwrap();

            // Size should be 10
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            // No data yet, so no oldest retained position
            assert!(bounds.is_empty());

            // Next append should get position 10
            let pos = journal.append(&1000).await.unwrap();
            assert_eq!(pos, 10);
            assert_eq!(journal.size().await, 11);
            assert_eq!(journal.read(10).await.unwrap(), 1000);

            // Can continue appending
            let pos = journal.append(&1001).await.unwrap();
            assert_eq!(pos, 11);
            assert_eq!(journal.read(11).await.unwrap(), 1001);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_mid_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-mid".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 7 (middle of section 1 with items_per_section=5)
            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
                .await
                .unwrap();

            // Size should be 7
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 7);
            // No data yet, so no oldest retained position
            assert!(bounds.is_empty());

            // Next append should get position 7
            let pos = journal.append(&700).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size().await, 8);
            assert_eq!(journal.read(7).await.unwrap(), 700);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-persist".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 15
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            // Append some items
            for i in 0..5u64 {
                let pos = journal.append(&(1500 + i)).await.unwrap();
                assert_eq!(pos, 15 + i);
            }
            assert_eq!(journal.size().await, 20);

            // Sync and reopen
            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Size and data should be preserved
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 15);

            // Verify data
            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            // Can continue appending
            let pos = journal.append(&9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_persistence_without_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-persist-empty".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 15
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert!(bounds.is_empty());

            // Drop without writing any data
            drop(journal);

            // Reopen and verify size persisted
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert!(bounds.is_empty());

            // Can append starting at position 15
            let pos = journal.append(&1500).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 1500);

            journal.destroy().await.unwrap();
        });
    }

    /// Test init_at_size with mid-section value persists correctly across restart.
    #[test_traced]
    fn test_init_at_size_mid_section_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-mid-section".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 7 (mid-section, 7 % 5 = 2)
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Append 3 items at positions 7, 8, 9 (fills rest of section 1)
            for i in 0..3u64 {
                let pos = journal.append(&(700 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            // Sync and reopen
            journal.sync().await.unwrap();
            drop(journal);

            // Reopen
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Size and bounds.start should be preserved correctly
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            // Verify data
            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            // Positions before 7 should be pruned
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));

            journal.destroy().await.unwrap();
        });
    }

    /// Test init_at_size mid-section with data spanning multiple sections.
#[test_traced]
fn test_init_at_size_mid_section_multi_section_persistence() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "init-at-size-multi-section".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Initialize at position 7 (mid-section)
        let journal =
            Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                .await
                .unwrap();

        // Append 8 items: positions 7-14 (section 1: 3 items, section 2: 5 items)
        for i in 0..8u64 {
            let pos = journal.append(&(700 + i)).await.unwrap();
            assert_eq!(pos, 7 + i);
        }
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 15);
        assert_eq!(bounds.start, 7);

        // Sync and reopen
        journal.sync().await.unwrap();
        drop(journal);

        // Reopen
        let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
            .await
            .unwrap();

        // Verify state preserved
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 15);
        assert_eq!(bounds.start, 7);

        // Verify all data
        for i in 0..8u64 {
            assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
        }

        journal.destroy().await.unwrap();
    });
}

/// Regression test: data-empty crash repair must preserve mid-section pruning boundary.
#[test_traced]
fn test_align_journals_data_empty_mid_section_pruning_boundary() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "align-journals-mid-section-pruning-boundary".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Phase 1: Create data and offsets, then simulate data-only pruning crash.
        let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
            .await
            .unwrap();
        for i in 0..7u64 {
            journal.append(&(100 + i)).await.unwrap();
        }
        journal.sync().await.unwrap();

        // Simulate crash after data was cleared but before offsets were pruned.
        journal.inner.write().await.data.clear().await.unwrap();
        drop(journal);

        // Phase 2: Init triggers data-empty repair and should treat journal as fully pruned at size 7.
        let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
            .await
            .unwrap();
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 7);
        assert!(bounds.is_empty());

        // Append one item at position 7.
        let pos = journal.append(&777).await.unwrap();
        assert_eq!(pos, 7);
        assert_eq!(journal.size().await, 8);
        assert_eq!(journal.read(7).await.unwrap(), 777);

        // Sync only the data journal to simulate a crash before offsets are synced.
        let section = 7 / cfg.items_per_section.get();
        journal
            .inner
            .write()
            .await
            .data
            .sync(section)
            .await
            .unwrap();
        drop(journal);

        // Phase 3: Reopen and verify we did not lose the appended item.
        let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
            .await
            .unwrap();
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 8);
        assert_eq!(bounds.start, 7);
        assert_eq!(journal.read(7).await.unwrap(), 777);

        journal.destroy().await.unwrap();
    });
}

/// Test crash recovery: init_at_size + append + crash with data synced but offsets not.
#[test_traced]
fn test_init_at_size_crash_data_synced_offsets_not() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "init-at-size-crash-recovery".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Initialize at position 7 (mid-section)
        let journal =
            Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                .await
                .unwrap();

        // Append 3 items
        for i in 0..3u64 {
            journal.append(&(700 + i)).await.unwrap();
        }

        // Sync only the data journal, not offsets (simulate crash)
        journal.inner.write().await.data.sync(1).await.unwrap();

        // Don't sync offsets - simulates crash after data write but before offsets write
        drop(journal);

        // Reopen - should recover by rebuilding offsets from data
        let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
            .await
            .unwrap();

        // Verify recovery
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 10);
        assert_eq!(bounds.start, 7);

        // Verify data is accessible
        for i in 0..3u64 {
            assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
        }

        journal.destroy().await.unwrap();
    });
}

/// Test that pruning below the current oldest retained position is a no-op:
/// the pruning boundary must never move backwards.
#[test_traced]
fn test_prune_does_not_move_oldest_retained_backwards() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "prune-no-backwards".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };
        let journal =
            Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                .await
                .unwrap();

        // Append a few items at positions 7..9
        for i in 0..3u64 {
            let pos = journal.append(&(700 + i)).await.unwrap();
            assert_eq!(pos, 7 + i);
        }
        assert_eq!(journal.bounds().await.start, 7);

        // Prune to a position within the same section should not move bounds.start backwards.
        journal.prune(8).await.unwrap();
        assert_eq!(journal.bounds().await.start, 7);
        assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
        assert_eq!(journal.read(7).await.unwrap(), 700);

        journal.destroy().await.unwrap();
    });
}

/// Test `init_at_size` with a large starting position (many empty sections skipped).
#[test_traced]
fn test_init_at_size_large_offset() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "init-at-size-large".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Initialize at a large position (position 1000)
        let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
            .await
            .unwrap();
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 1000);
        // No data yet, so no oldest retained position
        assert!(bounds.is_empty());

        // Next append should get position 1000
        let pos = journal.append(&100000).await.unwrap();
        assert_eq!(pos, 1000);
        assert_eq!(journal.read(1000).await.unwrap(), 100000);

        journal.destroy().await.unwrap();
    });
}

/// Test that a journal started with `init_at_size` supports pruning and further appends.
#[test_traced]
fn test_init_at_size_prune_and_append() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "init-at-size-prune".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Initialize at position 20
        let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
            .await
            .unwrap();

        // Append items 20-29
        for i in 0..10u64 {
            journal.append(&(2000 + i)).await.unwrap();
        }
        assert_eq!(journal.size().await, 30);

        // Prune to position 25
        journal.prune(25).await.unwrap();
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, 30);
        assert_eq!(bounds.start, 25);

        // Verify remaining items are readable
        for i in 25..30u64 {
            assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
        }

        // Continue appending
        let pos = journal.append(&3000).await.unwrap();
        assert_eq!(pos, 30);

        journal.destroy().await.unwrap();
    });
}

/// Test `init_sync` when there is no existing data on disk.
#[test_traced]
fn test_init_sync_no_existing_data() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "test-fresh-start".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            write_buffer: NZUsize!(1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
        };

        // Initialize journal with sync boundaries when no existing data exists
        let lower_bound = 10;
        let upper_bound = 26;
        let journal = Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
            .await
            .expect("Failed to initialize journal with sync boundaries");
        let bounds = journal.bounds().await;
        assert_eq!(bounds.end, lower_bound);
        assert!(bounds.is_empty());

        // Append items using the contiguous API
        let pos1 = journal.append(&42u64).await.unwrap();
        assert_eq!(pos1, lower_bound);
        assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
        let pos2 = journal.append(&43u64).await.unwrap();
        assert_eq!(pos2, lower_bound + 1);
        assert_eq!(journal.read(pos2).await.unwrap(), 43u64);

        journal.destroy().await.unwrap();
    });
}

/// Test `init_sync` when there is existing data that overlaps with the sync target range.
#[test_traced] fn test_init_sync_existing_data_overlap() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let cfg = Config { partition: "test-overlap".into(), items_per_section: NZU64!(5), compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; // Create initial journal with data in multiple sections let journal = Journal::::init(context.clone(), cfg.clone()) .await .expect("Failed to create initial journal"); // Add data at positions 0-19 (sections 0-3 with items_per_section=5) for i in 0..20u64 { journal.append(&(i * 100)).await.unwrap(); } journal.sync().await.unwrap(); drop(journal); // Initialize with sync boundaries that overlap with existing data // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6) let lower_bound = 8; let upper_bound = 31; let journal = Journal::::init_sync( context.clone(), cfg.clone(), lower_bound..upper_bound, ) .await .expect("Failed to initialize journal with overlap"); assert_eq!(journal.size().await, 20); // Verify oldest retained is pruned to lower_bound's section boundary (5) assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5 // Verify data integrity: positions before 5 are pruned assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_)))); assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_)))); // Positions 5-19 should be accessible assert_eq!(journal.read(5).await.unwrap(), 500); assert_eq!(journal.read(8).await.unwrap(), 800); assert_eq!(journal.read(19).await.unwrap(), 1900); // Position 20+ should not exist yet assert!(matches!( journal.read(20).await, Err(Error::ItemOutOfRange(_)) )); // Assert journal can accept new items let pos = journal.append(&999).await.unwrap(); assert_eq!(pos, 20); assert_eq!(journal.read(20).await.unwrap(), 999); journal.destroy().await.unwrap(); }); } /// Test `init_sync` with invalid 
parameters. #[should_panic] #[test_traced] fn test_init_sync_invalid_parameters() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let cfg = Config { partition: "test-invalid".into(), items_per_section: NZU64!(5), compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; #[allow(clippy::reversed_empty_ranges)] let _result = Journal::::init_sync( context.clone(), cfg, 10..5, // invalid range: lower > upper ) .await; }); } /// Test `init_sync` when existing data exactly matches the sync range. #[test_traced] fn test_init_sync_existing_data_exact_match() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let items_per_section = NZU64!(5); let cfg = Config { partition: "test-exact-match".into(), items_per_section, compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; // Create initial journal with data exactly matching sync range let journal = Journal::::init(context.clone(), cfg.clone()) .await .expect("Failed to create initial journal"); // Add data at positions 0-19 (sections 0-3 with items_per_section=5) for i in 0..20u64 { journal.append(&(i * 100)).await.unwrap(); } journal.sync().await.unwrap(); drop(journal); // Initialize with sync boundaries that exactly match existing data let lower_bound = 5; // section 1 let upper_bound = 20; // section 3 let journal = Journal::::init_sync( context.clone(), cfg.clone(), lower_bound..upper_bound, ) .await .expect("Failed to initialize journal with exact match"); assert_eq!(journal.size().await, 20); // Verify pruning to lower bound (section 1 boundary = position 5) assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5 // Verify positions before 5 are pruned assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_)))); 
assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_)))); // Positions 5-19 should be accessible assert_eq!(journal.read(5).await.unwrap(), 500); assert_eq!(journal.read(10).await.unwrap(), 1000); assert_eq!(journal.read(19).await.unwrap(), 1900); // Position 20+ should not exist yet assert!(matches!( journal.read(20).await, Err(Error::ItemOutOfRange(_)) )); // Assert journal can accept new operations let pos = journal.append(&999).await.unwrap(); assert_eq!(pos, 20); assert_eq!(journal.read(20).await.unwrap(), 999); journal.destroy().await.unwrap(); }); } /// Test `init_sync` when existing data exceeds the sync target range. /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound. #[test_traced] fn test_init_sync_existing_data_exceeds_upper_bound() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let items_per_section = NZU64!(5); let cfg = Config { partition: "test-unexpected-data".into(), items_per_section, compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; // Create initial journal with data beyond sync range let journal = Journal::::init( context.with_label("initial"), cfg.clone(), ) .await .expect("Failed to create initial journal"); // Add data at positions 0-29 (sections 0-5 with items_per_section=5) for i in 0..30u64 { journal.append(&(i * 1000)).await.unwrap(); } journal.sync().await.unwrap(); drop(journal); // Initialize with sync boundaries that are exceeded by existing data let lower_bound = 8; // section 1 for (i, upper_bound) in (9..29).enumerate() { let result = Journal::::init_sync( context.with_label(&format!("sync_{i}")), cfg.clone(), lower_bound..upper_bound, ) .await; // Should return ItemOutOfRange error since data exists beyond upper_bound assert!(matches!(result, Err(Error::ItemOutOfRange(_)))); } }); } /// Test `init_sync` when all existing data 
is stale (before lower bound). #[test_traced] fn test_init_sync_existing_data_stale() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let items_per_section = NZU64!(5); let cfg = Config { partition: "test-stale".into(), items_per_section, compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; // Create initial journal with stale data let journal = Journal::::init( context.with_label("first"), cfg.clone(), ) .await .expect("Failed to create initial journal"); // Add data at positions 0-9 (sections 0-1 with items_per_section=5) for i in 0..10u64 { journal.append(&(i * 100)).await.unwrap(); } journal.sync().await.unwrap(); drop(journal); // Initialize with sync boundaries beyond all existing data let lower_bound = 15; // section 3 let upper_bound = 26; // last element in section 5 let journal = Journal::::init_sync( context.with_label("second"), cfg.clone(), lower_bound..upper_bound, ) .await .expect("Failed to initialize journal with stale data"); assert_eq!(journal.size().await, 15); // Verify fresh journal (all old data destroyed, starts at position 15) assert!(journal.bounds().await.is_empty()); // Verify old positions don't exist assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_)))); assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_)))); assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_)))); journal.destroy().await.unwrap(); }); } /// Test `init_sync` with section boundary edge cases. 
#[test_traced] fn test_init_sync_section_boundaries() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let items_per_section = NZU64!(5); let cfg = Config { partition: "test-boundaries".into(), items_per_section, compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; // Create journal with data at section boundaries let journal = Journal::::init(context.clone(), cfg.clone()) .await .expect("Failed to create initial journal"); // Add data at positions 0-24 (sections 0-4 with items_per_section=5) for i in 0..25u64 { journal.append(&(i * 100)).await.unwrap(); } journal.sync().await.unwrap(); drop(journal); // Test sync boundaries exactly at section boundaries let lower_bound = 15; // Exactly at section boundary (15/5 = 3) let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4) let journal = Journal::::init_sync( context.clone(), cfg.clone(), lower_bound..upper_bound, ) .await .expect("Failed to initialize journal at boundaries"); assert_eq!(journal.size().await, 25); // Verify oldest retained is at section 3 boundary (position 15) assert_eq!(journal.bounds().await.start, 15); // Verify positions before 15 are pruned assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_)))); assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_)))); // Verify positions 15-24 are accessible assert_eq!(journal.read(15).await.unwrap(), 1500); assert_eq!(journal.read(20).await.unwrap(), 2000); assert_eq!(journal.read(24).await.unwrap(), 2400); // Position 25+ should not exist yet assert!(matches!( journal.read(25).await, Err(Error::ItemOutOfRange(_)) )); // Assert journal can accept new operations let pos = journal.append(&999).await.unwrap(); assert_eq!(pos, 25); assert_eq!(journal.read(25).await.unwrap(), 999); journal.destroy().await.unwrap(); }); } /// Test `init_sync` when range.start and range.end-1 are in 
the same section. #[test_traced] fn test_init_sync_same_section_bounds() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let items_per_section = NZU64!(5); let cfg = Config { partition: "test-same-section".into(), items_per_section, compression: None, codec_config: (), write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)), }; // Create journal with data in multiple sections let journal = Journal::::init(context.clone(), cfg.clone()) .await .expect("Failed to create initial journal"); // Add data at positions 0-14 (sections 0-2 with items_per_section=5) for i in 0..15u64 { journal.append(&(i * 100)).await.unwrap(); } journal.sync().await.unwrap(); drop(journal); // Test sync boundaries within the same section let lower_bound = 10; // operation 10 (section 2: 10/5 = 2) let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2) let journal = Journal::::init_sync( context.clone(), cfg.clone(), lower_bound..upper_bound, ) .await .expect("Failed to initialize journal with same-section bounds"); assert_eq!(journal.size().await, 15); // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained // Oldest retained position should be at section 2 boundary (position 10) assert_eq!(journal.bounds().await.start, 10); // Verify positions before 10 are pruned assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_)))); assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_)))); // Verify positions 10-14 are accessible assert_eq!(journal.read(10).await.unwrap(), 1000); assert_eq!(journal.read(11).await.unwrap(), 1100); assert_eq!(journal.read(14).await.unwrap(), 1400); // Position 15+ should not exist yet assert!(matches!( journal.read(15).await, Err(Error::ItemOutOfRange(_)) )); // Assert journal can accept new operations let pos = journal.append(&999).await.unwrap(); assert_eq!(pos, 15); assert_eq!(journal.read(15).await.unwrap(), 
999); journal.destroy().await.unwrap(); }); } /// Test contiguous variable journal with items_per_section=1. /// /// This is a regression test for a bug where reading from size()-1 fails /// when using items_per_section=1, particularly after pruning and restart. #[test_traced] fn test_single_item_per_section() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let cfg = Config { partition: "single-item-per-section".into(), items_per_section: NZU64!(1), compression: None, codec_config: (), page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), write_buffer: NZUsize!(1024), }; // === Test 1: Basic single item operation === let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone()) .await .unwrap(); // Verify empty state let bounds = journal.bounds().await; assert_eq!(bounds.end, 0); assert!(bounds.is_empty()); // Append 1 item (value = position * 100, so position 0 has value 0) let pos = journal.append(&0).await.unwrap(); assert_eq!(pos, 0); assert_eq!(journal.size().await, 1); // Sync journal.sync().await.unwrap(); // Read from size() - 1 let value = journal.read(journal.size().await - 1).await.unwrap(); assert_eq!(value, 0); // === Test 2: Multiple items with single item per section === for i in 1..10u64 { let pos = journal.append(&(i * 100)).await.unwrap(); assert_eq!(pos, i); assert_eq!(journal.size().await, i + 1); // Verify we can read the just-appended item at size() - 1 let value = journal.read(journal.size().await - 1).await.unwrap(); assert_eq!(value, i * 100); } // Verify all items can be read for i in 0..10u64 { assert_eq!(journal.read(i).await.unwrap(), i * 100); } journal.sync().await.unwrap(); // === Test 3: Pruning with single item per section === // Prune to position 5 (removes positions 0-4) let pruned = journal.prune(5).await.unwrap(); assert!(pruned); // Size should still be 10 assert_eq!(journal.size().await, 10); // bounds.start should be 5 
assert_eq!(journal.bounds().await.start, 5); // Reading from bounds.end - 1 (position 9) should still work let value = journal.read(journal.size().await - 1).await.unwrap(); assert_eq!(value, 900); // Reading from pruned positions should return ItemPruned for i in 0..5 { assert!(matches!( journal.read(i).await, Err(crate::journal::Error::ItemPruned(_)) )); } // Reading from retained positions should work for i in 5..10u64 { assert_eq!(journal.read(i).await.unwrap(), i * 100); } // Append more items after pruning for i in 10..15u64 { let pos = journal.append(&(i * 100)).await.unwrap(); assert_eq!(pos, i); // Verify we can read from size() - 1 let value = journal.read(journal.size().await - 1).await.unwrap(); assert_eq!(value, i * 100); } journal.sync().await.unwrap(); drop(journal); // === Test 4: Restart persistence with single item per section === let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()) .await .unwrap(); // Verify size is preserved assert_eq!(journal.size().await, 15); // Verify bounds.start is preserved assert_eq!(journal.bounds().await.start, 5); // Reading from bounds.end - 1 should work after restart let value = journal.read(journal.size().await - 1).await.unwrap(); assert_eq!(value, 1400); // Reading all retained positions should work for i in 5..15u64 { assert_eq!(journal.read(i).await.unwrap(), i * 100); } journal.destroy().await.unwrap(); // === Test 5: Restart after pruning with non-zero index (KEY SCENARIO) === // Fresh journal for this test let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone()) .await .unwrap(); // Append 10 items (positions 0-9) for i in 0..10u64 { journal.append(&(i * 1000)).await.unwrap(); } // Prune to position 5 (removes positions 0-4) journal.prune(5).await.unwrap(); let bounds = journal.bounds().await; assert_eq!(bounds.end, 10); assert_eq!(bounds.start, 5); // Sync and restart journal.sync().await.unwrap(); drop(journal); // Re-open journal let journal = 
Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone()) .await .unwrap(); // Verify state after restart let bounds = journal.bounds().await; assert_eq!(bounds.end, 10); assert_eq!(bounds.start, 5); // KEY TEST: Reading from bounds.end - 1 (position 9) should work let value = journal.read(journal.size().await - 1).await.unwrap(); assert_eq!(value, 9000); // Verify all retained positions (5-9) work for i in 5..10u64 { assert_eq!(journal.read(i).await.unwrap(), i * 1000); } journal.destroy().await.unwrap(); // === Test 6: Prune all items (edge case) === // This tests the scenario where prune removes everything. // Callers must check bounds().is_empty() before reading. let journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone()) .await .unwrap(); for i in 0..5u64 { journal.append(&(i * 100)).await.unwrap(); } journal.sync().await.unwrap(); // Prune all items journal.prune(5).await.unwrap(); let bounds = journal.bounds().await; assert_eq!(bounds.end, 5); // Size unchanged assert!(bounds.is_empty()); // All pruned // bounds.end - 1 = 4, but position 4 is pruned let result = journal.read(journal.size().await - 1).await; assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4)))); // After appending, reading works again journal.append(&500).await.unwrap(); let bounds = journal.bounds().await; assert_eq!(bounds.start, 5); assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500); journal.destroy().await.unwrap(); }); } #[test_traced] fn test_variable_journal_clear_to_size() { let executor = deterministic::Runner::default(); executor.start(|context| async move { let cfg = Config { partition: "clear-test".into(), items_per_section: NZU64!(10), compression: None, codec_config: (), page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), write_buffer: NZUsize!(1024), }; let journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone()) .await .unwrap(); // Append 25 items (spanning multiple sections) 
for i in 0..25u64 { journal.append(&(i * 100)).await.unwrap(); } let bounds = journal.bounds().await; assert_eq!(bounds.end, 25); assert_eq!(bounds.start, 0); journal.sync().await.unwrap(); // Clear to position 100, effectively resetting the journal journal.clear_to_size(100).await.unwrap(); let bounds = journal.bounds().await; assert_eq!(bounds.end, 100); assert!(bounds.is_empty()); // Old positions should fail for i in 0..25 { assert!(matches!( journal.read(i).await, Err(crate::journal::Error::ItemPruned(_)) )); } // Verify size persists after restart without writing any data drop(journal); let journal = Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone()) .await .unwrap(); let bounds = journal.bounds().await; assert_eq!(bounds.end, 100); assert!(bounds.is_empty()); // Append new data starting at position 100 for i in 100..105u64 { let pos = journal.append(&(i * 100)).await.unwrap(); assert_eq!(pos, i); } let bounds = journal.bounds().await; assert_eq!(bounds.end, 105); assert_eq!(bounds.start, 100); // New positions should be readable for i in 100..105u64 { assert_eq!(journal.read(i).await.unwrap(), i * 100); } // Sync and re-init to verify persistence journal.sync().await.unwrap(); drop(journal); let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg) .await .unwrap(); let bounds = journal.bounds().await; assert_eq!(bounds.end, 105); assert_eq!(bounds.start, 100); for i in 100..105u64 { assert_eq!(journal.read(i).await.unwrap(), i * 100); } journal.destroy().await.unwrap(); }); } }