//! Mock `Reporter` for tests: tracks participants/leaders, verifies activities, //! records votes/faults, and exposes a simple subscription. use crate::{ simplex::{ elector::{Config as ElectorConfig, Elector}, scheme, types::{ Activity, Attributable, ConflictingFinalize, ConflictingNotarize, Finalization, Finalize, Notarization, Notarize, Nullification, Nullify, NullifyFinalize, Subject, }, }, types::{Round, View}, Monitor, Viewable, }; use commonware_codec::{Decode, DecodeExt, Encode}; use commonware_cryptography::{certificate::Scheme, Digest}; use commonware_utils::ordered::{Quorum, Set}; use futures::channel::mpsc::{Receiver, Sender}; use rand::{CryptoRng, Rng}; use std::{ collections::{HashMap, HashSet}, hash::Hash, sync::{Arc, Mutex}, }; // Records which validators have participated in a given view/payload pair. type Participation
= HashMap = HashMap<::PublicKey, HashMap,
D: Digest + Eq + Hash + Clone,
{
/// Creates a reporter with empty tracking state.
///
/// The elector is built from `cfg.elector` over `cfg.participants`; the namespace, participant
/// set and signing scheme are taken from `cfg` unchanged. All vote/certificate/fault maps and
/// the subscriber list start empty, the invalid counter starts at zero and the latest view
/// starts at `View::zero()`.
pub fn new(context: E, cfg: Config) -> Self {
    // The elector borrows the participant set, so build it before `cfg.participants` is moved
    // into the struct below.
    let built_elector = cfg.elector.build(&cfg.participants);

    Self {
        context,
        namespace: cfg.namespace,
        participants: cfg.participants,
        scheme: cfg.scheme,
        elector: built_elector,

        // Empty collections (and a zero counter) all come from `Default`
        leaders: Arc::default(),
        certified: Arc::default(),
        notarizes: Arc::default(),
        notarizations: Arc::default(),
        nullifies: Arc::default(),
        nullifications: Arc::default(),
        finalizes: Arc::default(),
        finalizations: Arc::default(),
        faults: Arc::default(),
        invalid: Arc::default(),
        subscribers: Arc::default(),

        // No view has been finalized yet
        latest: Arc::new(Mutex::new(View::zero())),
    }
}
/// Marks the view of `round` as certified and, when no leader is recorded yet for the
/// following view, elects one using `certificate`.
///
/// # Panics
///
/// Panics if either mutex is poisoned or if the elected index has no key in `participants`.
fn certified(&self, round: Round, certificate: &S::Certificate) {
    // Remember that a certificate exists for this view (the guard is dropped at the end of
    // the statement).
    self.certified.lock().unwrap().insert(round.view());

    // The certificate from view N seeds the leader election for view N+1.
    let successor = Round::new(round.epoch(), round.view().next());
    let mut leaders = self.leaders.lock().unwrap();
    if let std::collections::hash_map::Entry::Vacant(slot) = leaders.entry(successor.view()) {
        // Only elect when the next view has no leader recorded; an existing entry is kept.
        let index = self.elector.elect(successor, Some(certificate));
        slot.insert(self.participants.key(index).cloned().unwrap());
    }
}
}
impl,
D: Digest + Eq + Hash + Clone,
{
type Activity = Activity;
async fn report(&mut self, activity: Self::Activity) {
// We check signatures for all messages to ensure that the prover is working correctly
// but in production this isn't necessary (as signatures are already verified in
// consensus).
let verified = activity.verified();
match &activity {
Activity::Notarize(notarize) => {
if !notarize.verify(&self.scheme, &self.namespace) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = notarize.encode();
Notarize::::decode(encoded).unwrap();
let public_key = self.participants[notarize.signer() as usize].clone();
self.notarizes
.lock()
.unwrap()
.entry(notarize.view())
.or_default()
.entry(notarize.proposal.payload)
.or_default()
.insert(public_key);
}
Activity::Notarization(notarization) | Activity::Certification(notarization) => {
// Verify notarization
let view = notarization.view();
if !self.scheme.verify_certificate::<_, D>(
&mut self.context,
&self.namespace,
Subject::Notarize {
proposal: ¬arization.proposal,
},
¬arization.certificate,
) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = notarization.encode();
Notarization::::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
self.notarizations
.lock()
.unwrap()
.insert(view, notarization.clone());
self.certified(notarization.round(), ¬arization.certificate);
}
Activity::Nullify(nullify) => {
if !nullify.verify(&self.scheme, &self.namespace) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = nullify.encode();
Nullify::::decode(encoded).unwrap();
let public_key = self.participants[nullify.signer() as usize].clone();
self.nullifies
.lock()
.unwrap()
.entry(nullify.view())
.or_default()
.insert(public_key);
}
Activity::Nullification(nullification) => {
// Verify nullification
let view = nullification.view();
if !self.scheme.verify_certificate::<_, D>(
&mut self.context,
&self.namespace,
Subject::Nullify {
round: nullification.round,
},
&nullification.certificate,
) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = nullification.encode();
Nullification::::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
self.nullifications
.lock()
.unwrap()
.insert(view, nullification.clone());
self.certified(nullification.round, &nullification.certificate);
}
Activity::Finalize(finalize) => {
if !finalize.verify(&self.scheme, &self.namespace) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = finalize.encode();
Finalize::::decode(encoded).unwrap();
let public_key = self.participants[finalize.signer() as usize].clone();
self.finalizes
.lock()
.unwrap()
.entry(finalize.view())
.or_default()
.entry(finalize.proposal.payload)
.or_default()
.insert(public_key);
}
Activity::Finalization(finalization) => {
// Verify finalization
let view = finalization.view();
if !self.scheme.verify_certificate::<_, D>(
&mut self.context,
&self.namespace,
Subject::Finalize {
proposal: &finalization.proposal,
},
&finalization.certificate,
) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = finalization.encode();
Finalization::::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
self.finalizations
.lock()
.unwrap()
.insert(view, finalization.clone());
self.certified(finalization.round(), &finalization.certificate);
// Send message to subscribers
*self.latest.lock().unwrap() = finalization.view();
let mut subscribers = self.subscribers.lock().unwrap();
for subscriber in subscribers.iter_mut() {
let _ = subscriber.try_send(finalization.view());
}
}
Activity::ConflictingNotarize(conflicting) => {
let view = conflicting.view();
if !conflicting.verify(&self.scheme, &self.namespace) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = conflicting.encode();
ConflictingNotarize::::decode(encoded).unwrap();
let public_key = self.participants[conflicting.signer() as usize].clone();
self.faults
.lock()
.unwrap()
.entry(public_key)
.or_default()
.entry(view)
.or_default()
.insert(activity);
}
Activity::ConflictingFinalize(conflicting) => {
let view = conflicting.view();
if !conflicting.verify(&self.scheme, &self.namespace) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = conflicting.encode();
ConflictingFinalize::::decode(encoded).unwrap();
let public_key = self.participants[conflicting.signer() as usize].clone();
self.faults
.lock()
.unwrap()
.entry(public_key)
.or_default()
.entry(view)
.or_default()
.insert(activity);
}
Activity::NullifyFinalize(conflicting) => {
let view = conflicting.view();
if !conflicting.verify(&self.scheme, &self.namespace) {
assert!(!verified);
*self.invalid.lock().unwrap() += 1;
return;
}
let encoded = conflicting.encode();
NullifyFinalize::::decode(encoded).unwrap();
let public_key = self.participants[conflicting.signer() as usize].clone();
self.faults
.lock()
.unwrap()
.entry(public_key)
.or_default()
.entry(view)
.or_default()
.insert(activity);
}
}
}
}
impl,
D: Digest + Eq + Hash + Clone,
{
type Index = View;
async fn subscribe(&mut self) -> (Self::Index, Receiver