use super::Header; use commonware_codec::Encode; use commonware_utils::{hex, StableBuf}; use sha2::{Digest, Sha256}; use std::{ collections::BTreeMap, ops::RangeInclusive, sync::{Arc, Mutex, RwLock}, }; /// In-memory storage implementation for the commonware runtime. #[derive(Clone)] pub struct Storage { partitions: Arc>>, } impl Default for Storage { fn default() -> Self { Self { partitions: Arc::new(Mutex::new(BTreeMap::new())), } } } impl Storage { /// Compute a [Sha256] digest of all blob contents. pub fn audit(&self) -> [u8; 32] { let partitions = self.partitions.lock().unwrap(); let mut hasher = Sha256::new(); for (partition_name, blobs) in partitions.iter() { for (blob_name, content) in blobs.iter() { hasher.update(partition_name.as_bytes()); hasher.update(blob_name); hasher.update(content); } } hasher.finalize().into() } } impl crate::Storage for Storage { type Blob = Blob; async fn open_versioned( &self, partition: &str, name: &[u8], versions: RangeInclusive, ) -> Result<(Self::Blob, u64, u16), crate::Error> { super::validate_partition_name(partition)?; let mut partitions = self.partitions.lock().unwrap(); let partition_entry = partitions.entry(partition.into()).or_default(); let content = partition_entry.entry(name.into()).or_default(); let raw_len = content.len() as u64; let (blob_version, logical_len) = if Header::missing(raw_len) { // New or corrupted blob - truncate and write default header with latest version let (header, blob_version) = Header::new(&versions); content.clear(); content.extend_from_slice(&header.encode()); (blob_version, 0) } else { // Existing blob - read and validate header let mut header_bytes = [0u8; Header::SIZE]; header_bytes.copy_from_slice(&content[..Header::SIZE]); Header::from(header_bytes, raw_len, &versions) .map_err(|e| e.into_error(partition, name))? }; Ok(( Blob::new( self.partitions.clone(), partition.into(), name, content.clone(), ), logical_len, blob_version, )) } async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), crate::Error> { super::validate_partition_name(partition)?; let mut partitions = self.partitions.lock().unwrap(); match name { Some(name) => { partitions .get_mut(partition) .ok_or(crate::Error::PartitionMissing(partition.into()))? .remove(name) .ok_or(crate::Error::BlobMissing(partition.into(), hex(name)))?; } None => { partitions .remove(partition) .ok_or(crate::Error::PartitionMissing(partition.into()))?; } } Ok(()) } async fn scan(&self, partition: &str) -> Result>, crate::Error> { super::validate_partition_name(partition)?; let partitions = self.partitions.lock().unwrap(); let partition = partitions .get(partition) .ok_or(crate::Error::PartitionMissing(partition.into()))?; let mut results = Vec::with_capacity(partition.len()); for name in partition.keys() { results.push(name.clone()); } results.sort(); // Ensure deterministic output Ok(results) } } type Partition = BTreeMap, Vec>; #[derive(Clone)] pub struct Blob { partitions: Arc>>, partition: String, name: Vec, content: Arc>>, } impl Blob { fn new( partitions: Arc>>, partition: String, name: &[u8], content: Vec, ) -> Self { Self { partitions, partition, name: name.into(), content: Arc::new(RwLock::new(content)), } } } impl crate::Blob for Blob { async fn read_at( &self, buf: impl Into + Send, offset: u64, ) -> Result { let mut buf = buf.into(); let offset = offset .checked_add(Header::SIZE_U64) .ok_or(crate::Error::OffsetOverflow)?; let offset: usize = offset .try_into() .map_err(|_| crate::Error::OffsetOverflow)?; let content = self.content.read().unwrap(); let content_len = content.len(); if offset + buf.len() > content_len { return Err(crate::Error::BlobInsufficientLength); } buf.put_slice(&content[offset..offset + buf.len()]); Ok(buf) } async fn write_at( &self, buf: impl Into + Send, offset: u64, ) -> Result<(), crate::Error> { let buf = buf.into(); let offset = offset .checked_add(Header::SIZE_U64) .ok_or(crate::Error::OffsetOverflow)?; let offset: usize = offset .try_into() .map_err(|_| crate::Error::OffsetOverflow)?; let mut content = self.content.write().unwrap(); let required = offset + buf.len(); if required > content.len() { content.resize(required, 0); } content[offset..offset + buf.len()].copy_from_slice(buf.as_ref()); Ok(()) } async fn resize(&self, len: u64) -> Result<(), crate::Error> { let len = len .checked_add(Header::SIZE_U64) .ok_or(crate::Error::OffsetOverflow)?; let len: usize = len.try_into().map_err(|_| crate::Error::OffsetOverflow)?; let mut content = self.content.write().unwrap(); content.resize(len, 0); Ok(()) } async fn sync(&self) -> Result<(), crate::Error> { // Create new content for partition let new_content = self.content.read().unwrap().clone(); // Update partition content let mut partitions = self.partitions.lock().unwrap(); let partition = partitions .get_mut(&self.partition) .ok_or(crate::Error::PartitionMissing(self.partition.clone()))?; let content = partition .get_mut(&self.name) .ok_or(crate::Error::BlobMissing( self.partition.clone(), hex(&self.name), ))?; *content = new_content; Ok(()) } } #[cfg(test)] mod tests { use super::{Header, *}; use crate::{storage::tests::run_storage_tests, Blob, Storage as _}; #[tokio::test] async fn test_memory_storage() { let storage = Storage::default(); run_storage_tests(storage).await; } #[tokio::test] async fn test_blob_header_handling() { let storage = Storage::default(); // New blob returns logical size 0 let (blob, size) = storage.open("partition", b"test").await.unwrap(); assert_eq!(size, 0, "new blob should have logical size 0"); // Verify raw storage has 8 bytes (header only) { let partitions = storage.partitions.lock().unwrap(); let partition = partitions.get("partition").unwrap(); let raw_content = partition.get(&b"test".to_vec()).unwrap(); assert_eq!( raw_content.len(), Header::SIZE, "raw storage should have 8-byte header" ); } // Write at logical offset 0 stores at raw offset 8 let data = b"hello world"; blob.write_at(data.to_vec(), 0).await.unwrap(); blob.sync().await.unwrap(); // Verify raw storage layout { let partitions = storage.partitions.lock().unwrap(); let partition = partitions.get("partition").unwrap(); let raw_content = partition.get(&b"test".to_vec()).unwrap(); assert_eq!(raw_content.len(), Header::SIZE + data.len()); assert_eq!(&raw_content[..Header::MAGIC_LENGTH], &Header::MAGIC); assert_eq!(&raw_content[Header::SIZE..], data); } // Read at logical offset 0 returns data from raw offset 8 let read_buf = blob.read_at(vec![0u8; data.len()], 0).await.unwrap(); assert_eq!(read_buf.as_ref(), data); // Corrupted blob recovery (0 < raw_size < 8) { let mut partitions = storage.partitions.lock().unwrap(); let partition = partitions.get_mut("partition").unwrap(); partition.insert(b"corrupted".to_vec(), vec![0u8; 2]); } // Opening should truncate and write fresh header let (_blob, size) = storage.open("partition", b"corrupted").await.unwrap(); assert_eq!(size, 0, "corrupted blob should return logical size 0"); // Verify raw storage now has proper 8-byte header { let partitions = storage.partitions.lock().unwrap(); let partition = partitions.get("partition").unwrap(); let raw_content = partition.get(&b"corrupted".to_vec()).unwrap(); assert_eq!( raw_content.len(), Header::SIZE, "corrupted blob should be reset to header-only" ); } } #[tokio::test] async fn test_blob_magic_mismatch() { let storage = Storage::default(); // Manually insert a blob with invalid magic bytes { let mut partitions = storage.partitions.lock().unwrap(); let partition = partitions.entry("partition".into()).or_default(); partition.insert(b"bad_magic".to_vec(), vec![0u8; Header::SIZE]); } // Opening should fail with corrupt error let result = storage.open("partition", b"bad_magic").await; assert!( matches!(result, Err(crate::Error::BlobCorrupt(_, _, reason)) if reason.contains("invalid magic")) ); } }