//! Mock `Reporter` for tests: tracks participants/leaders, verifies activities, //! records votes/faults, and exposes a simple subscription. use crate::{ simplex::{ elector::{Config as ElectorConfig, Elector}, scheme, types::{ Activity, Attributable, ConflictingFinalize, ConflictingNotarize, Finalization, Finalize, Notarization, Notarize, Nullification, Nullify, NullifyFinalize, Subject, }, }, types::{Round, View}, Monitor, Viewable, }; use commonware_codec::{Decode, DecodeExt, Encode}; use commonware_cryptography::{certificate::Scheme, Digest}; use commonware_utils::ordered::{Quorum, Set}; use futures::channel::mpsc::{Receiver, Sender}; use rand::{CryptoRng, Rng}; use std::{ collections::{HashMap, HashSet}, hash::Hash, sync::{Arc, Mutex}, }; // Records which validators have participated in a given view/payload pair. type Participation = HashMap>>; type Faults = HashMap<::PublicKey, HashMap>>>; /// Reporter configuration used in tests. #[derive(Clone, Debug)] pub struct Config> { pub namespace: Vec, pub participants: Set, pub scheme: S, pub elector: L, } #[derive(Clone)] pub struct Reporter, D: Digest> { context: E, pub participants: Set, scheme: S, elector: L::Elector, namespace: Vec, pub leaders: Arc>>, pub certified: Arc>>, pub notarizes: Arc>>, pub notarizations: Arc>>>, pub nullifies: Arc>>>, pub nullifications: Arc>>>, pub finalizes: Arc>>, pub finalizations: Arc>>>, pub faults: Arc>>, pub invalid: Arc>, latest: Arc>, subscribers: Arc>>>, } impl Reporter where E: Rng + CryptoRng, S: Scheme, L: ElectorConfig, D: Digest + Eq + Hash + Clone, { pub fn new(context: E, cfg: Config) -> Self { // Build elector with participants let elector = cfg.elector.build(&cfg.participants); Self { context, namespace: cfg.namespace, participants: cfg.participants, scheme: cfg.scheme, elector, leaders: Arc::new(Mutex::new(HashMap::new())), certified: Arc::new(Mutex::new(HashSet::new())), notarizes: Arc::new(Mutex::new(HashMap::new())), notarizations: Arc::new(Mutex::new(HashMap::new())), nullifies: Arc::new(Mutex::new(HashMap::new())), nullifications: Arc::new(Mutex::new(HashMap::new())), finalizes: Arc::new(Mutex::new(HashMap::new())), finalizations: Arc::new(Mutex::new(HashMap::new())), faults: Arc::new(Mutex::new(HashMap::new())), invalid: Arc::new(Mutex::new(0)), latest: Arc::new(Mutex::new(View::zero())), subscribers: Arc::new(Mutex::new(Vec::new())), } } fn certified(&self, round: Round, certificate: &S::Certificate) { // Record that this view has a certificate self.certified.lock().unwrap().insert(round.view()); // We use the certificate from view N to determine the leader for view N+1. let next_round = Round::new(round.epoch(), round.view().next()); let mut leaders = self.leaders.lock().unwrap(); leaders.entry(next_round.view()).or_insert_with(|| { let leader = self.elector.elect(next_round, Some(certificate)); self.participants.key(leader).cloned().unwrap() }); } } impl crate::Reporter for Reporter where E: Clone + Rng + CryptoRng + Send + Sync + 'static, S: scheme::Scheme, L: ElectorConfig, D: Digest + Eq + Hash + Clone, { type Activity = Activity; async fn report(&mut self, activity: Self::Activity) { // We check signatures for all messages to ensure that the prover is working correctly // but in production this isn't necessary (as signatures are already verified in // consensus). let verified = activity.verified(); match &activity { Activity::Notarize(notarize) => { if !notarize.verify(&self.scheme, &self.namespace) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = notarize.encode(); Notarize::::decode(encoded).unwrap(); let public_key = self.participants[notarize.signer() as usize].clone(); self.notarizes .lock() .unwrap() .entry(notarize.view()) .or_default() .entry(notarize.proposal.payload) .or_default() .insert(public_key); } Activity::Notarization(notarization) | Activity::Certification(notarization) => { // Verify notarization let view = notarization.view(); if !self.scheme.verify_certificate::<_, D>( &mut self.context, &self.namespace, Subject::Notarize { proposal: ¬arization.proposal, }, ¬arization.certificate, ) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = notarization.encode(); Notarization::::decode_cfg(encoded, &self.scheme.certificate_codec_config()) .unwrap(); self.notarizations .lock() .unwrap() .insert(view, notarization.clone()); self.certified(notarization.round(), ¬arization.certificate); } Activity::Nullify(nullify) => { if !nullify.verify(&self.scheme, &self.namespace) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = nullify.encode(); Nullify::::decode(encoded).unwrap(); let public_key = self.participants[nullify.signer() as usize].clone(); self.nullifies .lock() .unwrap() .entry(nullify.view()) .or_default() .insert(public_key); } Activity::Nullification(nullification) => { // Verify nullification let view = nullification.view(); if !self.scheme.verify_certificate::<_, D>( &mut self.context, &self.namespace, Subject::Nullify { round: nullification.round, }, &nullification.certificate, ) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = nullification.encode(); Nullification::::decode_cfg(encoded, &self.scheme.certificate_codec_config()) .unwrap(); self.nullifications .lock() .unwrap() .insert(view, nullification.clone()); self.certified(nullification.round, &nullification.certificate); } Activity::Finalize(finalize) => { if !finalize.verify(&self.scheme, &self.namespace) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = finalize.encode(); Finalize::::decode(encoded).unwrap(); let public_key = self.participants[finalize.signer() as usize].clone(); self.finalizes .lock() .unwrap() .entry(finalize.view()) .or_default() .entry(finalize.proposal.payload) .or_default() .insert(public_key); } Activity::Finalization(finalization) => { // Verify finalization let view = finalization.view(); if !self.scheme.verify_certificate::<_, D>( &mut self.context, &self.namespace, Subject::Finalize { proposal: &finalization.proposal, }, &finalization.certificate, ) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = finalization.encode(); Finalization::::decode_cfg(encoded, &self.scheme.certificate_codec_config()) .unwrap(); self.finalizations .lock() .unwrap() .insert(view, finalization.clone()); self.certified(finalization.round(), &finalization.certificate); // Send message to subscribers *self.latest.lock().unwrap() = finalization.view(); let mut subscribers = self.subscribers.lock().unwrap(); for subscriber in subscribers.iter_mut() { let _ = subscriber.try_send(finalization.view()); } } Activity::ConflictingNotarize(conflicting) => { let view = conflicting.view(); if !conflicting.verify(&self.scheme, &self.namespace) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = conflicting.encode(); ConflictingNotarize::::decode(encoded).unwrap(); let public_key = self.participants[conflicting.signer() as usize].clone(); self.faults .lock() .unwrap() .entry(public_key) .or_default() .entry(view) .or_default() .insert(activity); } Activity::ConflictingFinalize(conflicting) => { let view = conflicting.view(); if !conflicting.verify(&self.scheme, &self.namespace) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = conflicting.encode(); ConflictingFinalize::::decode(encoded).unwrap(); let public_key = self.participants[conflicting.signer() as usize].clone(); self.faults .lock() .unwrap() .entry(public_key) .or_default() .entry(view) .or_default() .insert(activity); } Activity::NullifyFinalize(conflicting) => { let view = conflicting.view(); if !conflicting.verify(&self.scheme, &self.namespace) { assert!(!verified); *self.invalid.lock().unwrap() += 1; return; } let encoded = conflicting.encode(); NullifyFinalize::::decode(encoded).unwrap(); let public_key = self.participants[conflicting.signer() as usize].clone(); self.faults .lock() .unwrap() .entry(public_key) .or_default() .entry(view) .or_default() .insert(activity); } } } } impl Monitor for Reporter where E: Clone + Rng + CryptoRng + Send + Sync + 'static, S: Scheme, L: ElectorConfig, D: Digest + Eq + Hash + Clone, { type Index = View; async fn subscribe(&mut self) -> (Self::Index, Receiver) { let (tx, rx) = futures::channel::mpsc::channel(128); self.subscribers.lock().unwrap().push(tx); let latest = *self.latest.lock().unwrap(); (latest, rx) } }