//! Ordered, reliable broadcast across reconfigurable participants. //! //! # Concepts //! //! The system has two types of network participants: `sequencers` and `validators`. Their sets may //! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module //! can handle reconfiguration of these sets across different epochs. //! //! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s //! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain //! of nodes from each sequencer. //! //! Validators verify and sign chunks using partial signatures. These can be combined to recover a //! threshold signature, ensuring a quorum verifies each chunk. The threshold signature allows //! external parties to confirm that the chunk was reliably broadcast. //! //! Network participants persist any new nodes to a journal. This enables recovery from crashes and //! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign //! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different //! payloads. //! //! # Design //! //! The core of the module is the [Engine]. It is responsible for: //! - Broadcasting nodes (if a sequencer) //! - Signing chunks (if a validator) //! - Tracking the latest chunk in each sequencer’s chain //! - Recovering threshold signatures from partial signatures for each chunk //! - Notifying other actors of new chunks and threshold signatures //! //! # Acknowledgements //! //! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct //! proof-of-availability could be produced by linking sequencer broadcasts. pub mod types; cfg_if::cfg_if! { if #[cfg(not(target_arch = "wasm32"))] { mod ack_manager; use ack_manager::AckManager; mod config; pub use config::Config; mod engine; pub use engine::Engine; mod metrics; mod tip_manager; use tip_manager::TipManager; } } #[cfg(test)] pub mod mocks; #[cfg(test)] mod tests { use super::{mocks, Config, Engine}; use crate::types::Epoch; use commonware_cryptography::{ bls12381::{ dkg::ops, primitives::{ group::Share, poly, variant::{MinPk, MinSig, Variant}, }, }, ed25519::{PrivateKey, PublicKey}, sha256::Digest as Sha256Digest, PrivateKeyExt as _, Signer as _, }; use commonware_macros::test_traced; use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender}; use commonware_runtime::{ buffer::PoolRef, deterministic::{self, Context}, Clock, Metrics, Runner, Spawner, }; use commonware_utils::{quorum, NZUsize}; use futures::{channel::oneshot, future::join_all}; use rand::{rngs::StdRng, SeedableRng as _}; use std::{ collections::{BTreeMap, HashMap, HashSet}, num::NonZeroUsize, sync::{Arc, Mutex}, time::Duration, }; use tracing::debug; const PAGE_SIZE: NonZeroUsize = NZUsize!(1024); const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10); type Registrations

