//! Mock application used by `simplex` tests to produce and verify payloads, //! simulating proposal/verification latency and broadcasting via a mock relay. use super::relay::Relay; use crate::{ simplex::types::Context, types::{Epoch, Round}, Automaton as Au, CertifiableAutomaton as CAu, Relay as Re, }; use bytes::Bytes; use commonware_codec::{DecodeExt, Encode}; use commonware_cryptography::{Digest, Hasher, PublicKey}; use commonware_macros::select_loop; use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Spawner}; use futures::{ channel::{mpsc, oneshot}, SinkExt, StreamExt, }; use rand::{Rng, RngCore}; use rand_distr::{Distribution, Normal}; use std::{ collections::{HashMap, HashSet}, sync::Arc, time::Duration, }; use tracing::debug; pub enum Message { Genesis { epoch: Epoch, response: oneshot::Sender, }, Propose { context: Context, response: oneshot::Sender, }, Verify { context: Context, payload: D, response: oneshot::Sender, }, Certify { payload: D, response: oneshot::Sender, }, Broadcast { payload: D, }, } #[derive(Clone)] pub struct Mailbox { sender: mpsc::Sender>, } impl Mailbox { pub(super) const fn new(sender: mpsc::Sender>) -> Self { Self { sender } } } impl Au for Mailbox { type Digest = D; type Context = Context; async fn genesis(&mut self, epoch: Epoch) -> Self::Digest { let (response, receiver) = oneshot::channel(); self.sender .send(Message::Genesis { epoch, response }) .await .expect("Failed to send genesis"); receiver.await.expect("Failed to receive genesis") } async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); self.sender .send(Message::Propose { context, response }) .await .expect("Failed to send propose"); receiver } async fn verify( &mut self, context: Self::Context, payload: Self::Digest, ) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); self.sender .send(Message::Verify { context, payload, response, }) .await .expect("Failed to send verify"); receiver } } impl CAu for Mailbox { async fn certify(&mut self, payload: Self::Digest) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); self.sender .send(Message::Certify { payload, response: tx, }) .await .expect("Failed to send certify"); rx } } impl Re for Mailbox { type Digest = D; async fn broadcast(&mut self, payload: Self::Digest) { self.sender .send(Message::Broadcast { payload }) .await .expect("Failed to send broadcast"); } } const GENESIS_BYTES: &[u8] = b"genesis"; type Latency = (f64, f64); /// Predicate to determine whether a payload should be certified. /// Returning true means certify, false means reject. pub enum Certifier { /// Always certify. Always, /// Certify sometimes, but not always. The behavior is to certify pseudorandomly /// (but deterministically) 82% of the time, depending on the last byte of the payload. Sometimes, /// A custom predicate function. Custom(Box bool + Send + 'static>), } pub struct Config { pub hasher: H, pub relay: Arc>, /// The public key of the participant. /// /// It is common to use multiple instances of an application in a single simulation, this /// helps to identify the source of both progress and errors. pub me: P, pub propose_latency: Latency, pub verify_latency: Latency, pub certify_latency: Latency, /// Predicate to determine whether a payload should be certified. /// Returning true means certify, false means reject. pub should_certify: Certifier, } pub struct Application { context: ContextCell, hasher: H, me: P, relay: Arc>, broadcast: mpsc::UnboundedReceiver<(H::Digest, Bytes)>, mailbox: mpsc::Receiver>, propose_latency: Normal, verify_latency: Normal, certify_latency: Normal, fail_verification: bool, should_certify: Certifier, pending: HashMap, verified: HashSet, } impl Application { pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { // Register self on relay let broadcast = cfg.relay.register(cfg.me.clone()); // Generate samplers let propose_latency = Normal::new(cfg.propose_latency.0, cfg.propose_latency.1).unwrap(); let verify_latency = Normal::new(cfg.verify_latency.0, cfg.verify_latency.1).unwrap(); let certify_latency = Normal::new(cfg.certify_latency.0, cfg.certify_latency.1).unwrap(); // Return constructed application let (sender, receiver) = mpsc::channel(1024); ( Self { context: ContextCell::new(context), hasher: cfg.hasher, me: cfg.me, relay: cfg.relay, broadcast, mailbox: receiver, propose_latency, verify_latency, certify_latency, fail_verification: false, should_certify: cfg.should_certify, pending: HashMap::new(), verified: HashSet::new(), }, Mailbox::new(sender), ) } pub const fn set_fail_verification(&mut self, fail: bool) { self.fail_verification = fail; } fn panic(&self, msg: &str) -> ! { panic!("[{:?}] {}", self.me, msg); } fn genesis(&mut self, epoch: Epoch) -> H::Digest { self.hasher .update(&(Bytes::from(GENESIS_BYTES), epoch).encode()); let digest = self.hasher.finalize(); self.verified.insert(digest); digest } /// When proposing a block, we do not care if the parent is verified (or even in our possession). /// Backfilling verification dependencies is considered out-of-scope for consensus. async fn propose(&mut self, context: Context) -> H::Digest { // Simulate the propose latency let duration = self.propose_latency.sample(&mut self.context); self.context .sleep(Duration::from_millis(duration as u64)) .await; // Generate the payload let rand = self.context.gen::(); let payload = (context.round, context.parent.1, rand).encode(); self.hasher.update(&payload); let digest = self.hasher.finalize(); // Mark verified self.verified.insert(digest); // Store pending payload self.pending.insert(digest, payload.into()); digest } async fn verify( &mut self, context: Context, payload: H::Digest, mut contents: Bytes, ) -> bool { // Simulate the verify latency let duration = self.verify_latency.sample(&mut self.context); self.context .sleep(Duration::from_millis(duration as u64)) .await; // Check if we should fail verification if self.fail_verification { return false; } // Verify contents let (parsed_round, parent, _) = <(Round, H::Digest, u64)>::decode(&mut contents).expect("invalid payload"); if parsed_round != context.round { self.panic(&format!( "invalid round (in payload): {} != {}", parsed_round, context.round )); } if parent != context.parent.1 { self.panic(&format!( "invalid parent (in payload): {:?} != {:?}", parent, context.parent.1 )); } // We don't care about the random number self.verified.insert(payload); true } async fn certify(&mut self, payload: H::Digest, _contents: Bytes) -> bool { // Simulate the certify latency let duration = self.certify_latency.sample(&mut self.context); self.context .sleep(Duration::from_millis(duration as u64)) .await; // Use configured predicate to determine certification match &self.should_certify { Certifier::Always => true, Certifier::Sometimes => (payload.as_ref().last().copied().unwrap_or(0) % 11) < 9, Certifier::Custom(func) => func(payload), } } async fn broadcast(&mut self, payload: H::Digest) { let contents = self.pending.remove(&payload).expect("missing payload"); self.relay.broadcast(&self.me, (payload, contents)).await; } pub fn start(mut self) -> Handle<()> { spawn_cell!(self.context, self.run().await) } async fn run(mut self) { // Setup digest tracking #[allow(clippy::type_complexity)] let mut waiters: HashMap< H::Digest, Vec<(Context, oneshot::Sender)>, > = HashMap::new(); let mut seen: HashMap = HashMap::new(); // Handle actions select_loop! { self.context, on_stopped => { debug!("context shutdown, stopping application"); }, message = self.mailbox.next() => { let message = match message { Some(message) => message, None => break, }; match message { Message::Genesis { epoch, response } => { let digest = self.genesis(epoch); let _ = response.send(digest); } Message::Propose { context, response } => { let digest = self.propose(context).await; let _ = response.send(digest); } Message::Verify { context, payload, response } => { if let Some(contents) = seen.get(&payload) { let verified = self.verify(context, payload, contents.clone()).await; let _ = response.send(verified); } else { waiters .entry(payload) .or_default() .push((context, response)); } } Message::Certify { payload, response } => { let contents = seen.get(&payload).cloned().unwrap_or_default(); let certified = self.certify(payload, contents).await; let _ = response.send(certified); } Message::Broadcast { payload } => { self.broadcast(payload).await; } } }, broadcast = self.broadcast.next() => { // Record digest for future use let (digest, contents) = broadcast.expect("broadcast closed"); seen.insert(digest, contents.clone()); // Check if we have a waiter if let Some(waiters) = waiters.remove(&digest) { for (context, sender) in waiters { let verified = self.verify(context, digest, contents.clone()).await; sender.send(verified).expect("Failed to send verification"); } } } } } }