//! Recover quorum certificates over an externally synchronized sequencer of items. //! //! This module allows a dynamic set of participants to collectively produce quorum certificates //! for any ordered sequence of items. //! //! The primary use case for this primitive is to allow blockchain validators to agree on a series //! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction //! data but not the output of said transactions during consensus, agreement must be achieved asynchronously //! over the output of consensus to support state sync and client balance proofs. //! //! _For applications that want to collect quorum certificates over concurrent, sequencer-driven broadcast, //! check out [crate::ordered_broadcast]._ //! //! # Pluggable Cryptography //! //! The aggregation module is generic over the signing scheme, allowing users to choose the //! cryptographic scheme best suited for their requirements: //! //! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification. //! HSM-friendly, no trusted setup required. Certificates contain individual signatures. //! //! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated //! verification. Produces compact certificates while preserving signer attribution. //! //! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures. //! Produces succinct constant-size certificates. Requires trusted setup (DKG). //! //! # Architecture //! //! The core of the module is the [Engine]. It manages the agreement process by: //! - Requesting externally synchronized [commonware_cryptography::Digest]s //! - Signing said digests with the configured scheme's signature type //! - Multicasting signatures/shares to other validators //! - Assembling certificates from a quorum of signatures //! - Monitoring recovery progress and notifying the application layer of recoveries //! //! The engine interacts with four main components: //! - [crate::Automaton]: Provides external digests //! - [crate::Reporter]: Receives agreement confirmations //! - [crate::Monitor]: Tracks epoch transitions //! - [commonware_cryptography::certificate::Provider]: Manages validator sets and network identities //! //! # Design Decisions //! //! ## Missing Certificate Resolution //! //! The engine does not try to "fill gaps" when certificates are missing. When validators //! fall behind or miss signatures for certain indices, the tip may skip ahead and those //! certificates may never be emitted by the local engine. Before skipping ahead, we ensure that //! at-least-one honest validator has the certificate for any skipped index. //! //! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and //! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need //! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize //! historical results. //! //! ## Recovering Certificates //! //! In aggregation, participants never gossip recovered certificates. Rather, they gossip [types::TipAck]s //! with signatures over some index and their latest tip. This approach reduces the overhead of running aggregation //! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase //! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected //! drift of online participants (even if all participants are synchronous the tip advancement logic will advance to the `f+1`th highest //! reported tip and drop all work below that tip minus the [Config::activity_timeout]). pub mod scheme; pub mod types; cfg_if::cfg_if! { if #[cfg(not(target_arch = "wasm32"))] { mod config; pub use config::Config; mod engine; pub use engine::Engine; mod metrics; mod safe_tip; #[cfg(test)] pub mod mocks; } } #[cfg(test)] mod tests { use super::{mocks, Config, Engine}; use crate::{ aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme}, types::{Epoch, EpochDelta}, }; use commonware_cryptography::{ bls12381::primitives::variant::{MinPk, MinSig}, certificate::mocks::Fixture, ed25519::PublicKey, sha256::Digest as Sha256Digest, }; use commonware_macros::{select, test_group, test_traced}; use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender}; use commonware_runtime::{ buffer::PoolRef, deterministic::{self, Context}, Clock, Metrics, Quota, Runner, Spawner, }; use commonware_utils::{NZUsize, NonZeroDuration}; use futures::{channel::oneshot, future::join_all}; use rand::{rngs::StdRng, Rng, SeedableRng}; use std::{ collections::BTreeMap, num::{NonZeroU32, NonZeroUsize}, time::Duration, }; use tracing::debug; type Registrations
= BTreeMap
, Receiver
)>;
const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Reliable network link configuration for testing.
const RELIABLE_LINK: Link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
/// Register all participants with the network oracle.
async fn register_participants(
oracle: &mut Oracle,
link: Link,
) -> (
Oracle,
registrations: &mut Registrations(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&mut registrations,
&mut oracle,
namespace,
epoch,
Duration::from_secs(5),
vec![],
);
await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
});
}
#[test_traced("INFO")]
fn test_all_online() {
all_online(bls12381_threshold::fixture::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&mut registrations,
&mut oracle,
namespace,
epoch,
Duration::from_secs(5),
vec![0],
);
await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
});
}
#[test_traced("INFO")]
fn test_byzantine_proposer() {
byzantine_proposer(bls12381_threshold::fixture::(fixture: F)
where
S: Scheme,
{
// Test parameters
let num_validators = 4;
let target_index = 200; // Target multiple rounds of signing
let min_shutdowns = 4; // Minimum number of shutdowns per validator
let max_shutdowns = 10; // Maximum number of shutdowns per validator
let shutdown_range_min = Duration::from_millis(100);
let shutdown_range_max = Duration::from_millis(1_000);
let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
let mut prev_checkpoint = None;
// Generate fixture once (persists across restarts)
let mut rng = StdRng::seed_from_u64(0);
let fixture = fixture(&mut rng, num_validators);
// Continue until shared reporter reaches target or max shutdowns exceeded
let mut shutdown_count = 0;
while shutdown_count < max_shutdowns {
let fixture = fixture.clone();
let f = move |mut context: Context| {
async move {
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
let (oracle, mut registrations) = initialize_simulation(
context.with_label("simulation"),
&fixture,
RELIABLE_LINK,
)
.await;
// Create a shared reporter
//
// We rely on replay to populate this reporter with a contiguous history of certificates.
let (reporter, mut reporter_mailbox) =
mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
context.with_label("reporter").spawn(|_| reporter.run());
// Spawn validator engines
for (idx, participant) in fixture.participants.iter().enumerate() {
let validator_context =
context.with_label(&format!("participant_{participant}"));
// Create Provider and register scheme for epoch
let provider = mocks::Provider::new();
assert!(provider.register(epoch, fixture.schemes[idx].clone()));
// Create monitor
let monitor = mocks::Monitor::new(epoch);
// Create automaton (validator 0 is Byzantine)
let strategy = if idx == 0 {
mocks::Strategy::Incorrect
} else {
mocks::Strategy::Correct
};
let automaton = mocks::Application::new(strategy);
// Create blocker
let blocker = oracle.control(participant.clone());
// Create and start engine
let engine = Engine::new(
validator_context.with_label("engine"),
Config {
monitor,
provider,
automaton,
reporter: reporter_mailbox.clone(),
blocker,
namespace: namespace.to_vec(),
priority_acks: false,
rebroadcast_timeout,
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
window: std::num::NonZeroU64::new(10).unwrap(),
activity_timeout: 1_024, // ensure we don't drop any certificates
journal_partition: format!("unclean_shutdown_test_{participant}"),
journal_write_buffer: NZUsize!(4096),
journal_replay_buffer: NZUsize!(4096),
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
},
);
let (sender, receiver) = registrations.remove(participant).unwrap();
engine.start((sender, receiver));
}
// Create a single completion watcher for the shared reporter
let completion =
context
.with_label("completion_watcher")
.spawn(move |context| async move {
loop {
if let Some(tip_index) =
reporter_mailbox.get_contiguous_tip().await
{
if tip_index >= target_index {
break;
}
}
context.sleep(Duration::from_millis(50)).await;
}
});
// Random shutdown timing to simulate unclean shutdown
let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
select! {
_ = context.sleep(shutdown_wait) => {
debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
false // Unclean shutdown
},
_ = completion => {
debug!("Shared reporter completed normally");
true // Clean completion
},
}
}
};
let (complete, checkpoint) = prev_checkpoint
.map_or_else(
|| {
debug!("Starting initial run");
deterministic::Runner::timed(Duration::from_secs(45))
},
|prev_checkpoint| {
debug!(shutdown_count, "Restarting from previous context");
deterministic::Runner::from(prev_checkpoint)
},
)
.start_and_recover(f);
if complete && shutdown_count >= min_shutdowns {
debug!("Test completed successfully");
break;
}
prev_checkpoint = Some(checkpoint);
shutdown_count += 1;
}
}
#[test_traced("INFO")]
fn test_unclean_byzantine_shutdown() {
unclean_byzantine_shutdown(bls12381_threshold::fixture::(fixture: F)
where
S: Scheme,
{
// Test parameters
let num_validators = 4;
let skip_index = 50; // Index where no one will sign
let window = 10;
let target_index = 100;
let namespace = b"my testing namespace";
// Generate fixture once (persists across restarts)
let mut rng = StdRng::seed_from_u64(0);
let fixture = fixture(&mut rng, num_validators);
// First run: let validators skip signing at skip_index and reach beyond it
let f = |context: Context| {
let fixture = fixture.clone();
async move {
let epoch = Epoch::new(111);
// Set up simulated network
let (oracle, mut registrations) = initialize_simulation(
context.with_label("simulation"),
&fixture,
RELIABLE_LINK,
)
.await;
// Create a shared reporter
let (reporter, mut reporter_mailbox) =
mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
context.with_label("reporter").spawn(|_| reporter.run());
// Start validator engines with Skip strategy for skip_index
for (idx, participant) in fixture.participants.iter().enumerate() {
let validator_context =
context.with_label(&format!("participant_{participant}"));
// Create Provider and register scheme for epoch
let provider = mocks::Provider::new();
assert!(provider.register(epoch, fixture.schemes[idx].clone()));
// Create monitor
let monitor = mocks::Monitor::new(epoch);
// All validators use Skip strategy for skip_index
let automaton =
mocks::Application::new(mocks::Strategy::Skip { index: skip_index });
// Create blocker
let blocker = oracle.control(participant.clone());
// Create and start engine
let engine = Engine::new(
validator_context.with_label("engine"),
Config {
monitor,
provider,
automaton,
reporter: reporter_mailbox.clone(),
blocker,
namespace: namespace.to_vec(),
priority_acks: false,
rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
100,
)),
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
window: std::num::NonZeroU64::new(window).unwrap(),
activity_timeout: 100,
journal_partition: format!("unsigned_index_test_{participant}"),
journal_write_buffer: NZUsize!(4096),
journal_replay_buffer: NZUsize!(4096),
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
},
);
let (sender, receiver) = registrations.remove(participant).unwrap();
engine.start((sender, receiver));
}
// Wait for validators to reach target_index (past skip_index)
loop {
if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
debug!(tip_index, skip_index, target_index, "reporter status");
if tip_index >= skip_index + window - 1 {
// max we can proceed before item confirmed
return;
}
}
context.sleep(Duration::from_millis(50)).await;
}
}
};
let (_, checkpoint) =
deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
// Second run: restart and verify the skip_index gets confirmed
let f2 = |context: Context| {
async move {
let epoch = Epoch::new(111);
// Set up simulated network
let (oracle, mut registrations) = initialize_simulation(
context.with_label("simulation"),
&fixture,
RELIABLE_LINK,
)
.await;
// Create a shared reporter
let (reporter, mut reporter_mailbox) =
mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
context.with_label("reporter").spawn(|_| reporter.run());
// Start validator engines with Correct strategy (will sign everything now)
for (idx, participant) in fixture.participants.iter().enumerate() {
let validator_context =
context.with_label(&format!("participant_{participant}"));
// Create Provider and register scheme for epoch
let provider = mocks::Provider::new();
assert!(provider.register(epoch, fixture.schemes[idx].clone()));
// Create monitor
let monitor = mocks::Monitor::new(epoch);
// Now all validators use Correct strategy
let automaton = mocks::Application::new(mocks::Strategy::Correct);
// Create blocker
let blocker = oracle.control(participant.clone());
// Create and start engine
let engine = Engine::new(
validator_context.with_label("engine"),
Config {
monitor,
provider,
automaton,
reporter: reporter_mailbox.clone(),
blocker,
namespace: namespace.to_vec(),
priority_acks: false,
rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
100,
)),
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
window: std::num::NonZeroU64::new(10).unwrap(),
activity_timeout: 100,
journal_partition: format!("unsigned_index_test_{participant}"),
journal_write_buffer: NZUsize!(4096),
journal_replay_buffer: NZUsize!(4096),
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
},
);
let (sender, receiver) = registrations.remove(participant).unwrap();
engine.start((sender, receiver));
}
// Wait for skip_index to be confirmed (should happen on replay)
loop {
if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
debug!(
tip_index,
skip_index, target_index, "reporter status on restart"
);
if tip_index >= target_index {
break;
}
}
context.sleep(Duration::from_millis(50)).await;
}
}
};
deterministic::Runner::from(checkpoint).start(f2);
}
#[test_traced("INFO")]
fn test_unclean_shutdown_with_unsigned_index() {
unclean_shutdown_with_unsigned_index(bls12381_threshold::fixture::(fixture: F, seed: u64) -> String
where
S: Scheme,
{
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(120)));
let runner = deterministic::Runner::new(cfg);
runner.start(|mut context| async move {
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
// Use degraded network links with realistic conditions
let degraded_link = Link {
latency: Duration::from_millis(200),
jitter: Duration::from_millis(150),
success_rate: 0.5,
};
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&mut registrations,
&mut oracle,
namespace,
epoch,
Duration::from_secs(2),
vec![],
);
await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
context.auditor().state()
})
}
#[test_traced("INFO")]
fn test_slow_and_lossy_links() {
slow_and_lossy_links(bls12381_threshold::fixture::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let num_validators = 5;
let mut fixture = fixture(&mut context, num_validators);
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
// Truncate to only 4 validators (one offline)
fixture.participants.truncate(4);
fixture.schemes.truncate(4);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&mut registrations,
&mut oracle,
namespace,
epoch,
Duration::from_secs(5),
vec![],
);
await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
});
}
#[test_traced("INFO")]
fn test_one_offline() {
one_offline(bls12381_threshold::fixture::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let num_validators = 4;
let fixture = fixture(&mut context, num_validators);
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&mut registrations,
&mut oracle,
namespace,
epoch,
Duration::from_secs(5),
vec![],
);
// Partition network (remove all links)
for v1 in fixture.participants.iter() {
for v2 in fixture.participants.iter() {
if v2 == v1 {
continue;
}
oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
}
}
context.sleep(Duration::from_secs(20)).await;
// Restore network links
for v1 in fixture.participants.iter() {
for v2 in fixture.participants.iter() {
if v2 == v1 {
continue;
}
oracle
.add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
.await
.unwrap();
}
}
await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
});
}
#[test_traced("INFO")]
fn test_network_partition() {
network_partition(bls12381_threshold::fixture::(fixture: F)
where
S: Scheme,
{
let runner = deterministic::Runner::timed(Duration::from_secs(15));
runner.start(|mut context| async move {
let num_validators = 5;
let fixture = fixture(&mut context, num_validators);
let namespace = b"my testing namespace";
let epoch = Epoch::new(111);
// Set up simulated network
let (oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
// Create reporters (one per online validator)
let mut reporters =
BTreeMap::