use crate::{ application::{types::genesis_block, Block, Mailbox, Message}, dkg, BLOCKS_PER_EPOCH, }; use commonware_consensus::{ marshal, simplex::signing_scheme::Scheme, types::Round, utils::{epoch, last_block_in_epoch}, }; use commonware_cryptography::{ bls12381::primitives::variant::Variant, Committable, Digestible, Hasher, Signer, }; use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner}; use futures::{ channel::mpsc, future::{try_join, Either}, lock::Mutex, StreamExt, }; use rand::Rng; use std::{future, marker::PhantomData, sync::Arc, time::Duration}; use tracing::{info, warn}; /// The application [Actor]. pub struct Actor where H: Hasher, C: Signer, V: Variant, S: Scheme, { context: ContextCell, mailbox: mpsc::Receiver>, _phantom: PhantomData<(C, V, S)>, } impl Actor where E: Rng + Spawner + Metrics + Clock, H: Hasher, C: Signer, V: Variant, S: Scheme, { /// Create a new application [Actor] and its associated [Mailbox]. pub fn new(context: E, mailbox_size: usize) -> (Self, Mailbox) { let (sender, mailbox) = mpsc::channel(mailbox_size); ( Self { context: ContextCell::new(context), mailbox, _phantom: PhantomData, }, Mailbox::new(sender), ) } /// Start the application. pub fn start( mut self, marshal: marshal::Mailbox>, dkg: dkg::Mailbox, ) -> Handle<()> { spawn_cell!(self.context, self.run(marshal, dkg).await) } /// Application control loop async fn run( mut self, mut marshal: marshal::Mailbox>, dkg: dkg::Mailbox, ) { let genesis = genesis_block(); let genesis_digest = genesis.digest(); let built = Arc::new(Mutex::new(None)); while let Some(message) = self.mailbox.next().await { match message { Message::Genesis { epoch, response } => { // Case: Genesis. if epoch == 0 { let _ = response.send(genesis_block::().commitment()); continue; } // Case: Non-genesis. let height = last_block_in_epoch(BLOCKS_PER_EPOCH, epoch - 1); let Some(block) = marshal.get_block(height).await else { // A new consensus engine will never be started without having the genesis block // of the new epoch (the last block of the previous epoch) already stored. unreachable!("missing block at height {}", height); }; let _ = response.send(block.commitment()); } Message::Propose { round, parent, response, } => { let (parent_view, parent_digest) = parent; let parent_request = if parent_digest == genesis_digest { Either::Left(future::ready(Ok(genesis.clone()))) } else { Either::Right( marshal .subscribe( Some(Round::new(round.epoch(), parent_view)), parent_digest, ) .await, ) }; let built = built.clone(); let mut dkg = dkg.clone(); self.context .with_label("propose") .spawn(move |context| async move { let parent = parent_request.await.expect("parent request cancelled"); // Re-propose the parent block if it's already at the last height in the epoch. if parent.height == last_block_in_epoch(BLOCKS_PER_EPOCH, round.epoch()) { // Set the built block to the parent block let digest = parent.digest(); { let mut built = built.lock().await; *built = Some((round.view(), parent)); } // Send the digest to the consensus let result = response.send(digest); info!( ?round, ?digest, success = result.is_ok(), "re-proposed parent block at epoch boundary" ); return; } // Ask the DKG actor for a result to include // // This approach does allow duplicate commitments to be proposed, but // the arbiter handles this by choosing the first commitment it sees // from any given dealer. let reshare = context .timeout(Duration::from_millis(5), async move { dkg.act().await }) .await .ok() .flatten(); // Create a new block let block = Block::new(parent_digest, parent.height + 1, reshare); let digest = block.digest(); let mut built = built.lock().await; *built = Some((round.view(), block)); // Send the digest to the consensus let result = response.send(digest); info!( ?round, ?digest, success = result.is_ok(), "proposed new block" ); }); } Message::Verify { round, parent, digest, response, } => { let (parent_view, parent_digest) = parent; let parent_request = if parent_digest == genesis_digest { Either::Left(future::ready(Ok(genesis.clone()))) } else { Either::Right( marshal .subscribe( Some(Round::new(round.epoch(), parent_view)), parent_digest, ) .await, ) }; let mut marshal = marshal.clone(); self.context .with_label("verify") .spawn(move |_| async move { let (parent, block) = try_join(parent_request, marshal.subscribe(None, digest).await) .await .unwrap(); // You can only re-propose the same block if it's the last height in the epoch. if parent.commitment() == block.commitment() { if block.height == last_block_in_epoch(BLOCKS_PER_EPOCH, round.epoch()) { marshal.verified(round, block).await; let _ = response.send(true); } else { let _ = response.send(false); } return; } // Verify the block if block.height != parent.height + 1 || block.parent != parent.digest() || epoch(BLOCKS_PER_EPOCH, block.height) != round.epoch() { let _ = response.send(false); return; } marshal.verified(round, block).await; let _ = response.send(true); }); } Message::Broadcast { digest } => { let Some((_, block)) = built.lock().await.clone() else { warn!(%digest, "no built block to broadcast"); continue; }; if block.digest() != digest { warn!( want = %digest, have = %block.digest(), "Broadcast request digest does not match built block" ); continue; } marshal.broadcast(block).await; } } } info!("mailbox closed, exiting"); } }