use crate::{
    merkle::{Family, Location, Proof},
    qmdb::{
        self,
        any::{
            ordered::{
                fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
                variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
            },
            unordered::{
                fixed::{Db as FixedDb, Operation as FixedOperation},
                variable::{Db as VariableDb, Operation as VariableOperation},
            },
            FixedValue, VariableValue,
        },
        immutable::{
            fixed::{Db as ImmutableFixedDb, Operation as ImmutableFixedOp},
            variable::{Db as ImmutableVariableDb, Operation as ImmutableVariableOp},
        },
        keyless::{
            fixed::{Db as KeylessFixedDb, Operation as KeylessFixedOp},
            variable::{Db as KeylessVariableDb, Operation as KeylessVariableOp},
        },
        operation::Key,
    },
    translator::Translator,
    Context,
};
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
use std::{future::Future, num::NonZeroU64, sync::Arc};

// NOTE(review): the generic parameter lists in this file were lost during extraction and have
// been restored from the surviving where-clauses and call sites. Confirm the exact parameter
// lists of `Proof` and `Location` against the `merkle` module.

/// Result from a fetch operation.
pub struct FetchResult<F: Family, Op, D: Digest> {
    /// The proof for the operations
    pub proof: Proof<F, D>,
    /// The operations that were fetched
    pub operations: Vec<Op>,
    /// Pinned merkle nodes at the start location, if requested
    pub pinned_nodes: Option<Vec<D>>,
    /// Optional callback for resolvers that observe downstream validation feedback.
    pub callback: Option<oneshot::Sender<bool>>,
}

impl<F: Family, Op, D: Digest> FetchResult<F, Op, D> {
    /// Creates a fetch result that does not observe the validation acknowledgement.
    ///
    /// The `callback` field is set to `None`.
    pub const fn new(
        proof: Proof<F, D>,
        operations: Vec<Op>,
        pinned_nodes: Option<Vec<D>>,
    ) -> Self {
        Self {
            proof,
            operations,
            pinned_nodes,
            callback: None,
        }
    }

    /// Creates a fetch result using an externally managed validation callback.
    ///
    /// The sender is stored in the `callback` field as `Some(callback)`.
    pub const fn with_callback(
        proof: Proof<F, D>,
        operations: Vec<Op>,
        pinned_nodes: Option<Vec<D>>,
        callback: oneshot::Sender<bool>,
    ) -> Self {
        Self {
            proof,
            operations,
            pinned_nodes,
            callback: Some(callback),
        }
    }
}

