use crate::Error; use commonware_utils::{from_hex, hex}; #[cfg(unix)] use std::path::Path; use std::{path::PathBuf, sync::Arc}; use tokio::{fs, sync::Mutex}; #[cfg(not(unix))] mod fallback; #[cfg(unix)] mod unix; /// Syncs a directory to ensure directory entry changes are durable. /// On Unix, directory metadata (file creation/deletion) must be explicitly /// fsynced. #[cfg(unix)] async fn sync_dir(path: &Path) -> Result<(), Error> { let dir = fs::File::open(path).await.map_err(|e| { Error::BlobOpenFailed( path.to_string_lossy().to_string(), "directory".to_string(), e, ) })?; dir.sync_all().await.map_err(|e| { Error::BlobSyncFailed( path.to_string_lossy().to_string(), "directory".to_string(), e, ) }) } #[derive(Clone)] pub struct Config { pub storage_directory: PathBuf, pub maximum_buffer_size: usize, } impl Config { pub fn new(storage_directory: PathBuf, maximum_buffer_size: usize) -> Self { Self { storage_directory, maximum_buffer_size, } } } #[derive(Clone)] pub struct Storage { lock: Arc>, cfg: Config, } impl Storage { pub fn new(cfg: Config) -> Self { Self { lock: Arc::new(Mutex::new(())), cfg, } } } impl crate::Storage for Storage { #[cfg(unix)] type Blob = unix::Blob; #[cfg(not(unix))] type Blob = fallback::Blob; async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> { // Acquire the filesystem lock let _guard = self.lock.lock().await; // Construct the full path let path = self.cfg.storage_directory.join(partition).join(hex(name)); let parent = match path.parent() { Some(parent) => parent, None => return Err(Error::PartitionCreationFailed(partition.into())), }; // Check if partition exists before creating #[cfg(unix)] let parent_existed = parent.exists(); // Create the partition directory, if it does not exist fs::create_dir_all(parent) .await .map_err(|_| Error::PartitionCreationFailed(partition.into()))?; // Open the file, creating it if it doesn't exist let mut file = fs::OpenOptions::new() .read(true) .write(true) .create(true) .truncate(false) .open(&path) .await .map_err(|e| Error::BlobOpenFailed(partition.into(), hex(name), e))?; // Assume empty files are newly created. Existing empty files will be synced too; that's OK. let len = file.metadata().await.map_err(|_| Error::ReadFailed)?.len(); let newly_created = len == 0; // Only sync if we created a new file if newly_created { // Sync the file to ensure it is durable file.sync_all() .await .map_err(|e| Error::BlobSyncFailed(partition.into(), hex(name), e))?; // Windows doesn't have a notion of syncing a directory entry to ensure that it's // durably persisted. See https://github.com/commonwarexyz/monorepo/issues/2026. #[cfg(unix)] { // Sync the parent directory to ensure the directory entry is durable. sync_dir(parent).await?; // Sync storage directory if parent directory did not exist if !parent_existed { sync_dir(&self.cfg.storage_directory).await?; } } } // Set the maximum buffer size file.set_max_buf_size(self.cfg.maximum_buffer_size); #[cfg(unix)] { // Convert to a blocking std::fs::File let file = file.into_std().await; // Construct the blob Ok((Self::Blob::new(partition.into(), name, file), len)) } #[cfg(not(unix))] { // Construct the blob Ok((Self::Blob::new(partition.into(), name, file), len)) } } async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> { // Acquire the filesystem lock let _guard = self.lock.lock().await; // Remove all related files let path = self.cfg.storage_directory.join(partition); if let Some(name) = name { let blob_path = path.join(hex(name)); fs::remove_file(blob_path) .await .map_err(|_| Error::BlobMissing(partition.into(), hex(name)))?; // Sync the partition directory to ensure the removal is durable. // Windows doesn't have a notion of syncing a directory entry to ensure that it's // durably persisted. See https://github.com/commonwarexyz/monorepo/issues/2026. #[cfg(unix)] sync_dir(&path).await?; } else { fs::remove_dir_all(&path) .await .map_err(|_| Error::PartitionMissing(partition.into()))?; // Sync the storage directory to ensure the removal is durable. // Windows doesn't have a notion of syncing a directory entry to ensure that it's // durably persisted. See https://github.com/commonwarexyz/monorepo/issues/2026. #[cfg(unix)] sync_dir(&self.cfg.storage_directory).await?; } Ok(()) } async fn scan(&self, partition: &str) -> Result>, Error> { // Acquire the filesystem lock let _guard = self.lock.lock().await; // Scan the partition directory let path = self.cfg.storage_directory.join(partition); let mut entries = fs::read_dir(path) .await .map_err(|_| Error::PartitionMissing(partition.into()))?; let mut blobs = Vec::new(); while let Some(entry) = entries.next_entry().await.map_err(|_| Error::ReadFailed)? { let file_type = entry.file_type().await.map_err(|_| Error::ReadFailed)?; if !file_type.is_file() { return Err(Error::PartitionCorrupt(partition.into())); } if let Some(name) = entry.file_name().to_str() { let name = from_hex(name).ok_or(Error::PartitionCorrupt(partition.into()))?; blobs.push(name); } } Ok(blobs) } } #[cfg(test)] mod tests { use super::*; use crate::storage::tests::run_storage_tests; use rand::{Rng as _, SeedableRng}; use std::env; #[tokio::test] async fn test_storage() { let mut rng = rand::rngs::StdRng::from_entropy(); let storage_directory = env::temp_dir().join(format!("storage_tokio_{}", rng.gen::())); let config = Config::new(storage_directory, 2 * 1024 * 1024); let storage = Storage::new(config); run_storage_tests(storage).await; } }