//! Internal handler types for resolver actor coordination. use bytes::{Buf, BufMut, Bytes}; use commonware_actor::mailbox::{Overflow, Policy, Sender}; use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, ReadRangeExt, Write}; use commonware_cryptography::Digest; use commonware_resolver::{self as resolver, p2p::Producer, Delivery}; use commonware_storage::merkle::{ Family, Location, Proof, MAX_PINNED_NODES, MAX_PROOF_DIGESTS_PER_ELEMENT, }; use commonware_utils::{channel::oneshot, Span}; use std::{ cmp::Ordering, collections::VecDeque, fmt, hash::{Hash, Hasher}, num::NonZeroU64, }; /// Request key sent through `resolver::p2p::Engine`. #[derive(Clone, Debug)] pub(super) struct Request { /// Total operation count for proof context. pub op_count: Location, /// First operation location to fetch. pub start_loc: Location, /// Maximum number of operations to fetch. pub max_ops: NonZeroU64, /// Include pinned nodes for `start_loc` when `true`. pub include_pinned_nodes: bool, } impl PartialEq for Request { fn eq(&self, other: &Self) -> bool { self.op_count == other.op_count && self.start_loc == other.start_loc && self.max_ops == other.max_ops && self.include_pinned_nodes == other.include_pinned_nodes } } impl Eq for Request {} impl PartialOrd for Request { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for Request { fn cmp(&self, other: &Self) -> Ordering { self.op_count .cmp(&other.op_count) .then_with(|| self.start_loc.cmp(&other.start_loc)) .then_with(|| self.max_ops.cmp(&other.max_ops)) .then_with(|| self.include_pinned_nodes.cmp(&other.include_pinned_nodes)) } } impl Hash for Request { fn hash(&self, state: &mut H) { self.op_count.hash(state); self.start_loc.hash(state); self.max_ops.hash(state); self.include_pinned_nodes.hash(state); } } impl fmt::Display for Request { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "Request(count={}, start={}, max={}, pinned={})", self.op_count, self.start_loc, self.max_ops, self.include_pinned_nodes, ) } } impl Write for Request { fn write(&self, buf: &mut impl BufMut) { self.op_count.write(buf); self.start_loc.write(buf); self.max_ops.write(buf); self.include_pinned_nodes.write(buf); } } impl EncodeSize for Request { fn encode_size(&self) -> usize { self.op_count.encode_size() + self.start_loc.encode_size() + self.max_ops.encode_size() + self.include_pinned_nodes.encode_size() } } impl Read for Request { type Cfg = (); fn read_cfg(buf: &mut impl Buf, _: &()) -> Result { Ok(Self { op_count: Location::::read(buf)?, start_loc: Location::::read(buf)?, max_ops: NonZeroU64::read(buf)?, include_pinned_nodes: bool::read(buf)?, }) } } impl Span for Request {} #[cfg(feature = "arbitrary")] impl arbitrary::Arbitrary<'_> for Request { fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result { Ok(Self { op_count: u.arbitrary()?, start_loc: u.arbitrary()?, max_ops: u.arbitrary()?, include_pinned_nodes: u.arbitrary()?, }) } } /// Wire-format response to a [`Request`]. /// /// Carries the inclusion proof, the fetched operations, and /// optionally the pinned nodes at the requested start location. /// Encoded by the producing peer and decoded by the consuming peer; /// the actor converts this into a [`FetchResult`](commonware_storage::qmdb::sync::resolver::FetchResult) /// before handing it to subscribers. pub(super) struct Response { pub(super) proof: Proof, pub(super) operations: Vec, pub(super) pinned_nodes: Option>, } impl Write for Response { fn write(&self, buf: &mut impl BufMut) { self.proof.write(buf); self.operations.write(buf); self.pinned_nodes.write(buf); } } impl EncodeSize for Response { fn encode_size(&self) -> usize { self.proof.encode_size() + self.operations.encode_size() + self.pinned_nodes.encode_size() } } impl, D: Digest> Read for Response { /// Maximum operations expected in this response, derived from the /// request's `max_ops` field. type Cfg = usize; fn read_cfg(buf: &mut impl Buf, max_ops: &usize) -> Result { let max_proof_digests = max_ops.saturating_mul(MAX_PROOF_DIGESTS_PER_ELEMENT); let proof = Proof::::read_cfg(buf, &max_proof_digests)?; let operations = Vec::::read_range(buf, ..=*max_ops)?; // Pinned nodes are the fold-prefix peaks at `start_loc`, independent of // `max_ops`. Bound them by the global pinned-node limit. let pinned_nodes = Option::>::read_range(buf, ..=MAX_PINNED_NODES)?; Ok(Self { proof, operations, pinned_nodes, }) } } #[cfg(feature = "arbitrary")] impl arbitrary::Arbitrary<'_> for Response where Op: for<'a> arbitrary::Arbitrary<'a>, D: for<'a> arbitrary::Arbitrary<'a>, { fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result { Ok(Self { proof: u.arbitrary()?, operations: u.arbitrary()?, pinned_nodes: u.arbitrary()?, }) } } /// Messages sent from [`Handler`] to the resolver [`Actor`](super::Actor). /// /// Each variant corresponds to one of the `resolver::Consumer` or `p2p::Producer` /// callbacks, re-routed so the actor processes them on its own task. pub(super) enum EngineMessage { /// A peer delivered a response for a previously fetched key. /// The actor decodes the value, fans it out to waiting subscribers, /// and reports acceptance back through `response`. Deliver { key: Request, value: Bytes, response: oneshot::Sender, }, /// A peer requested data for `key`. /// The actor queries the local database and sends the encoded /// [`Response`] back through `response`. Produce { key: Request, response: oneshot::Sender, }, } impl EngineMessage { fn response_closed(&self) -> bool { match self { Self::Deliver { response, .. } => response.is_closed(), Self::Produce { response, .. } => response.is_closed(), } } } pub(super) struct EnginePending(VecDeque>); impl Default for EnginePending { fn default() -> Self { Self(VecDeque::new()) } } impl Overflow> for EnginePending { fn is_empty(&self) -> bool { self.0.is_empty() } fn drain

