//! Contiguous journals with position-based access. //! //! This module provides position-based journal implementations where items are stored //! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and //! [variable]-size item journals are supported. use super::Error; use futures::Stream; use std::{future::Future, num::NonZeroUsize, ops::Range}; use tracing::warn; pub mod fixed; pub mod variable; #[cfg(test)] mod tests; /// A reader guard that holds a consistent view of the journal. /// /// While this guard exists, operations that may modify the bounds (such as `append`, `prune`, and /// `rewind`) will block until the guard is dropped. This keeps bounds stable, so any position /// within `bounds()` is guaranteed readable. // // TODO(): Relax locking to allow `append` // since it doesn't invalidate reads within the cached bounds. pub trait Reader: Send + Sync { /// The type of items stored in the journal. type Item; /// Returns [start, end) with a guaranteed stable pruning boundary. fn bounds(&self) -> Range; /// Read the item at the given position. /// /// Guaranteed not to return [Error::ItemPruned] for positions within `bounds()`. fn read(&self, position: u64) -> impl Future> + Send; /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise. /// /// Default implementation always returns `None`. fn try_read_sync(&self, _position: u64) -> Option { None } /// Return a stream of all items starting from `start_pos`. /// /// Because the reader holds the lock, validation and stream setup happen /// atomically with respect to `prune()`. fn replay( &self, buffer: NonZeroUsize, start_pos: u64, ) -> impl Future< Output = Result> + Send, Error>, > + Send; } /// Journals that support sequential append operations. /// /// Maintains a monotonically increasing position counter where each appended item receives a unique /// position starting from 0. pub trait Contiguous: Send + Sync { /// The type of items stored in the journal. type Item; /// Acquire a reader guard that holds a consistent view of the journal. /// /// While the returned guard exists, operations that need the journal's /// internal write lock (such as `append`, `prune`, and `rewind`) may block /// until the guard is dropped. This ensures any position within /// `reader.bounds()` remains readable. fn reader(&self) -> impl Future + '_> + Send; /// Return the total number of items that have been appended to the journal. /// /// This count is NOT affected by pruning. The next appended item will receive this /// position as its value. Equivalent to [`Reader::bounds`]`.end`. fn size(&self) -> impl Future + Send; } /// Items to append via [`Mutable::append_many`]. /// /// `Flat` wraps a single contiguous slice; `Nested` wraps multiple slices that are /// appended in order under a single lock acquisition. pub enum Many<'a, T> { /// A single contiguous slice of items. Flat(&'a [T]), /// Multiple slices of items, appended in order. Nested(&'a [&'a [T]]), } impl Many<'_, T> { /// Returns `true` if there are no items across all segments. pub fn is_empty(&self) -> bool { match self { Self::Flat(items) => items.is_empty(), Self::Nested(nested_items) => nested_items.iter().all(|items| items.is_empty()), } } } /// A [Contiguous] journal that supports appending, rewinding, and pruning. pub trait Mutable: Contiguous + Send + Sync { /// Append a new item to the journal, returning its position. /// /// Positions are consecutively increasing starting from 0. The position of each item /// is stable across pruning (i.e., if item X has position 5, it will always have /// position 5 even if earlier items are pruned). /// /// # Errors /// /// Returns an error if the underlying storage operation fails or if the item cannot /// be encoded. fn append( &mut self, item: &Self::Item, ) -> impl std::future::Future> + Send; /// Append items to the journal, returning the position of the last item appended. /// /// The default implementation calls [Self::append] in a loop. Concrete implementations /// may override this to acquire the write lock once for all items. /// /// Returns [Error::EmptyAppend] if items is empty. fn append_many<'a>( &'a mut self, items: Many<'a, Self::Item>, ) -> impl std::future::Future> + Send + 'a where Self::Item: Sync, { async move { if items.is_empty() { return Err(Error::EmptyAppend); } let mut last_pos = self.size().await; match items { Many::Flat(items) => { for item in items { last_pos = self.append(item).await?; } } Many::Nested(nested_items) => { for items in nested_items { for item in *items { last_pos = self.append(item).await?; } } } } Ok(last_pos) } } /// Prune items at positions strictly less than `min_position`. /// /// Returns `true` if any data was pruned, `false` otherwise. /// /// # Behavior /// /// - If `min_position > bounds.end`, the prune is capped to `bounds.end` (no error is returned) /// - Some items with positions less than `min_position` may be retained due to /// section/blob alignment /// - This operation is not atomic, but implementations guarantee the journal is left in a /// recoverable state if a crash occurs during pruning /// /// # Errors /// /// Returns an error if the underlying storage operation fails. fn prune( &mut self, min_position: u64, ) -> impl std::future::Future> + Send; /// Rewind the journal to the given size, discarding items from the end. /// /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1), /// and the next append will receive position N. /// /// # Behavior /// /// - If `size > bounds.end`, returns [Error::InvalidRewind] /// - If `size == bounds.end`, this is a no-op /// - If `size < bounds.start`, returns [Error::ItemPruned] (can't rewind to pruned data) /// - This operation is not atomic, but implementations guarantee the journal is left in a /// recoverable state if a crash occurs during rewinding /// /// # Warnings /// /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called. /// /// # Errors /// /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data). /// Returns an error if the underlying storage operation fails. fn rewind(&mut self, size: u64) -> impl std::future::Future> + Send; /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal /// is rewound to the pruning boundary, discarding all unpruned items. /// /// # Warnings /// /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called. fn rewind_to<'a, P>( &'a mut self, mut predicate: P, ) -> impl std::future::Future> + Send + 'a where P: FnMut(&Self::Item) -> bool + Send + 'a, { async move { let (bounds, rewind_size) = { let reader = self.reader().await; let bounds = reader.bounds(); let mut rewind_size = bounds.end; while rewind_size > bounds.start { let item = reader.read(rewind_size - 1).await?; if predicate(&item) { break; } rewind_size -= 1; } (bounds, rewind_size) }; if rewind_size != bounds.end { let rewound_items = bounds.end - rewind_size; warn!( journal_size = bounds.end, rewound_items, "rewinding journal items" ); self.rewind(rewind_size).await?; } Ok(rewind_size) } } }