use super::Header; use crate::{Buf, BufferPool, Error, IoBufs, IoBufsMut}; use cfg_if::cfg_if; use commonware_formatting::hex; use std::{ fs::File, io::IoSlice, os::{fd::AsRawFd, unix::fs::FileExt}, sync::Arc, }; use tokio::task; // Cap iovec batch size: larger iovecs reduce syscall count but increase // per-write kernel setup overhead. const IOVEC_BATCH_SIZE: usize = 32; #[derive(Clone)] pub struct Blob { partition: String, name: Vec, file: Arc, pool: BufferPool, } impl Blob { pub fn new(partition: String, name: &[u8], file: File, pool: BufferPool) -> Self { Self { partition, name: name.into(), file: Arc::new(file), pool, } } fn write_single_at(file: &File, offset: u64, buf: &[u8]) -> Result<(), Error> { file.write_all_at(buf, offset)?; Ok(()) } fn write_vectored_at( file: &File, mut offset: u64, mut bufs: IoBufs, flags: Option, ) -> Result<(), Error> { while bufs.has_remaining() { let mut io_slices = [IoSlice::new(&[]); IOVEC_BATCH_SIZE]; let io_slices_len = bufs.chunks_vectored(&mut io_slices); assert!( io_slices_len > 0, "chunks_vectored should produce at least one slice when bufs has remaining" ); cfg_if! { if #[cfg(target_os = "linux")] { // SAFETY: `IoSlice` is ABI-compatible with `libc::iovec` on Unix. // `io_slices` points to valid readable buffers held alive for this syscall. let ret = unsafe { libc::pwritev2( file.as_raw_fd(), io_slices.as_ptr().cast::(), io_slices_len as i32, offset.try_into().map_err(|_| Error::OffsetOverflow)?, flags.unwrap_or(0), ) }; } else { assert!(flags.is_none(), "flags are only supported on Linux"); // SAFETY: `IoSlice` is ABI-compatible with `libc::iovec` on Unix. // `io_slices` points to valid readable buffers held alive for this syscall. let ret = unsafe { libc::pwritev( file.as_raw_fd(), io_slices.as_ptr().cast::(), io_slices_len as i32, offset.try_into().map_err(|_| Error::OffsetOverflow)?, ) }; } } if ret < 0 { let err = std::io::Error::last_os_error(); if err.kind() == std::io::ErrorKind::Interrupted { continue; } return Err(err.into()); } let bytes_written = ret as usize; if bytes_written == 0 { return Err(Error::WriteFailed); } bufs.advance(bytes_written); offset = offset .checked_add(bytes_written as u64) .ok_or(Error::OffsetOverflow)?; } Ok(()) } } impl crate::Blob for Blob { async fn read_at(&self, offset: u64, len: usize) -> Result { self.read_at_buf(offset, len, self.pool.alloc(len)).await } async fn read_at_buf( &self, offset: u64, len: usize, bufs: impl Into + Send, ) -> Result { let mut bufs = bufs.into(); // SAFETY: `len` bytes are filled via read_exact below. unsafe { bufs.set_len(len) }; let file = self.file.clone(); let pool = self.pool.clone(); let offset = offset .checked_add(Header::SIZE_U64) .ok_or(Error::OffsetOverflow)?; task::spawn_blocking(move || { if let Some(buf) = bufs.as_single_mut() { // Read directly into the single buffer (zero-copy). file.read_exact_at(buf.as_mut(), offset)?; } else { // Read into a temporary contiguous buffer and copy back to preserve structure. // SAFETY: `len` bytes are filled via read_exact_at below. let mut temp = unsafe { pool.alloc_len(len) }; file.read_exact_at(temp.as_mut(), offset)?; bufs.copy_from_slice(temp.as_ref()); } Ok(bufs) }) .await .map_err(|_| Error::ReadFailed)? } async fn write_at(&self, offset: u64, bufs: impl Into + Send) -> Result<(), Error> { let bufs = bufs.into(); let file = self.file.clone(); let offset = offset .checked_add(Header::SIZE_U64) .ok_or(Error::OffsetOverflow)?; task::spawn_blocking(move || match bufs.try_into_single() { Ok(buf) => Self::write_single_at(&file, offset, buf.as_ref()), Err(bufs) => Self::write_vectored_at(&file, offset, bufs, None), }) .await .map_err(|_| Error::WriteFailed)? } async fn write_at_sync( &self, offset: u64, bufs: impl Into + Send, ) -> Result<(), Error> { let bufs = bufs.into(); let file = self.file.clone(); let offset = offset .checked_add(Header::SIZE_U64) .ok_or(Error::OffsetOverflow)?; if !bufs.has_remaining() { return Ok(()); } cfg_if! { if #[cfg(target_os = "linux")] { task::spawn_blocking(move || { Self::write_vectored_at(&file, offset, bufs, Some(libc::RWF_SYNC)) }) .await .map_err(|_| Error::WriteFailed)? } else { let partition = self.partition.clone(); let name = self.name.clone(); task::spawn_blocking(move || { Self::write_vectored_at(&file, offset, bufs, None)?; file.sync_all().map_err(|e| { Error::BlobSyncFailed(partition, hex(&name), e) }) }) .await .map_err(|_| Error::WriteFailed)? } } } async fn resize(&self, len: u64) -> Result<(), Error> { let file = self.file.clone(); let len = len .checked_add(Header::SIZE_U64) .ok_or(Error::OffsetOverflow)?; task::spawn_blocking(move || file.set_len(len)) .await .map_err(|e| e.into()) .and_then(|r| r) .map_err(|e| Error::BlobResizeFailed(self.partition.clone(), hex(&self.name), e))?; Ok(()) } async fn sync(&self) -> Result<(), Error> { let file = self.file.clone(); task::spawn_blocking(move || file.sync_all()) .await .map_err(|e| e.into()) .and_then(|r| r) .map_err(|e| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name), e))?; Ok(()) } }