(&mut self, mut push: P) where P: FnMut(EngineMessage) -> Option>, { while let Some(message) = self.0.pop_front() { if message.response_closed() { continue; } if let Some(message) = push(message) { self.0.push_front(message); break; } } } } impl Policy for EngineMessage { type Overflow = EnginePending; fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { return; } overflow.0.push_back(message); } } /// Bridges `resolver::Consumer` and `p2p::Producer` into the actor's /// message channel. /// /// Every callback from the resolver engine is converted into an /// [`EngineMessage`] and sent to the actor. This keeps all mutable /// state (pending subscribers, database handle) on the actor task, /// while the engine runs independently. #[derive(Clone)] pub(super) struct Handler { sender: Sender>, } impl Handler { pub(super) const fn new(sender: Sender>) -> Self { Self { sender } } } impl resolver::Consumer for Handler { type Key = Request; type Value = Bytes; type Subscriber = (); fn deliver( &mut self, delivery: Delivery, value: Self::Value, ) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); let _ = self.sender.enqueue(EngineMessage::Deliver { key: delivery.key, value, response, }); receiver } } impl Producer for Handler { type Key = Request; fn produce(&mut self, key: Self::Key) -> oneshot::Receiver { let (response, receiver) = oneshot::channel(); let _ = self .sender .enqueue(EngineMessage::Produce { key, response }); receiver } } #[cfg(test)] mod tests { use super::*; use commonware_codec::{Decode, DecodeExt, Encode}; use commonware_cryptography::sha256; use commonware_storage::merkle::mmr; const TEST_MAX_OPS: usize = 10_000; #[test] fn response_codec_roundtrip() { let response = Response:: { proof: Proof { leaves: mmr::Location::new(10), inactive_peaks: 0, digests: vec![sha256::Digest::from([7; 32])], }, operations: vec![1, 2, 3], pinned_nodes: Some(vec![sha256::Digest::from([9; 32])]), }; let encoded = response.encode(); let decoded = Response::::decode_cfg(encoded, &TEST_MAX_OPS) .unwrap(); assert_eq!(decoded.operations, vec![1, 2, 3]); assert_eq!(decoded.proof.leaves, mmr::Location::new(10)); assert_eq!(decoded.pinned_nodes.unwrap().len(), 1); } #[test] fn response_decode_rejects_invalid_pinned_flag() { let mut encoded = Response:: { proof: Proof { leaves: mmr::Location::new(10), inactive_peaks: 0, digests: vec![sha256::Digest::from([7; 32])], }, operations: vec![1, 2, 3], pinned_nodes: None, } .encode() .to_vec(); *encoded .last_mut() .expect("response encoding must include pinned_nodes flag") = 2; let err = match Response::::decode_cfg( Bytes::from(encoded), &TEST_MAX_OPS, ) { Ok(_) => panic!("decode should fail for invalid bool flag"), Err(err) => err, }; assert!(matches!(err, CodecError::InvalidBool)); } #[test] fn response_decode_allows_pinned_nodes_above_max_ops() { let max_ops = 1usize; let response = Response:: { proof: Proof { leaves: mmr::Location::new(10), inactive_peaks: 0, digests: vec![sha256::Digest::from([7; 32])], }, operations: vec![1], pinned_nodes: Some(vec![sha256::Digest::from([9; 32]); 3]), }; let encoded = response.encode(); let decoded = Response::::decode_cfg(encoded, &max_ops).unwrap(); assert_eq!(decoded.operations, vec![1]); assert_eq!(decoded.pinned_nodes.unwrap().len(), 3); } #[test] fn response_decode_allows_max_single_operation_proof() { let max_ops = 1usize; let response = Response:: { proof: Proof { leaves: mmr::Location::new(10), inactive_peaks: 0, digests: vec![sha256::Digest::from([7; 32]); MAX_PROOF_DIGESTS_PER_ELEMENT], }, operations: vec![1], pinned_nodes: None, }; let encoded = response.encode(); let decoded = Response::::decode_cfg(encoded, &max_ops).unwrap(); assert_eq!(decoded.operations, vec![1]); assert_eq!(decoded.proof.digests.len(), MAX_PROOF_DIGESTS_PER_ELEMENT); } #[test] fn request_codec_roundtrip() { let req = Request:: { op_count: mmr::Location::new(128), start_loc: mmr::Location::new(64), max_ops: NonZeroU64::new(16).unwrap(), include_pinned_nodes: true, }; let encoded = req.encode(); let decoded = Request::::decode(encoded).unwrap(); assert_eq!(req, decoded); } #[test] fn request_decode_rejects_invalid_pinned_flag() { let mut encoded = Request:: { op_count: mmr::Location::new(128), start_loc: mmr::Location::new(64), max_ops: NonZeroU64::new(16).unwrap(), include_pinned_nodes: true, } .encode() .to_vec(); *encoded .last_mut() .expect("request encoding must include flag") = 2; let err = Request::::decode(Bytes::from(encoded)).unwrap_err(); assert!(matches!(err, CodecError::InvalidBool)); } #[cfg(feature = "arbitrary")] mod conformance { use super::*; use commonware_codec::conformance::CodecConformance; commonware_conformance::conformance_tests! { CodecConformance>, CodecConformance>, } } }