//! Ordered, reliable broadcast across reconfigurable participants. //! //! # Concepts //! //! The system has two types of network participants: `sequencers` and `validators`. Their sets may //! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module //! can handle reconfiguration of these sets across different epochs. //! //! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s //! that contain a chunk and a certificate over the previous chunk, forming a linked chain //! of nodes from each sequencer. //! //! Validators verify and sign chunks. These signatures can be combined to form a quorum //! certificate, ensuring a quorum verifies each chunk. The certificate allows external parties //! to confirm that the chunk was reliably broadcast. //! //! Network participants persist any new nodes to a journal. This enables recovery from crashes and //! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign //! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different //! payloads. //! //! # Pluggable Cryptography //! //! The ordered broadcast module is generic over the signing scheme, allowing users to choose the //! cryptographic scheme best suited for their requirements: //! //! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification. //! HSM-friendly, no trusted setup required. Certificates contain individual signatures. //! //! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated //! verification. Produces compact certificates while preserving signer attribution. //! //! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures. //! Produces succinct constant-size certificates. Requires trusted setup (DKG). //! //! # Design //! //! The core of the module is the [Engine]. It is responsible for: //! 
- Broadcasting nodes (if a sequencer) //! - Signing chunks (if a validator) //! - Tracking the latest chunk in each sequencer's chain //! - Assembling certificates from a quorum of signatures //! - Notifying other actors of new chunks and certificates //! //! # Acknowledgements //! //! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct //! proof-of-availability could be produced by linking sequencer broadcasts. pub mod scheme; pub mod types; cfg_if::cfg_if! { if #[cfg(not(target_arch = "wasm32"))] { mod ack_manager; use ack_manager::AckManager; mod config; pub use config::Config; mod engine; pub use engine::Engine; mod metrics; mod tip_manager; use tip_manager::TipManager; } } #[cfg(test)] pub mod mocks; #[cfg(test)] mod tests { use super::{mocks, Config, Engine}; use crate::{ ordered_broadcast::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme}, types::{Epoch, EpochDelta}, }; use commonware_cryptography::{ bls12381::primitives::variant::{MinPk, MinSig}, certificate::{self, mocks::Fixture}, ed25519::{PrivateKey, PublicKey}, sha256::Digest as Sha256Digest, Signer as _, }; use commonware_macros::{select, test_group, test_traced}; use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender}; use commonware_runtime::{ buffer::PoolRef, deterministic::{self, Context}, Clock, Metrics, Quota, Runner, Spawner, }; use commonware_utils::NZUsize; use futures::{channel::oneshot, future::join_all}; use std::{ collections::{BTreeMap, HashMap}, num::{NonZeroU32, NonZeroUsize}, time::Duration, }; use tracing::debug; const PAGE_SIZE: NonZeroUsize = NZUsize!(1024); const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10); const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX); type Registrations
= BTreeMap< P, ( (Sender
, Receiver
), (Sender
, Receiver
),
),
>;
async fn register_participants(
oracle: &mut Oracle,
link: Link,
) -> (
Oracle(
context: Context,
fixture: &Fixture,
sequencer_pks: &[PublicKey],
registrations: &mut Registrations(
context: Context,
sequencers: Vec(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(120));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let (_oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(5),
|_| false,
Some(5),
epoch,
);
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::(fixture: F)
where
S: Scheme + Clone,
{
let mut prev_checkpoint = None;
let epoch = Epoch::new(111);
let num_validators = 4;
let crash_after = Duration::from_secs(5);
let target_height = 30;
loop {
let fixture = fixture.clone();
let f = |mut context: deterministic::Context| async move {
let fixture = fixture(&mut context, num_validators);
let (network, mut oracle) = Network::new(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: None,
},
);
network.start();
let mut registrations =
register_participants(&mut oracle, &fixture.participants).await;
link_participants(
&mut oracle,
&fixture.participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(5),
|_| false,
None,
epoch,
);
// Either crash after `crash_after` or succeed once everyone reaches `target_height`.
let crash = context.sleep(crash_after);
let run = await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
// Configure the network
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let mut reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(1),
|_| false,
None,
epoch,
);
// Simulate partition by removing all links.
link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
context.sleep(Duration::from_secs(30)).await;
// Get the maximum height from all reporters.
let max_height = get_max_height(&mut reporters).await;
// Heal the partition by re-adding links.
link_participants(
&mut oracle,
&fixture.participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::(fixture: F, seed: u64) -> String
where
S: Scheme,
{
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(40)));
let runner = deterministic::Runner::new(cfg);
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let delayed_link = Link {
latency: Duration::from_millis(50),
jitter: Duration::from_millis(40),
success_rate: 0.5,
};
link_participants(
&mut oracle,
&fixture.participants,
Action::Update(delayed_link),
None,
)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_millis(150),
|_| false,
None,
epoch,
);
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let (_oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(5),
|i| i % 10 == 0,
None,
epoch,
);
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
// Set up the network
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let mut reporters = BTreeMap::new();
// Create validator instances that we can update later for epoch changes
let mut validators_providers = HashMap::new();
let mut monitors = HashMap::new();
let namespace = b"my testing namespace";
for (idx, validator) in fixture.participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{validator}"));
let monitor = mocks::Monitor::new(epoch);
monitors.insert(validator.clone(), monitor.clone());
let sequencers = mocks::Sequencers::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
// Generate sequencer (external, not a validator)
let sequencer = PrivateKey::from_seed(u64::MAX);
// Generate network participants (validators + sequencer)
let mut participants = fixture.participants.clone();
participants.push(sequencer.public_key());
// Create network
let (network, mut oracle) = Network::new(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: None,
},
);
network.start();
// Register all participants
let mut registrations = register_participants(&mut oracle, &participants).await;
link_participants(
&mut oracle,
&participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
// Set up the engines
let mut reporters = BTreeMap::new();
let namespace = b"my testing namespace";
// Spawn validator engines (no signing key, only validate)
for (idx, validator) in fixture.participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{validator}"));
let monitor = mocks::Monitor::new(epoch);
let sequencers = mocks::Sequencers::(fixture: F)
where
S: Scheme,
{
let cfg = deterministic::Config::new();
let runner = deterministic::Runner::new(cfg);
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 10;
let fixture = fixture(&mut context, num_validators);
let delayed_link = Link {
latency: Duration::from_millis(80),
jitter: Duration::from_millis(10),
success_rate: 0.98,
};
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
// Update to delayed links
link_participants(
&mut oracle,
&fixture.participants,
Action::Update(delayed_link),
None,
)
.await;
// Use first half of validators as sequencers
let sequencers: Vec