//! Mailbox for the compact QMDB P2P resolver. use super::handler; use crate::stateful::db::AttachableResolver; use commonware_actor::mailbox::{Overflow, Policy, Sender}; use commonware_cryptography::{Digest, Hasher}; use commonware_storage::{merkle::Family, qmdb::sync::compact}; use commonware_utils::{channel::oneshot, sync::AsyncRwLock}; use std::{collections::VecDeque, future::Future, sync::Arc}; struct CancelGuard { sender: Sender>, request: Option>, } impl CancelGuard { const fn new(sender: Sender>, request: handler::Request) -> Self { Self { sender, request: Some(request), } } const fn disarm(&mut self) { self.request = None; } } impl Drop for CancelGuard { fn drop(&mut self) { let Some(request) = self.request.take() else { return; }; let _ = self.sender.enqueue(Message::CancelState { request }); } } /// The resolver actor dropped the response before completion. #[derive(Debug, thiserror::Error)] #[error("response dropped before completion")] pub struct ResponseDropped; pub(super) enum Message { AttachDatabase(Arc>), GetState { request: handler::Request, response: oneshot::Sender, ResponseDropped>>, }, CancelState { request: handler::Request, }, } impl Message { fn response_closed(&self) -> bool { match self { Self::AttachDatabase(_) | Self::CancelState { .. } => false, Self::GetState { response, .. } => response.is_closed(), } } } pub(super) struct Pending { database: Option>>, messages: VecDeque>, } impl Default for Pending { fn default() -> Self { Self { database: None, messages: VecDeque::new(), } } } impl Overflow> for Pending { fn is_empty(&self) -> bool { self.database.is_none() && self.messages.is_empty() } fn drain

(&mut self, mut push: P) where P: FnMut(Message) -> Option>, { if let Some(database) = self.database.take() { if let Some(Message::AttachDatabase(database)) = push(Message::AttachDatabase(database)) { self.database = Some(database); return; } } while let Some(message) = self.messages.pop_front() { if message.response_closed() { continue; } if let Some(message) = push(message) { self.messages.push_front(message); break; } } } } impl Policy for Message { type Overflow = Pending; fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { return; } match message { Self::AttachDatabase(database) => { overflow.database = Some(database); } message => overflow.messages.push_back(message), } } } /// Client-facing resolver mailbox used by compact QMDB sync. pub struct Mailbox { sender: Sender>, } impl Clone for Mailbox { fn clone(&self) -> Self { Self { sender: self.sender.clone(), } } } impl Mailbox { pub(super) const fn new(sender: Sender>) -> Self { Self { sender } } } impl Mailbox { pub fn attach_database(&self, db: Arc>) { let _ = self.sender.enqueue(Message::AttachDatabase(db)); } } impl compact::Resolver for Mailbox where DB: Send + Sync + 'static, F: Family, Op: Send + Sync + Clone + 'static, H: Hasher, { type Digest = H::Digest; type Error = ResponseDropped; type Family = F; type Op = Op; async fn get_compact_state( &self, target: compact::Target, ) -> Result, Self::Error> { let request = handler::Request::from_target(target); let (response, receiver) = oneshot::channel(); let _ = self.sender.enqueue(Message::GetState { request: request.clone(), response, }); let mut cancel = CancelGuard::new(self.sender.clone(), request); let result = receiver.await; cancel.disarm(); result.map_err(|_| ResponseDropped)? } } impl AttachableResolver for Mailbox where DB: Send + Sync + 'static, F: Family, Op: Send + Sync + Clone + 'static, H: Hasher, { fn attach_database(&self, db: Arc>) -> impl Future + Send { Self::attach_database(self, db); std::future::ready(()) } } #[cfg(test)] mod tests { use super::*; use commonware_cryptography::sha256::Sha256; use commonware_runtime::{deterministic, Runner as _}; use commonware_storage::{mmr, qmdb::sync::compact::Resolver as _}; use commonware_utils::NZUsize; use futures::future::poll_fn; use std::task::Poll; #[test] fn get_compact_state_sends_request() { deterministic::Runner::default().start(|context| async move { let (sender, mut receiver) = commonware_actor::mailbox::new(context, NZUsize!(4)); let mailbox = Mailbox::<(), mmr::Family, u64, Sha256>::new(sender); let target = compact::Target { root: [1u8; 32].into(), leaf_count: mmr::Location::new(7), }; let get = mailbox.get_compact_state(target.clone()); let observe = async move { let message = receiver.recv().await.expect("request should be queued"); let Message::GetState { request, response } = message else { panic!("unexpected attach message"); }; assert_eq!(request.to_target(), target); drop(response); }; let (result, _) = futures::join!(get, observe); assert!(matches!(result, Err(ResponseDropped))); }); } #[test] fn dropped_request_sends_cancel_message() { deterministic::Runner::default().start(|context| async move { let (sender, mut receiver) = commonware_actor::mailbox::new(context, NZUsize!(4)); let mailbox = Mailbox::<(), mmr::Family, u64, Sha256>::new(sender); let target = compact::Target { root: [2u8; 32].into(), leaf_count: mmr::Location::new(9), }; let mut get = Box::pin(mailbox.get_compact_state(target.clone())); poll_fn(|cx| { assert!(matches!(get.as_mut().poll(cx), Poll::Pending)); Poll::Ready(()) }) .await; drop(get); let message = receiver.recv().await.expect("request should be queued"); let Message::GetState { request, response } = message else { panic!("unexpected attach message"); }; assert_eq!(request.to_target(), target); drop(response); match receiver.recv().await.expect("cancel should be queued") { Message::CancelState { request } => { assert_eq!(request.to_target(), target); } Message::AttachDatabase(_) => panic!("unexpected attach message"), Message::GetState { .. } => panic!("unexpected duplicate request"), } }); } }