//! Handler types for compact resolver actor coordination. use bytes::{Buf, BufMut, Bytes}; use commonware_actor::mailbox::{Overflow, Policy, Sender}; use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt as _, Write}; use commonware_cryptography::Digest; use commonware_resolver::{self as resolver, p2p::Producer, Delivery}; use commonware_storage::{merkle::Family, qmdb::sync::compact}; use commonware_utils::{channel::oneshot, Span}; use std::{ collections::VecDeque, fmt, hash::{Hash, Hasher}, }; #[derive(Clone, Debug)] pub(super) struct Request { root: D, leaf_count: commonware_storage::merkle::Location, } impl Request { pub(super) const fn from_target(target: compact::Target) -> Self { Self { root: target.root, leaf_count: target.leaf_count, } } pub(super) const fn to_target(&self) -> compact::Target { compact::Target::new(self.root, self.leaf_count) } } impl PartialEq for Request { fn eq(&self, other: &Self) -> bool { self.root == other.root && self.leaf_count == other.leaf_count } } impl Eq for Request {} impl PartialOrd for Request { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for Request { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.root .cmp(&other.root) .then_with(|| self.leaf_count.cmp(&other.leaf_count)) } } impl Hash for Request { fn hash(&self, state: &mut H) { self.root.hash(state); self.leaf_count.hash(state); } } impl fmt::Display for Request { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "CompactRequest(root={}, leaf_count={})", self.root, self.leaf_count ) } } impl Write for Request { fn write(&self, buf: &mut impl BufMut) { self.root.write(buf); self.leaf_count.write(buf); } } impl EncodeSize for Request { fn encode_size(&self) -> usize { self.root.encode_size() + self.leaf_count.encode_size() } } impl Read for Request { type Cfg = (); fn read_cfg(buf: &mut impl Buf, _: &()) -> Result { let root = D::read(buf)?; let leaf_count = commonware_storage::merkle::Location::::read(buf)?; let target = compact::Target::new(root, leaf_count); target.validate().map_err(|reason| { CodecError::Invalid( "commonware_glue::stateful::db::p2p::compact::Request", reason, ) })?; Ok(Self::from_target(target)) } } impl Span for Request {} #[cfg(feature = "arbitrary")] impl arbitrary::Arbitrary<'_> for Request where D: for<'a> arbitrary::Arbitrary<'a>, { fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result { Ok(Self::from_target(u.arbitrary()?)) } } pub(super) enum EngineMessage { Deliver { key: Request, value: Bytes, response: oneshot::Sender, }, Produce { key: Request, response: oneshot::Sender, }, } impl EngineMessage { fn response_closed(&self) -> bool { match self { Self::Deliver { response, .. } => response.is_closed(), Self::Produce { response, .. } => response.is_closed(), } } } pub(super) struct EnginePending(VecDeque>); impl Default for EnginePending { fn default() -> Self { Self(VecDeque::new()) } } impl Overflow> for EnginePending { fn is_empty(&self) -> bool { self.0.is_empty() } fn drain

(&mut self, mut push: P) where P: FnMut(EngineMessage) -> Option>, { while let Some(message) = self.0.pop_front() { if message.response_closed() { continue; } if let Some(message) = push(message) { self.0.push_front(message); break; } } } } impl Policy for EngineMessage { type Overflow = EnginePending; fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { return; } overflow.0.push_back(message); } } #[derive(Clone)] pub(super) struct Handler { sender: Sender>, } impl Handler { pub(super) const fn new(sender: Sender>) -> Self { Self { sender } } } impl resolver::Consumer for Handler { type Key = Request; type Value = Bytes; type Subscriber = (); fn deliver( &mut self, delivery: Delivery, value: Self::Value, ) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); let _ = self.sender.enqueue(EngineMessage::Deliver { key: delivery.key, value, response, }); receiver } } impl Producer for Handler { type Key = Request; fn produce(&mut self, key: Self::Key) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); let _ = self .sender .enqueue(EngineMessage::Produce { key, response }); receiver } } #[cfg(test)] mod tests { #[cfg(feature = "arbitrary")] mod conformance { use super::super::*; use commonware_codec::conformance::CodecConformance; use commonware_cryptography::sha256; use commonware_storage::merkle::mmr; commonware_conformance::conformance_tests! { CodecConformance>, } } }