use crate::{ simplex::types::{ Activity, Attributable, ConflictingFinalize, ConflictingNotarize, Finalization, Finalize, Notarization, Notarize, Nullification, Nullify, NullifyFinalize, View, }, Monitor, Reporter, Supervisor as Su, Viewable, }; use commonware_codec::{Decode, DecodeExt, Encode}; use commonware_cryptography::{Digest, PublicKey}; use futures::channel::mpsc::{Receiver, Sender}; use std::{ collections::{BTreeMap, HashMap, HashSet}, sync::{Arc, Mutex}, }; pub struct Config { pub namespace: Vec, pub participants: BTreeMap>, } type Participation = HashMap>>; type Faults = HashMap>>>; type Participants

= BTreeMap, Vec

)>; #[derive(Clone)] pub struct Supervisor { participants: Participants, namespace: Vec, pub leaders: Arc>>, pub notarizes: Arc>>, #[allow(clippy::type_complexity)] pub notarizations: Arc>>>, pub nullifies: Arc>>>, pub nullifications: Arc>>>, #[allow(clippy::type_complexity)] pub finalizes: Arc>>, #[allow(clippy::type_complexity)] pub finalizations: Arc>>>, #[allow(clippy::type_complexity)] pub faults: Arc>>, latest: Arc>, subscribers: Arc>>>, } impl Supervisor { pub fn new(cfg: Config) -> Self { let mut parsed_participants = BTreeMap::new(); for (view, mut validators) in cfg.participants.into_iter() { let mut map = HashMap::new(); for (index, validator) in validators.iter().enumerate() { map.insert(validator.clone(), index as u32); } validators.sort(); parsed_participants.insert(view, (map, validators)); } Self { leaders: Arc::new(Mutex::new(HashMap::new())), participants: parsed_participants, namespace: cfg.namespace, notarizes: Arc::new(Mutex::new(HashMap::new())), notarizations: Arc::new(Mutex::new(HashMap::new())), nullifies: Arc::new(Mutex::new(HashMap::new())), nullifications: Arc::new(Mutex::new(HashMap::new())), finalizes: Arc::new(Mutex::new(HashMap::new())), finalizations: Arc::new(Mutex::new(HashMap::new())), faults: Arc::new(Mutex::new(HashMap::new())), latest: Arc::new(Mutex::new(0)), subscribers: Arc::new(Mutex::new(Vec::new())), } } } impl Su for Supervisor { type Index = View; type PublicKey = C; fn leader(&self, index: Self::Index) -> Option { let closest = match self.participants.range(..=index).next_back() { Some((_, p)) => p, None => { panic!("no participants in required range"); } }; let leader = closest.1[index as usize % closest.1.len()].clone(); self.leaders .lock() .unwrap() .entry(index) .or_insert(leader.clone()); Some(leader) } fn participants(&self, index: Self::Index) -> Option<&Vec> { let closest = match self.participants.range(..=index).next_back() { Some((_, p)) => p, None => { panic!("no participants in required range"); } }; Some(&closest.1) } fn is_participant(&self, index: Self::Index, candidate: &Self::PublicKey) -> Option { let closest = match self.participants.range(..=index).next_back() { Some((_, p)) => p, None => { panic!("no participants in required range"); } }; closest.0.get(candidate).cloned() } } impl Reporter for Supervisor { type Activity = Activity; async fn report(&mut self, activity: Activity) { // We check signatures for all messages to ensure that the prover is working correctly // but in production this isn't necessary (as signatures are already verified in // consensus). match activity { Activity::Notarize(notarize) => { let view = notarize.view(); let participants = self.participants(view).unwrap(); let public_key = participants[notarize.signer() as usize].clone(); if !notarize.verify(&self.namespace, &public_key) { panic!("signature verification failed"); } let encoded = notarize.encode(); Notarize::::decode(encoded).unwrap(); self.notarizes .lock() .unwrap() .entry(view) .or_default() .entry(notarize.proposal.payload) .or_default() .insert(public_key); } Activity::Notarization(notarization) => { let view = notarization.view(); let participants = self.participants(view).unwrap(); if !notarization.verify(&self.namespace, participants) { panic!("signature verification failed"); } let encoded = notarization.encode(); Notarization::::decode_cfg(encoded, &participants.len()).unwrap(); let mut notarizes = self.notarizes.lock().unwrap(); let notarizes = notarizes .entry(view) .or_default() .entry(notarization.proposal.payload) .or_default(); for signature in ¬arization.signatures { let public_key_index = signature.signer() as usize; let public_key = participants[public_key_index].clone(); notarizes.insert(public_key); } self.notarizations .lock() .unwrap() .insert(view, notarization); } Activity::Nullify(nullify) => { let view = nullify.view(); let participants = self.participants(view).unwrap(); let public_key = participants[nullify.signer() as usize].clone(); if !nullify.verify(&self.namespace, &public_key) { panic!("signature verification failed"); } let encoded = nullify.encode(); Nullify::::decode(encoded).unwrap(); self.nullifies .lock() .unwrap() .entry(view) .or_default() .insert(public_key); } Activity::Nullification(nullification) => { let view = nullification.view(); let participants = self.participants(view).unwrap(); if !nullification.verify(&self.namespace, participants) { panic!("signature verification failed"); } let encoded = nullification.encode(); Nullification::::decode_cfg(encoded, &participants.len()).unwrap(); let mut nullifies = self.nullifies.lock().unwrap(); let nullifies = nullifies.entry(view).or_default(); for signature in &nullification.signatures { let public_key_index = signature.signer() as usize; let public_key = participants[public_key_index].clone(); nullifies.insert(public_key); } self.nullifications .lock() .unwrap() .insert(view, nullification); } Activity::Finalize(finalize) => { let view = finalize.view(); let participants = self.participants(view).unwrap(); let public_key = participants[finalize.signer() as usize].clone(); if !finalize.verify(&self.namespace, &public_key) { panic!("signature verification failed"); } let encoded = finalize.encode(); Finalize::::decode(encoded).unwrap(); self.finalizes .lock() .unwrap() .entry(view) .or_default() .entry(finalize.proposal.payload) .or_default() .insert(public_key); } Activity::Finalization(finalization) => { let view = finalization.view(); let participants = self.participants(view).unwrap(); if !finalization.verify(&self.namespace, participants) { panic!("signature verification failed"); } let encoded = finalization.encode(); Finalization::::decode_cfg(encoded, &participants.len()).unwrap(); let mut finalizes = self.finalizes.lock().unwrap(); let finalizes = finalizes .entry(view) .or_default() .entry(finalization.proposal.payload) .or_default(); for signature in &finalization.signatures { let public_key_index = signature.signer() as usize; let public_key = participants[public_key_index].clone(); finalizes.insert(public_key); } self.finalizations .lock() .unwrap() .insert(view, finalization); // Send message to subscribers *self.latest.lock().unwrap() = view; let mut subscribers = self.subscribers.lock().unwrap(); for subscriber in subscribers.iter_mut() { let _ = subscriber.try_send(view); } } Activity::ConflictingNotarize(ref conflicting) => { let view = conflicting.view(); let participants = self.participants(view).unwrap(); let public_key = participants[conflicting.signer() as usize].clone(); if !conflicting.verify(&self.namespace, &public_key) { panic!("signature verification failed"); } let encoded = conflicting.encode(); ConflictingNotarize::::decode(encoded).unwrap(); self.faults .lock() .unwrap() .entry(public_key) .or_default() .entry(view) .or_default() .insert(activity); } Activity::ConflictingFinalize(ref conflicting) => { let view = conflicting.view(); let participants = self.participants(view).unwrap(); let public_key = participants[conflicting.signer() as usize].clone(); if !conflicting.verify(&self.namespace, &public_key) { panic!("signature verification failed"); } let encoded = conflicting.encode(); ConflictingFinalize::::decode(encoded).unwrap(); self.faults .lock() .unwrap() .entry(public_key) .or_default() .entry(view) .or_default() .insert(activity); } Activity::NullifyFinalize(ref conflicting) => { let view = conflicting.view(); let participants = self.participants(view).unwrap(); let public_key = participants[conflicting.signer() as usize].clone(); if !conflicting.verify(&self.namespace, &public_key) { panic!("signature verification failed"); } let encoded = conflicting.encode(); NullifyFinalize::::decode(encoded).unwrap(); self.faults .lock() .unwrap() .entry(public_key) .or_default() .entry(view) .or_default() .insert(activity); } } } } impl Monitor for Supervisor { type Index = View; async fn subscribe(&mut self) -> (Self::Index, Receiver) { let (sender, receiver) = futures::channel::mpsc::channel(128); self.subscribers.lock().unwrap().push(sender); let latest = *self.latest.lock().unwrap(); (latest, receiver) } }