/// Operations fetched from a resolver before packaging as a [`FetchResult`].
pub struct FetchedOperations { /// The proof for the operations pub proof: Proof, /// The operations that were fetched pub operations: Vec, /// Pinned merkle nodes at the start location, if requested pub pinned_nodes: Option>, } impl FetchedOperations { /// Creates fetched operations with optional pinned nodes. pub const fn new( proof: Proof, operations: Vec, pinned_nodes: Option>, ) -> Self { Self { proof, operations, pinned_nodes, } } } impl std::fmt::Debug for FetchResult { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FetchResult") .field("proof", &self.proof) .field("operations", &self.operations) .field("pinned_nodes", &self.pinned_nodes) .field("callback", &self.callback.as_ref().map(|_| "")) .finish() } } /// Fetch an operation range with a caller-provided callback and package it as a /// [`FetchResult`]. /// /// Use this when the source returns the proof, operations, and optional pinned nodes together, /// such as a network `get_operations` request. pub async fn fetch_operation_range( op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, fetch: Fetch, ) -> Result, Error> where F: Family, D: Digest, Fetch: FnOnce(Location, Location, NonZeroU64, bool) -> FetchFuture, FetchFuture: Future, Error>>, { let FetchedOperations { proof, operations, pinned_nodes, } = fetch(op_count, start_loc, max_ops, include_pinned_nodes).await?; Ok(FetchResult::new(proof, operations, pinned_nodes)) } /// Fetch an operation range from separate local-store callbacks and package it as a /// [`FetchResult`]. /// /// Use this for database APIs that expose `historical_proof` separately from /// `pinned_nodes_at`; pinned nodes are fetched only when `include_pinned_nodes` is true. 
pub async fn fetch_operations< F, Op, D, Error, HistoricalProof, HistoricalFuture, Pins, PinsFuture, >( op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, historical_proof: HistoricalProof, pinned_nodes_at: Pins, ) -> Result, Error> where F: Family, D: Digest, HistoricalProof: FnOnce(Location, Location, NonZeroU64) -> HistoricalFuture, HistoricalFuture: Future, Vec), Error>>, Pins: FnOnce(Location) -> PinsFuture, PinsFuture: Future, Error>>, { fetch_operation_range( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops, include_pinned_nodes| async move { let (proof, operations) = historical_proof(op_count, start_loc, max_ops).await?; let pinned_nodes = if include_pinned_nodes { Some(pinned_nodes_at(start_loc).await?) } else { None }; Ok(FetchedOperations::new(proof, operations, pinned_nodes)) }, ) .await } /// Trait for network communication with the sync server. pub trait Resolver: Send + Sync + Clone + 'static { /// The merkle family backing the resolver's proofs type Family: Family; /// The digest type used in proofs returned by the resolver type Digest: Digest; /// The type of operations returned by the resolver type Op; /// The error type returned by the resolver type Error: std::error::Error + Send + 'static; /// Get the operations starting at `start_loc` in the database, up to `max_ops` operations. /// Returns the operations and a proof that they were present in the database when it had /// `op_count` operations. If `include_pinned_nodes` is true, the result will include the /// pinned merkle nodes at `start_loc`. /// /// The corresponding `cancel_tx` is dropped when the engine no longer needs this /// request (e.g. due to a target update), causing `cancel_rx.await` to return /// `Err`. Implementations may `select!` on it to abort in-flight work early. 
#[allow(clippy::type_complexity)] fn get_operations<'a>( &'a self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, cancel_rx: oneshot::Receiver<()>, ) -> impl Future, Self::Error>> + Send + 'a; } macro_rules! impl_resolver { ($db:ident, $op:ident, $val_bound:ident) => { impl Resolver for Arc<$db> where F: Family, E: Context, K: Array, V: $val_bound + Send + Sync + 'static, H: Hasher, T: Translator + Send + Sync + 'static, T::Key: Send + Sync, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { self.historical_proof(op_count, start_loc, max_ops) }, |start_loc| self.pinned_nodes_at(start_loc), ) .await } } impl Resolver for Arc>> where F: Family, E: Context, K: Array, V: $val_bound + Send + Sync + 'static, H: Hasher, T: Translator + Send + Sync + 'static, T::Key: Send + Sync, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let db = self.read().await; fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { db.historical_proof(op_count, start_loc, max_ops) }, |start_loc| db.pinned_nodes_at(start_loc), ) .await } } impl Resolver for Arc>>> where F: Family, E: Context, K: Array, V: $val_bound + Send + Sync + 'static, H: Hasher, T: Translator + Send + Sync + 'static, T::Key: Send + Sync, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: 
Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let guard = self.read().await; let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?; fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { db.historical_proof(op_count, start_loc, max_ops) }, |start_loc| db.pinned_nodes_at(start_loc), ) .await } } }; } // Unordered Fixed impl_resolver!(FixedDb, FixedOperation, FixedValue); // Unordered Variable impl_resolver!(VariableDb, VariableOperation, VariableValue); // Ordered Fixed impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue); // Ordered Variable impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue); // Immutable types need a separate macro because the key bound varies // (Array for fixed, Key for variable) unlike the other DB types which // always use Array. macro_rules! impl_resolver_immutable { ($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => { impl Resolver for Arc<$db> where F: Family, E: Context, K: $key_bound, V: $val_bound + Send + Sync + 'static, H: Hasher, T: Translator + Send + Sync + 'static, T::Key: Send + Sync, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { self.historical_proof(op_count, start_loc, max_ops) }, |start_loc| self.pinned_nodes_at(start_loc), ) .await } } impl Resolver for Arc>> where F: Family, E: Context, K: $key_bound, V: $val_bound + Send + Sync + 'static, H: Hasher, T: Translator + Send + Sync + 'static, T::Key: Send + Sync, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = 
qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let db = self.read().await; fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { db.historical_proof(op_count, start_loc, max_ops) }, |start_loc| db.pinned_nodes_at(start_loc), ) .await } } impl Resolver for Arc>>> where F: Family, E: Context, K: $key_bound, V: $val_bound + Send + Sync + 'static, H: Hasher, T: Translator + Send + Sync + 'static, T::Key: Send + Sync, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let guard = self.read().await; let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?; fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { db.historical_proof(op_count, start_loc, max_ops) }, |start_loc| db.pinned_nodes_at(start_loc), ) .await } } }; } // Immutable Fixed impl_resolver_immutable!(ImmutableFixedDb, ImmutableFixedOp, FixedValue, Array); // Immutable Variable impl_resolver_immutable!(ImmutableVariableDb, ImmutableVariableOp, VariableValue, Key); // Keyless types have no key or translator, so they need their own macro. macro_rules! 
impl_resolver_keyless { ($db:ident, $op:ident, $val_bound:ident) => { impl Resolver for Arc<$db> where F: Family, E: Context, V: $val_bound + Send + Sync + 'static, H: Hasher, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { self.historical_proof(op_count, start_loc, max_ops) }, |start_loc| self.pinned_nodes_at(start_loc), ) .await } } impl Resolver for Arc>> where F: Family, E: Context, V: $val_bound + Send + Sync + 'static, H: Hasher, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let db = self.read().await; fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { db.historical_proof(op_count, start_loc, max_ops) }, |start_loc| db.pinned_nodes_at(start_loc), ) .await } } impl Resolver for Arc>>> where F: Family, E: Context, V: $val_bound + Send + Sync + 'static, H: Hasher, S: Strategy, { type Family = F; type Digest = H::Digest; type Op = $op; type Error = qmdb::Error; async fn get_operations( &self, op_count: Location, start_loc: Location, max_ops: NonZeroU64, include_pinned_nodes: bool, _cancel_rx: oneshot::Receiver<()>, ) -> Result, Self::Error> { let guard = self.read().await; let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?; fetch_operations( op_count, start_loc, max_ops, include_pinned_nodes, |op_count, start_loc, max_ops| { db.historical_proof(op_count, start_loc, max_ops) }, |start_loc| db.pinned_nodes_at(start_loc), ) .await } } }; } // 
Keyless Fixed impl_resolver_keyless!(KeylessFixedDb, KeylessFixedOp, FixedValue); // Keyless Variable impl_resolver_keyless!(KeylessVariableDb, KeylessVariableOp, VariableValue); #[cfg(test)] pub(crate) mod tests { use super::*; use crate::{ merkle::mmr, translator::{OneCap, TwoCap}, }; use commonware_cryptography::{sha256::Digest as ShaDigest, Sha256}; use commonware_parallel::Rayon; use commonware_runtime::deterministic; use commonware_utils::sync::AsyncRwLock; use std::{marker::PhantomData, sync::Arc}; macro_rules! assert_resolver_variants { ($db:ty) => { assert_resolver::>(); assert_resolver::>>(); assert_resolver::>>>(); }; } fn assert_resolver() {} fn empty_proof() -> Proof { Proof { leaves: Location::new(0), inactive_peaks: 0, digests: vec![], } } #[test] fn test_fetch_result_new_has_no_success_acknowledgement() { let result = FetchResult::::new(empty_proof(), vec![], None); assert!(result.callback.is_none()); } #[test] fn test_fetch_result_with_callback_reports_to_external_receiver() { let (success_tx, mut success_rx) = oneshot::channel(); let result = FetchResult::::with_callback( empty_proof(), vec![], None, success_tx, ); assert!(result.callback.expect("success sender").send(true).is_ok()); assert_eq!(success_rx.try_recv(), Ok(true)); } /// A resolver that always fails. 
#[derive(Clone)] pub struct FailResolver { _phantom: PhantomData<(F, Op, D)>, } impl Resolver for FailResolver where F: Family, D: Digest, Op: Send + Sync + Clone + 'static, { type Family = F; type Digest = D; type Op = Op; type Error = qmdb::Error; async fn get_operations( &self, _op_count: Location, _start_loc: Location, _max_ops: NonZeroU64, _include_pinned_nodes: bool, _cancel: oneshot::Receiver<()>, ) -> Result, qmdb::Error> { Err(qmdb::Error::KeyNotFound) // Arbitrary dummy error } } impl FailResolver { pub fn new() -> Self { Self { _phantom: PhantomData, } } } #[test] fn test_all_qmdb_variants_implement_strategy_resolvers() { type AnyOrderedFixed = crate::qmdb::any::ordered::fixed::Db< mmr::Family, deterministic::Context, ShaDigest, ShaDigest, Sha256, OneCap, Rayon, >; type AnyOrderedVariable = crate::qmdb::any::ordered::variable::Db< mmr::Family, deterministic::Context, ShaDigest, Vec, Sha256, OneCap, Rayon, >; type AnyUnorderedFixed = crate::qmdb::any::unordered::fixed::Db< mmr::Family, deterministic::Context, ShaDigest, ShaDigest, Sha256, TwoCap, Rayon, >; type AnyUnorderedVariable = crate::qmdb::any::unordered::variable::Db< mmr::Family, deterministic::Context, ShaDigest, Vec, Sha256, TwoCap, Rayon, >; type CurrentOrderedFixed = crate::qmdb::current::ordered::fixed::Db< mmr::Family, deterministic::Context, ShaDigest, ShaDigest, Sha256, OneCap, 32, Rayon, >; type CurrentOrderedVariable = crate::qmdb::current::ordered::variable::Db< mmr::Family, deterministic::Context, ShaDigest, Vec, Sha256, OneCap, 32, Rayon, >; type CurrentUnorderedFixed = crate::qmdb::current::unordered::fixed::Db< mmr::Family, deterministic::Context, ShaDigest, ShaDigest, Sha256, TwoCap, 32, Rayon, >; type CurrentUnorderedVariable = crate::qmdb::current::unordered::variable::Db< mmr::Family, deterministic::Context, ShaDigest, Vec, Sha256, TwoCap, 32, Rayon, >; type ImmutableFixed = crate::qmdb::immutable::fixed::Db< mmr::Family, deterministic::Context, ShaDigest, ShaDigest, Sha256, 
TwoCap, Rayon, >; type ImmutableVariable = crate::qmdb::immutable::variable::Db< mmr::Family, deterministic::Context, ShaDigest, Vec, Sha256, TwoCap, Rayon, >; type KeylessFixed = crate::qmdb::keyless::fixed::Db< mmr::Family, deterministic::Context, ShaDigest, Sha256, Rayon, >; type KeylessVariable = crate::qmdb::keyless::variable::Db< mmr::Family, deterministic::Context, Vec, Sha256, Rayon, >; assert_resolver_variants!(AnyOrderedFixed); assert_resolver_variants!(AnyOrderedVariable); assert_resolver_variants!(AnyUnorderedFixed); assert_resolver_variants!(AnyUnorderedVariable); assert_resolver_variants!(CurrentOrderedFixed); assert_resolver_variants!(CurrentOrderedVariable); assert_resolver_variants!(CurrentUnorderedFixed); assert_resolver_variants!(CurrentUnorderedVariable); assert_resolver_variants!(ImmutableFixed); assert_resolver_variants!(ImmutableVariable); assert_resolver_variants!(KeylessFixed); assert_resolver_variants!(KeylessVariable); } }