//! DKG [Actor] ingress (mailbox and messages) //! //! [Actor]: super::Actor use crate::{application::Block, dkg::DealOutcome}; use commonware_consensus::{marshal::Update, Reporter}; use commonware_cryptography::{bls12381::primitives::variant::Variant, Hasher, Signer}; use futures::{ channel::{mpsc, oneshot}, SinkExt, }; /// A message that can be sent to the [Actor]. /// /// [Actor]: super::Actor #[allow(clippy::large_enum_variant)] pub enum Message where H: Hasher, C: Signer, V: Variant, { /// A request for the [Actor]'s next [DealOutcome] for inclusion within a block. /// /// [Actor]: super::Actor Act { response: oneshot::Sender>>, }, /// A new block has been finalized. Finalized { block: Block, response: oneshot::Sender<()>, }, } /// Inbox for sending messages to the DKG [Actor]. /// /// [Actor]: super::Actor #[derive(Clone)] pub struct Mailbox where H: Hasher, C: Signer, V: Variant, { sender: mpsc::Sender>, } impl Mailbox where H: Hasher, C: Signer, V: Variant, { /// Create a new mailbox. pub fn new(sender: mpsc::Sender>) -> Self { Self { sender } } /// Request the [Actor]'s next payload for inclusion within a block. /// /// [Actor]: super::Actor pub async fn act(&mut self) -> Option> { let (response_tx, response_rx) = oneshot::channel(); let message = Message::Act { response: response_tx, }; self.sender.send(message).await.expect("mailbox closed"); response_rx.await.expect("response channel closed") } } impl Reporter for Mailbox where H: Hasher, C: Signer, V: Variant, { type Activity = Update>; async fn report(&mut self, update: Self::Activity) { let (sender, receiver) = oneshot::channel(); // Report the finalized block to the DKG actor on a best-effort basis. let Update::Block(block) = update else { // We ignore any other updates sent by marshal. return; }; let _ = self .sender .send(Message::Finalized { block, response: sender, }) .await; let _ = receiver.await; } }