use crate::{simplex::types::Certificate, types::View}; use bytes::Bytes; use commonware_cryptography::{certificate::Scheme, Digest}; use commonware_resolver::{p2p::Producer, Consumer}; use commonware_utils::{channels::fallible::AsyncFallibleExt, sequence::U64}; use futures::channel::{mpsc, oneshot}; /// Messages sent to the resolver actor from the voter. pub enum MailboxMessage { /// A certificate was received or produced. Certificate(Certificate), /// Certification result for a view. Certified { view: View, success: bool }, } #[derive(Clone)] pub struct Mailbox { sender: mpsc::Sender>, } impl Mailbox { /// Create a new mailbox. pub const fn new(sender: mpsc::Sender>) -> Self { Self { sender } } /// Send a certificate. pub async fn updated(&mut self, certificate: Certificate) { self.sender .send_lossy(MailboxMessage::Certificate(certificate)) .await; } /// Notify the resolver of a certification result. pub async fn certified(&mut self, view: View, success: bool) { self.sender .send_lossy(MailboxMessage::Certified { view, success }) .await; } } #[derive(Debug)] pub enum HandlerMessage { Deliver { view: View, data: Bytes, response: oneshot::Sender, }, Produce { view: View, response: oneshot::Sender, }, } #[derive(Clone)] pub struct Handler { sender: mpsc::Sender, } impl Handler { pub const fn new(sender: mpsc::Sender) -> Self { Self { sender } } } impl Consumer for Handler { type Key = U64; type Value = Bytes; type Failure = (); async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool { self.sender .request_or( |response| HandlerMessage::Deliver { view: View::new(key.into()), data: value, response, }, false, ) .await } async fn failed(&mut self, _: Self::Key, _: Self::Failure) { // We don't need to do anything on failure, the resolver will retry. } } impl Producer for Handler { type Key = U64; async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); self.sender .send_lossy(HandlerMessage::Produce { view: View::new(key.into()), response, }) .await; receiver } }