use crate::simulate::processed::ProcessedHeight; use commonware_consensus::{ marshal::{self, core::Variant, Identifier as MarshalIdentifier}, simplex::{mocks::scheme::Scheme as MockScheme, types::Finalization}, types::Height, Heightable, }; use commonware_cryptography::{ed25519, sha256, Digestible}; use commonware_runtime::{buffer::paged::CacheRef, Clock, Quota}; use commonware_storage::archive::immutable; use commonware_utils::{sync::Mutex, NZUsize, NZU16, NZU64}; use std::{ collections::{BTreeMap, HashMap}, num::{NonZeroU16, NonZeroU32, NonZeroU64, NonZeroUsize}, sync::Arc, time::Duration, }; pub(super) const EPOCH_LENGTH: NonZeroU64 = NZU64!(u64::MAX); pub(super) const NAMESPACE: &[u8] = b"stateful_e2e_test"; pub(super) const PAGE_SIZE: NonZeroU16 = NZU16!(1024); pub(super) const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10); pub(super) const IO_BUFFER_SIZE: NonZeroUsize = NZUsize!(2048); pub(super) const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX); pub(super) fn u64_to_digest(v: u64) -> sha256::Digest { let mut bytes = [0u8; 32]; bytes[..8].copy_from_slice(&v.to_be_bytes()); sha256::Digest::from(bytes) } pub(super) fn digest_to_u64(d: &sha256::Digest) -> u64 { let bytes: &[u8] = d.as_ref(); u64::from_be_bytes(bytes[..8].try_into().unwrap()) } pub(super) fn archive_config( prefix: &str, name: &str, page_cache: CacheRef, codec_config: C, ) -> immutable::Config { immutable::Config { metadata_partition: format!("{prefix}-{name}-metadata"), freezer_table_partition: format!("{prefix}-{name}-freezer-table"), freezer_table_initial_size: 64, freezer_table_resize_frequency: 10, freezer_table_resize_chunk_size: 10, freezer_key_partition: format!("{prefix}-{name}-freezer-key"), freezer_key_page_cache: page_cache, freezer_value_partition: format!("{prefix}-{name}-freezer-value"), freezer_value_target_size: 1024, freezer_value_compression: None, ordinal_partition: format!("{prefix}-{name}-ordinal"), items_per_section: NZU64!(10), codec_config, replay_buffer: IO_BUFFER_SIZE, freezer_key_write_buffer: IO_BUFFER_SIZE, freezer_value_write_buffer: IO_BUFFER_SIZE, ordinal_write_buffer: IO_BUFFER_SIZE, } } /// Per-validator state inspectable by test properties. /// /// Generic over the marshal variant so both single-db and multi-db engines /// can share the same state type and property implementations. #[derive(Clone)] pub(crate) struct MockValidatorState { pub(super) marshal: marshal::core::Mailbox, V>, pub(super) state_sync_entries: u64, pub(super) state_sync_height: Option, } impl PartialEq for MockValidatorState { fn eq(&self, other: &Self) -> bool { self.state_sync_entries == other.state_sync_entries && self.state_sync_height == other.state_sync_height } } impl MockValidatorState where V: Variant, V::ApplicationBlock: Digestible, { pub(crate) async fn digest_at_height(&self, height: u64) -> Option { self.marshal .get_block(MarshalIdentifier::Height(Height::new(height))) .await .map(|b| b.digest()) } pub(crate) const fn state_sync_height(&self) -> Option { self.state_sync_height } pub(crate) const fn state_sync_entries(&self) -> u64 { self.state_sync_entries } } pub(super) type MarshalMailboxOf = marshal::core::Mailbox, V>; /// Poll peers for a majority-agreed sync floor. pub(super) async fn fetch_majority_sync_floor( mailboxes: &Arc>>>, context: &impl Clock, me: &ed25519::PublicKey, ) -> Option<( Finalization, V::Commitment>, Height, )> where V: Variant, V::ApplicationBlock: Digestible, { for _ in 0..20 { let peers_mailboxes: Vec> = { let guard = mailboxes.lock(); guard .iter() .filter(|(peer, _)| *peer != me) .map(|(_, mailbox)| mailbox.clone()) .collect() }; // Collect latest heights from all peers. let mut peers: Vec<(MarshalMailboxOf, Height)> = Vec::new(); for mailbox in peers_mailboxes { if let Some(height) = mailbox .get_block(MarshalIdentifier::Latest) .await .map(|b| b.height()) { peers.push((mailbox, height)); } } if peers.is_empty() { context.sleep(Duration::from_millis(100)).await; continue; } // Find the highest height that a majority of peers have reached. let required = peers.len() / 2 + 1; let mut heights: Vec = peers.iter().map(|(_, h)| *h).collect(); heights.sort(); let quorum_height = heights[heights.len() - required]; // Count digests at quorum height and return the first finalization with majority agreement. let mut counts: HashMap)> = HashMap::new(); for (mailbox, h) in &peers { if *h < quorum_height { continue; } if let Some(digest) = mailbox .get_block(MarshalIdentifier::Height(quorum_height)) .await .map(|b| b.digest()) { counts .entry(digest) .and_modify(|(c, _)| *c += 1) .or_insert((1, mailbox.clone())); } } for (digest, (count, mailbox)) in counts { if count >= required { let finalization = mailbox .get_finalization(quorum_height) .await .expect("sync floor finalization must be available"); assert_eq!( V::commitment_to_inner(finalization.proposal.payload), digest ); return Some((finalization, quorum_height)); } } context.sleep(Duration::from_millis(100)).await; } None } impl ProcessedHeight for MockValidatorState where V: Variant, V::ApplicationBlock: Digestible, { async fn processed_height(&self) -> u64 { self.marshal .get_processed_height() .await .map_or(0, |height| height.get()) } }