= BTreeMap, Receiver

), (Sender

, Receiver

))>; async fn register_participants( oracle: &mut Oracle, participants: &[PublicKey], ) -> Registrations { let mut registrations = BTreeMap::new(); for participant in participants.iter() { let mut control = oracle.control(participant.clone()); let (a1, a2) = control.register(0).await.unwrap(); let (b1, b2) = control.register(1).await.unwrap(); registrations.insert(participant.clone(), ((a1, a2), (b1, b2))); } registrations } enum Action { Link(Link), Update(Link), Unlink, } async fn link_participants( oracle: &mut Oracle, participants: &[PublicKey], action: Action, restrict_to: Option bool>, ) { for (i1, v1) in participants.iter().enumerate() { for (i2, v2) in participants.iter().enumerate() { if v2 == v1 { continue; } if let Some(f) = restrict_to { if !f(participants.len(), i1, i2) { continue; } } if matches!(action, Action::Update(_) | Action::Unlink) { oracle.remove_link(v1.clone(), v2.clone()).await.unwrap(); } if let Action::Link(ref link) | Action::Update(ref link) = action { oracle .add_link(v1.clone(), v2.clone(), link.clone()) .await .unwrap(); } } } } async fn initialize_simulation( context: Context, num_validators: u32, shares_vec: &mut [Share], ) -> ( Oracle, Vec<(PublicKey, PrivateKey, Share)>, Vec, Registrations, ) { let (network, mut oracle) = Network::new( context.with_label("network"), commonware_p2p::simulated::Config { max_size: 1024 * 1024, disconnect_on_block: true, tracked_peer_sets: None, }, ); network.start(); let mut schemes = (0..num_validators) .map(|i| PrivateKey::from_seed(i as u64)) .collect::>(); schemes.sort_by_key(|s| s.public_key()); let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes .iter() .enumerate() .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone())) .collect(); let pks = validators .iter() .map(|(pk, _, _)| pk.clone()) .collect::>(); let registrations = register_participants(&mut oracle, &pks).await; let link = Link { latency: Duration::from_millis(10), jitter: Duration::from_millis(1), success_rate: 1.0, }; link_participants(&mut oracle, &pks, Action::Link(link), None).await; (oracle, validators, pks, registrations) } #[allow(clippy::too_many_arguments)] fn spawn_validator_engines( context: Context, polynomial: poly::Public, sequencer_pks: &[PublicKey], validator_pks: &[PublicKey], validators: &[(PublicKey, PrivateKey, Share)], registrations: &mut Registrations, automatons: &mut BTreeMap>, reporters: &mut BTreeMap>, rebroadcast_timeout: Duration, invalid_when: fn(u64) -> bool, misses_allowed: Option, ) -> HashMap { let mut monitors = HashMap::new(); let namespace = b"my testing namespace"; for (validator, scheme, share) in validators.iter() { let context = context.with_label(&validator.to_string()); let monitor = mocks::Monitor::new(111); monitors.insert(validator.clone(), monitor.clone()); let sequencers = mocks::Sequencers::::new(sequencer_pks.to_vec()); let validators = mocks::Validators::::new( polynomial.clone(), validator_pks.to_vec(), Some(share.clone()), ); let automaton = mocks::Automaton::::new(invalid_when); automatons.insert(validator.clone(), automaton.clone()); let (reporter, reporter_mailbox) = mocks::Reporter::::new( namespace, *poly::public::(&polynomial), misses_allowed, ); context.with_label("reporter").spawn(|_| reporter.run()); reporters.insert(validator.clone(), reporter_mailbox); let engine = Engine::new( context.with_label("engine"), Config { crypto: scheme.clone(), relay: automaton.clone(), automaton: automaton.clone(), reporter: reporters.get(validator).unwrap().clone(), monitor, sequencers, validators, namespace: namespace.to_vec(), epoch_bounds: (1, 1), height_bound: 2, rebroadcast_timeout, priority_acks: false, priority_proposals: false, journal_heights_per_section: 10, journal_replay_buffer: NZUsize!(4096), journal_write_buffer: NZUsize!(4096), journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"), journal_compression: Some(3), journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), }, ); let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap(); engine.start((a1, a2), (b1, b2)); } monitors } async fn await_reporters( context: Context, sequencers: Vec, reporters: &BTreeMap>, threshold: (u64, Epoch, bool), ) { let mut receivers = Vec::new(); for (reporter, mailbox) in reporters.iter() { // Spawn a watcher for the reporter. for sequencer in sequencers.iter() { // Create a oneshot channel to signal when the reporter has reached the threshold. let (tx, rx) = oneshot::channel(); receivers.push(rx); context.with_label("reporter_watcher").spawn({ let reporter = reporter.clone(); let sequencer = sequencer.clone(); let mut mailbox = mailbox.clone(); move |context| async move { loop { let (height, epoch) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0)); debug!(height, epoch, ?sequencer, ?reporter, "reporter"); let contiguous_height = mailbox .get_contiguous_tip(sequencer.clone()) .await .unwrap_or(0); if height >= threshold.0 && epoch >= threshold.1 && (!threshold.2 || contiguous_height >= threshold.0) { let _ = tx.send(sequencer.clone()); break; } context.sleep(Duration::from_millis(100)).await; } } }); } } // Wait for all oneshot receivers to complete. let results = join_all(receivers).await; assert_eq!(results.len(), sequencers.len() * reporters.len()); // Check that none were cancelled. for result in results { assert!(result.is_ok(), "reporter was cancelled"); } } async fn get_max_height( reporters: &mut BTreeMap>, ) -> u64 { let mut max_height = 0; for (sequencer, mailbox) in reporters.iter_mut() { let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0)); if height > max_height { max_height = height; } } max_height } fn all_online() { let num_validators: u32 = 4; let quorum: u32 = 3; let runner = deterministic::Runner::timed(Duration::from_secs(30)); runner.start(|mut context| async move { let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); let (_oracle, validators, pks, mut registrations) = initialize_simulation( context.with_label("simulation"), num_validators, &mut shares_vec, ) .await; let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); spawn_validator_engines::( context.with_label("validator"), polynomial.clone(), &pks, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_secs(5), |_| false, Some(5), ); await_reporters( context.with_label("reporter"), reporters.keys().cloned().collect::>(), &reporters, (100, 111, true), ) .await; }); } #[test_traced] fn test_all_online() { all_online::(); all_online::(); } fn unclean_shutdown() { let num_validators: u32 = 4; let quorum: u32 = 3; let mut rng = StdRng::seed_from_u64(0); let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); let completed = Arc::new(Mutex::new(HashSet::new())); let shutdowns = Arc::new(Mutex::new(0u64)); let mut prev_checkpoint = None; while completed.lock().unwrap().len() != num_validators as usize { let completed = completed.clone(); let shares_vec = shares_vec.clone(); let shutdowns = shutdowns.clone(); let polynomial = polynomial.clone(); let f = |context: deterministic::Context| async move { let (network, mut oracle) = Network::new( context.with_label("network"), commonware_p2p::simulated::Config { max_size: 1024 * 1024, disconnect_on_block: true, tracked_peer_sets: None, }, ); network.start(); let mut schemes = (0..num_validators) .map(|i| PrivateKey::from_seed(i as u64)) .collect::>(); schemes.sort_by_key(|s| s.public_key()); let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes .iter() .enumerate() .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone())) .collect(); let pks = validators .iter() .map(|(pk, _, _)| pk.clone()) .collect::>(); let mut registrations = register_participants(&mut oracle, &pks).await; let link = commonware_p2p::simulated::Link { latency: Duration::from_millis(10), jitter: Duration::from_millis(1), success_rate: 1.0, }; link_participants(&mut oracle, &pks, Action::Link(link), None).await; let automatons = Arc::new(Mutex::new(BTreeMap::< PublicKey, mocks::Automaton, >::new())); let mut reporters = BTreeMap::>::new( ); spawn_validator_engines( context.with_label("validator"), polynomial.clone(), &pks, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_secs(5), |_| false, None, ); let reporter_pairs: Vec<( PublicKey, mocks::ReporterMailbox, )> = reporters .iter() .map(|(v, m)| (v.clone(), m.clone())) .collect(); for (validator, mut mailbox) in reporter_pairs { let completed_clone = completed.clone(); context .with_label("reporter_unclean") .spawn(|context| async move { loop { let (height, _) = mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0)); if height >= 100 { completed_clone.lock().unwrap().insert(validator.clone()); break; } context.sleep(Duration::from_millis(100)).await; } }); } context.sleep(Duration::from_millis(1000)).await; *shutdowns.lock().unwrap() += 1; }; let (_, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint { deterministic::Runner::from(prev_checkpoint) } else { deterministic::Runner::timed(Duration::from_secs(45)) } .start_and_recover(f); prev_checkpoint = Some(checkpoint); } } #[test_traced] fn test_unclean_shutdown() { unclean_shutdown::(); unclean_shutdown::(); } fn network_partition() { let num_validators: u32 = 4; let quorum: u32 = 3; let runner = deterministic::Runner::timed(Duration::from_secs(60)); runner.start(|mut context| async move { let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); // Configure the network let (mut oracle, validators, pks, mut registrations) = initialize_simulation( context.with_label("simulation"), num_validators, &mut shares_vec, ) .await; let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); spawn_validator_engines( context.with_label("validator"), polynomial.clone(), &pks, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_secs(1), |_| false, None, ); // Simulate partition by removing all links. link_participants(&mut oracle, &pks, Action::Unlink, None).await; context.sleep(Duration::from_secs(30)).await; // Get the maximum height from all reporters. let max_height = get_max_height(&mut reporters).await; // Heal the partition by re-adding links. let link = Link { latency: Duration::from_millis(10), jitter: Duration::from_millis(1), success_rate: 1.0, }; link_participants(&mut oracle, &pks, Action::Link(link), None).await; await_reporters( context.with_label("reporter"), reporters.keys().cloned().collect::>(), &reporters, (max_height + 100, 111, false), ) .await; }); } #[test_traced] #[ignore] fn test_network_partition() { network_partition::(); network_partition::(); } fn slow_and_lossy_links(seed: u64) -> String { let num_validators: u32 = 4; let quorum: u32 = 3; let cfg = deterministic::Config::new() .with_seed(seed) .with_timeout(Some(Duration::from_secs(40))); let runner = deterministic::Runner::new(cfg); runner.start(|mut context| async move { let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); let (oracle, validators, pks, mut registrations) = initialize_simulation( context.with_label("simulation"), num_validators, &mut shares_vec, ) .await; let delayed_link = Link { latency: Duration::from_millis(50), jitter: Duration::from_millis(40), success_rate: 0.5, }; let mut oracle_clone = oracle.clone(); link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await; let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); spawn_validator_engines( context.with_label("validator"), polynomial.clone(), &pks, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_millis(150), |_| false, None, ); await_reporters( context.with_label("reporter"), reporters.keys().cloned().collect::>(), &reporters, (40, 111, false), ) .await; context.auditor().state() }) } #[test_traced] fn test_slow_and_lossy_links() { slow_and_lossy_links::(0); slow_and_lossy_links::(0); } #[test_traced] #[ignore] fn test_determinism() { // We use slow and lossy links as the deterministic test // because it is the most complex test. for seed in 1..6 { let pk_state_1 = slow_and_lossy_links::(seed); let pk_state_2 = slow_and_lossy_links::(seed); assert_eq!(pk_state_1, pk_state_2); let sig_state_1 = slow_and_lossy_links::(seed); let sig_state_2 = slow_and_lossy_links::(seed); assert_eq!(sig_state_1, sig_state_2); // Sanity check that different types can't be identical. assert_ne!(pk_state_1, sig_state_1); } } fn invalid_signature_injection() { let num_validators: u32 = 4; let quorum: u32 = 3; let runner = deterministic::Runner::timed(Duration::from_secs(30)); runner.start(|mut context| async move { let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); let (_oracle, validators, pks, mut registrations) = initialize_simulation( context.with_label("simulation"), num_validators, &mut shares_vec, ) .await; let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); spawn_validator_engines::( context.with_label("validator"), polynomial.clone(), &pks, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_secs(5), |i| i % 10 == 0, None, ); await_reporters( context.with_label("reporter"), reporters.keys().cloned().collect::>(), &reporters, (100, 111, true), ) .await; }); } #[test_traced] fn test_invalid_signature_injection() { invalid_signature_injection::(); invalid_signature_injection::(); } fn updated_epoch() { let num_validators: u32 = 4; let quorum: u32 = 3; let runner = deterministic::Runner::timed(Duration::from_secs(60)); runner.start(|mut context| async move { let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); // Setup network let (mut oracle, validators, pks, mut registrations) = initialize_simulation( context.with_label("simulation"), num_validators, &mut shares_vec, ) .await; let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); let monitors = spawn_validator_engines::( context.with_label("validator"), polynomial.clone(), &pks, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_secs(1), |_| false, Some(5), ); // Perform some work await_reporters( context.with_label("reporter"), reporters.keys().cloned().collect::>(), &reporters, (100, 111, true), ) .await; // Simulate partition by removing all links. link_participants(&mut oracle, &pks, Action::Unlink, None).await; context.sleep(Duration::from_secs(30)).await; // Get the maximum height from all reporters. let max_height = get_max_height(&mut reporters).await; // Update the epoch for monitor in monitors.values() { monitor.update(112); } // Heal the partition by re-adding links. let link = Link { latency: Duration::from_millis(10), jitter: Duration::from_millis(1), success_rate: 1.0, }; link_participants(&mut oracle, &pks, Action::Link(link), None).await; await_reporters( context.with_label("reporter"), reporters.keys().cloned().collect::>(), &reporters, (max_height + 100, 112, true), ) .await; }); } #[test_traced] fn test_updated_epoch() { updated_epoch::(); updated_epoch::(); } fn external_sequencer() { let num_validators: u32 = 4; let quorum: u32 = quorum(3); let runner = deterministic::Runner::timed(Duration::from_secs(60)); runner.start(|mut context| async move { // Generate validator shares let (polynomial, shares) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); // Generate validator schemes let mut schemes = (0..num_validators) .map(|i| PrivateKey::from_seed(i as u64)) .collect::>(); schemes.sort_by_key(|s| s.public_key()); // Generate validators let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes .iter() .enumerate() .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone())) .collect(); let validator_pks = validators .iter() .map(|(pk, _, _)| pk.clone()) .collect::>(); // Generate sequencer let sequencer = PrivateKey::from_seed(u64::MAX); // Generate network participants let mut participants = validators .iter() .map(|(pk, _, _)| pk.clone()) .collect::>(); participants.push(sequencer.public_key()); // as long as external participants are in same position for all, it is safe // Create network let (network, mut oracle) = Network::new( context.with_label("network"), commonware_p2p::simulated::Config { max_size: 1024 * 1024, disconnect_on_block: true, tracked_peer_sets: None, }, ); network.start(); // Register all participants let mut registrations = register_participants(&mut oracle, &participants).await; let link = commonware_p2p::simulated::Link { latency: Duration::from_millis(10), jitter: Duration::from_millis(1), success_rate: 1.0, }; link_participants(&mut oracle, &participants, Action::Link(link), None).await; // Setup engines let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); let mut monitors = HashMap::new(); let namespace = b"my testing namespace"; // Spawn validator engines for (validator, scheme, share) in validators.iter() { let context = context.with_label(&validator.to_string()); let monitor = mocks::Monitor::new(111); monitors.insert(validator.clone(), monitor.clone()); let sequencers = mocks::Sequencers::::new(vec![sequencer.public_key()]); let validators = mocks::Validators::::new( polynomial.clone(), validator_pks.clone(), Some(share.clone()), ); let automaton = mocks::Automaton::::new(|_| false); automatons .lock() .unwrap() .insert(validator.clone(), automaton.clone()); let (reporter, reporter_mailbox) = mocks::Reporter::::new( namespace, *poly::public::(&polynomial), Some(5), ); context.with_label("reporter").spawn(|_| reporter.run()); reporters.insert(validator.clone(), reporter_mailbox); let engine = Engine::new( context.with_label("engine"), Config { crypto: scheme.clone(), relay: automaton.clone(), automaton: automaton.clone(), reporter: reporters.get(validator).unwrap().clone(), monitor, sequencers, validators, namespace: namespace.to_vec(), epoch_bounds: (1, 1), height_bound: 2, rebroadcast_timeout: Duration::from_secs(5), priority_acks: false, priority_proposals: false, journal_heights_per_section: 10, journal_replay_buffer: NZUsize!(4096), journal_write_buffer: NZUsize!(4096), journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"), journal_compression: Some(3), journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), }, ); let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap(); engine.start((a1, a2), (b1, b2)); } // Spawn sequencer engine { let context = context.with_label("sequencer"); let automaton = mocks::Automaton::::new(|_| false); automatons .lock() .unwrap() .insert(sequencer.public_key(), automaton.clone()); let (reporter, reporter_mailbox) = mocks::Reporter::::new( namespace, *poly::public::(&polynomial), Some(5), ); context.with_label("reporter").spawn(|_| reporter.run()); reporters.insert(sequencer.public_key(), reporter_mailbox); let engine = Engine::new( context.with_label("engine"), Config { crypto: sequencer.clone(), relay: automaton.clone(), automaton: automaton.clone(), reporter: reporters.get(&sequencer.public_key()).unwrap().clone(), monitor: mocks::Monitor::new(111), sequencers: mocks::Sequencers::::new(vec![ sequencer.public_key() ]), validators: mocks::Validators::::new( polynomial.clone(), validator_pks, None, ), namespace: namespace.to_vec(), epoch_bounds: (1, 1), height_bound: 2, rebroadcast_timeout: Duration::from_secs(5), priority_acks: false, priority_proposals: false, journal_heights_per_section: 10, journal_replay_buffer: NZUsize!(4096), journal_write_buffer: NZUsize!(4096), journal_name_prefix: format!( "ordered-broadcast-seq/{}/", sequencer.public_key() ), journal_compression: Some(3), journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), }, ); let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap(); engine.start((a1, a2), (b1, b2)); } // Await reporters await_reporters( context.with_label("reporter"), vec![sequencer.public_key()], &reporters, (100, 111, true), ) .await; }); } #[test_traced] fn test_external_sequencer() { external_sequencer::(); external_sequencer::(); } fn run_1k() { let num_validators: u32 = 10; let quorum: u32 = 3; let cfg = deterministic::Config::new(); let runner = deterministic::Runner::new(cfg); runner.start(|mut context| async move { let (polynomial, mut shares_vec) = ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum); shares_vec.sort_by(|a, b| a.index.cmp(&b.index)); let (oracle, validators, pks, mut registrations) = initialize_simulation( context.with_label("simulation"), num_validators, &mut shares_vec, ) .await; let delayed_link = Link { latency: Duration::from_millis(80), jitter: Duration::from_millis(10), success_rate: 0.98, }; let mut oracle_clone = oracle.clone(); link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await; let automatons = Arc::new(Mutex::new( BTreeMap::>::new(), )); let mut reporters = BTreeMap::>::new(); let sequencers = &pks[0..pks.len() / 2]; spawn_validator_engines::( context.with_label("validator"), polynomial.clone(), sequencers, &pks, &validators, &mut registrations, &mut automatons.lock().unwrap(), &mut reporters, Duration::from_millis(150), |_| false, None, ); await_reporters( context.with_label("reporter"), sequencers.to_vec(), &reporters, (1_000, 111, false), ) .await; }) } #[test_traced] #[ignore] fn test_1k_min_pk() { run_1k::(); } #[test_traced] #[ignore] fn test_1k_min_sig() { run_1k::(); } }