use super::{
    config::Config,
    fetcher::{Config as FetcherConfig, Fetcher},
    inflight::Inflight,
    ingress::{FetchKey, Mailbox, Message},
    metrics, wire, Producer,
};
use crate::{subscribers, Consumer, Delivery};
use bytes::Bytes;
use commonware_actor::mailbox;
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Provider, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{histogram, status::Status, GaugeExt},
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, Span};
use futures::future::{self, Either};
use rand::Rng;
use std::marker::PhantomData;
use tracing::{debug, error, trace, warn};

// NOTE(review): generic parameter lists and associated-type bindings appear to
// have been stripped from this copy of the file (e.g. `Serve` uses `P` without
// declaring it, and `mailbox::Receiver>` / `FuturesPool>` / `WrappedSender>` have
// an unbalanced `>`). The code tokens below are left exactly as found; restore
// the generics from the canonical source before compiling.

/// Represents a pending serve operation.
///
/// One value is produced per incoming peer request once the receiver returned
/// by `Producer::produce` resolves (see `handle_network_request`).
struct Serve {
    /// Timer started from the `serve_duration` metric when the request arrived;
    /// it is observed only when `result` is `Ok`.
    timer: histogram::Timer,
    /// Peer that issued the request and that will receive the response.
    peer: P,
    /// Request id, echoed back in the response message.
    id: u64,
    /// Outcome of awaiting the producer's receiver.
    result: Result,
}

/// Manages incoming and outgoing P2P requests, coordinating fetch and serve operations.
pub struct Engine
where
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider,
    B: Blocker,
    Key: Span,
    Con: Consumer,
    Pro: Producer,
    NetS: Sender,
    NetR: Receiver,
    Con::Subscriber: Eq,
{
    /// Context used to spawn tasks, manage time, etc.
    context: ContextCell,

    /// Produces data for incoming requests
    producer: Pro,

    /// Manages the list of peers that can be used to fetch data
    peer_provider: D,

    /// The blocker that will be used to block peers that send invalid responses
    blocker: B,

    /// Used to detect changes in the peer set.
    ///
    /// Holds the highest peer-set index applied so far; updates whose index is
    /// not strictly greater are ignored by the run loop.
    last_peer_set_id: Option,

    /// Mailbox that makes and prunes fetches
    mailbox: mailbox::Receiver>,

    /// Manages outgoing fetch requests
    fetcher: Fetcher,

    /// Tracks all in-flight fetch state
    inflight: Inflight,

    /// Subscribers that keep each fetch alive.
    subscribers: subscribers::Tracker,

    /// Holds futures that resolve once the `Producer` has produced the data.
    /// Once the future is resolved, the data (or an error) is sent to the peer.
    /// Has unbounded size; the number of concurrent requests should be limited
    /// by the `Producer` which may drop requests.
    serves: FuturesPool>,

    /// Whether responses are sent with priority over other network messages
    priority_responses: bool,

    /// Metrics for the peer actor
    metrics: metrics::Metrics,

    /// Phantom data for networking types
    _r: PhantomData,
}

impl Engine
where
    E: BufferPooler + Clock + Spawner + Rng + Metrics,
    P: PublicKey,
    D: Provider,
    B: Blocker,
    Key: Span,
    Con: Consumer,
    Pro: Producer,
    NetS: Sender,
    NetR: Receiver,
    Con::Subscriber: Clone + Ord + Send + 'static,
{
    /// Creates a new `Engine` with the given configuration.
    ///
    /// The mailbox and the fetcher each receive their own child context
    /// (`"mailbox"` and `"fetcher"`); metrics are initialized against the parent
    /// context before it is moved into the engine.
    ///
    /// Returns the engine and a mailbox to send messages to it.
    pub fn new(
        context: E,
        cfg: Config,
    ) -> (Self, Mailbox) {
        let (sender, receiver) = mailbox::new(context.child("mailbox"), cfg.mailbox_size);
        let metrics = metrics::Metrics::init(&context);
        let fetcher = Fetcher::new(
            context.child("fetcher"),
            FetcherConfig {
                me: cfg.me,
                initial: cfg.initial,
                timeout: cfg.timeout,
                retry_timeout: cfg.fetch_retry_timeout,
                priority_requests: cfg.priority_requests,
            },
        );
        (
            Self {
                context: ContextCell::new(context),
                producer: cfg.producer,
                peer_provider: cfg.peer_provider,
                blocker: cfg.blocker,
                // No peer set has been applied yet, so the first update is always accepted.
                last_peer_set_id: None,
                mailbox: receiver,
                fetcher,
                inflight: Inflight::new(cfg.consumer),
                subscribers: subscribers::Tracker::new(),
                serves: FuturesPool::default(),
                priority_responses: cfg.priority_responses,
                metrics,
                _r: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Fetching data from other peers and notifying the `Consumer`
    /// - Serving data to other peers by requesting it from the `Producer`
    ///
    /// `network` is the `(sender, receiver)` pair used to exchange wire messages
    /// with peers. Returns the handle produced by `spawn_cell!` for the run loop.
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        spawn_cell!(self.context, self.run(network))
    }

    /// Inner run loop called by `start`.
    ///
    /// Wraps the network channel with the wire codec, subscribes to peer-set
    /// updates, and then multiplexes over: peer-set updates, the fetcher's
    /// active/pending deadlines, mailbox messages, completed consumer
    /// deliveries, completed serves, and incoming network messages.
    ///
    /// The loop returns when the peer-set subscription, the mailbox, or the
    /// network receiver closes.
    async fn run(mut self, network: (NetS, NetR)) {
        // Wrap channel
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // NOTE(review): `on_start` presumably runs before every select
            // iteration, which is what makes `deadline_pending` and
            // `deadline_active` available to the arms below; confirm against
            // the `select_loop!` documentation.
            on_start => {
                // Update metrics
                let _ = self
                    .metrics
                    .fetch_pending
                    .try_set(self.fetcher.len_pending());
                let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active());
                let _ = self
                    .metrics
                    .peers_blocked
                    .try_set(self.fetcher.len_blocked());
                let _ = self.metrics.serve_processing.try_set(self.serves.len());

                // Get retry timeout (if any); with no deadline the future never resolves
                let deadline_pending = match self.fetcher.get_pending_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };

                // Get requester timeout (if any); with no deadline the future never resolves
                let deadline_active = match self.fetcher.get_active_deadline() {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            // On shutdown, drop all in-flight fetch state, subscribers, and pending serves.
            on_stopped => {
                debug!("shutdown");
                self.inflight.drain();
                self.subscribers.clear();
                self.serves.cancel_all();
            },
            // Handle peer set updates
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Only apply updates with a strictly newer index (`None < Some(_)`,
                // so the first update is always applied).
                if self.last_peer_set_id < Some(update.index) {
                    self.last_peer_set_id = Some(update.index);
                    self.fetcher.reconcile(update.latest.primary.as_ref());
                }
            },
            // Handle active deadline
            _ = deadline_active => {
                // A request timed out: count it as a failure and schedule a retry.
                if let Some(key) = self.fetcher.pop_active() {
                    debug!(?key, "requester timeout");
                    self.metrics.fetch.inc(Status::Failure);
                    self.fetcher.add_retry(key);
                }
            },
            // Handle pending deadline
            _ = deadline_pending => {
                self.fetcher.fetch(&mut sender);
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox.recv() else {
                error!("mailbox closed");
                return;
            } => {
                match msg {
                    Message::Fetch(keys) => {
                        for FetchKey {
                            key,
                            subscribers,
                            metadata: targets,
                        } in keys
                        {
                            trace!(?key, "mailbox: fetch");

                            // Check if the fetch is already in progress
                            let is_new = !self.inflight.contains(&key);
                            self.subscribers.insert(key.clone(), subscribers);

                            // Update targets
                            match targets {
                                Some(targets) => {
                                    // Only add targets if this is a new fetch OR the existing
                                    // fetch already has targets. Don't restrict an "all" fetch
                                    // (no targets) to specific targets.
                                    if is_new || self.fetcher.has_targets(&key) {
                                        self.fetcher.add_targets(key.clone(), targets);
                                    }
                                }
                                // A request without targets lifts any existing target restriction.
                                None => self.fetcher.clear_targets(&key),
                            }

                            // Only start new fetch if not already in progress
                            if is_new {
                                self.inflight.insert(
                                    key.clone(),
                                    self.metrics.fetch_duration.timer(self.context.as_ref()),
                                );
                                self.fetcher.add_ready(key);
                            } else {
                                trace!(?key, "updated targets for existing fetch");
                            }
                        }
                    }
                    Message::Retain { predicate } => {
                        trace!("mailbox: retain");

                        // Prune subscribers first, then keep only the fetcher and
                        // inflight entries whose key still has a subscriber.
                        self.subscribers
                            .retain(|key, subscriber| predicate(key, subscriber));
                        let subscribers = &self.subscribers;
                        self.fetcher.retain(|key| subscribers.contains(key));
                        // The value returned by `inflight.retain` is recorded as the
                        // number of cancellations.
                        let count = self.inflight.retain(|key| subscribers.contains(key)) as u64;
                        self.record_cancellations(count);
                    }
                }
            },
            // Handle completed consumer deliveries
            delivery = self.inflight.next_delivery() => {
                // If the delivery was aborted, its inflight entry was dropped (via
                // Retain or shutdown) before the consumer finished validating.
                let (peer, delivery, result) = match delivery {
                    Ok(delivery) => delivery,
                    Err(_) => continue,
                };
                self.handle_delivery(peer, delivery, result);
            },
            // Handle completed server requests
            serve = self.serves.next_completed() => {
                let Serve {
                    timer,
                    peer,
                    id,
                    result,
                } = serve;

                // Metrics and logs (the duration timer is only observed on success)
                match result {
                    Ok(_) => {
                        timer.observe(self.context.as_ref());
                        self.metrics.serve.inc(Status::Success);
                    }
                    Err(ref err) => {
                        debug!(?err, ?peer, ?id, "serve failed");
                        self.metrics.serve.inc(Status::Failure);
                    }
                }

                // Send response to peer
                self.handle_serve(&mut sender, peer, id, result, self.priority_responses);
            },
            // Handle network messages
            msg = receiver.recv() => {
                // Break if the receiver is closed
                let (peer, msg) = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        error!(?err, "receiver closed");
                        return;
                    }
                };

                // Skip if there is a decoding error
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        trace!(?err, ?peer, "decode failed");
                        continue;
                    }
                };

                // Dispatch on the payload kind
                match msg.payload {
                    wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key),
                    wire::Payload::Response(response) => {
                        self.handle_network_response(peer, msg.id, response)
                    }
                    wire::Payload::Error => self.handle_network_error_response(peer, msg.id),
                };
            },
        }
    }

    /// Record cancellation metrics for a retain-style operation.
    ///
    /// When `count` is zero the `cancel` metric is incremented once with
    /// `Status::Dropped`; otherwise `count` is added under `Status::Success`.
    fn record_cancellations(&mut self, count: u64) {
        if count == 0 {
            self.metrics.cancel.inc(Status::Dropped);
        } else {
            self.metrics.cancel.inc_by(Status::Success, count);
        }
    }

    /// Handles the case where the application responds to a request from an external peer.
    ///
    /// - `sender`: wrapped network sender used to reach `peer`.
    /// - `peer`: the peer that made the request.
    /// - `id`: the request id, echoed back in the response message.
    /// - `response`: `Ok` is encoded as `wire::Payload::Response`, `Err` as
    ///   `wire::Payload::Error`.
    /// - `priority`: forwarded to `sender.send`.
    ///
    /// Send failures are only logged; nothing is retried or returned.
    fn handle_serve(
        &mut self,
        sender: &mut WrappedSender>,
        peer: P,
        id: u64,
        response: Result,
        priority: bool,
    ) {
        // Encode message
        let payload: wire::Payload = response.map_or_else(
            |_| wire::Payload::Error,
            |data| wire::Payload::Response(data),
        );
        let msg = wire::Message { id, payload };

        // Send message to peer
        let result = sender.send(Recipients::One(peer.clone()), msg, priority);

        // Log result, but do not handle errors.
        // An empty send result is treated as a failed send.
        if result.is_empty() {
            warn!(?peer, ?id, "serve send failed");
        } else {
            trace!(?peer, ?id, "serve sent");
        };
    }

    /// Handle a network request from a peer.
    ///
    /// Asks a clone of the producer for `key` and queues a future in `serves`
    /// that resolves to a `Serve` once the producer's receiver completes. A
    /// `serve_duration` timer is started here and carried in the `Serve`.
    fn handle_network_request(&mut self, peer: P, id: u64, key: Key) {
        // Serve the request
        trace!(?peer, ?id, "peer request");
        let mut producer = self.producer.clone();
        let timer = self.metrics.serve_duration.timer(self.context.as_ref());
        let receiver = producer.produce(key);
        self.serves.push(async move {
            let result = receiver.await;
            Serve {
                timer,
                peer,
                id,
                result,
            }
        });
    }

    /// Handle a network response from a peer.
    ///
    /// Resolves `id` back to its key via the fetcher, then hands the response to
    /// `inflight` for delivery to the key's pending subscribers. If the id is
    /// unknown the response is ignored; if the key has no pending subscribers
    /// the in-flight entry is cancelled.
    fn handle_network_response(&mut self, peer: P, id: u64, response: Bytes) {
        trace!(?peer, ?id, "peer response: data");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, true) else {
            // It's possible that the key does not exist if the request was pruned.
            return;
        };
        let Some(subscribers) = self.subscribers.pending(&key) else {
            warn!(?key, "response for fetch with no subscribers");
            self.inflight.cancel(&key);
            return;
        };
        let delivery = Delivery {
            key: key.clone(),
            subscribers,
        };

        // The peer had the data, so deliver it to the consumer without blocking the engine.
        self.inflight.deliver(delivery, peer, response);
    }

    /// Handle completed delivery to the consumer.
    ///
    /// - `peer`: the peer whose response was delivered.
    /// - `delivery`: the key and the subscribers the response was delivered to.
    /// - `valid`: whether the consumer accepted the response.
    ///
    /// A valid delivery removes the delivered subscribers and either redelivers
    /// to the remaining ones or completes the fetch. An invalid delivery of an
    /// already-accepted response completes the fetch as a failure; any other
    /// invalid delivery blocks the peer and retries the key.
    fn handle_delivery(&mut self, peer: P, delivery: Delivery, valid: bool) {
        let Delivery {
            key,
            subscribers: delivered,
        } = delivery;
        if valid {
            let already_accepted = self.inflight.response_accepted(&key);

            // Remove only the subscribers that accepted this response. If other
            // subscribers still need the key, deliver the same accepted response
            // locally with the remaining annotations.
            let remaining = self.subscribers.remove_delivered(&key, delivered);
            if let Some(subscribers) = remaining {
                // Count the success (and mark the response accepted) only the
                // first time this key's response is accepted.
                if !already_accepted {
                    self.metrics.fetch.inc(Status::Success);
                    self.inflight.accept_response(&key, self.context.as_ref());
                }
                self.inflight.redeliver(Delivery { key, subscribers });
            } else {
                // All subscribers observed a valid response; clear any targeting
                // state retained for this key.
                if !already_accepted {
                    self.metrics.fetch.inc(Status::Success);
                }
                self.inflight.complete(self.context.as_ref(), &key);
                self.fetcher.clear_targets(&key);
            }
            return;
        }

        // The response was rejected on a local redelivery after an earlier
        // acceptance: finish the fetch as a failure without blocking the peer.
        if self.inflight.response_accepted(&key) {
            warn!(
                ?key,
                "previously accepted response was rejected during local redelivery"
            );
            self.metrics.fetch.inc(Status::Failure);
            self.inflight.complete(self.context.as_ref(), &key);
            self.subscribers.remove(&key);
            self.fetcher.clear_targets(&key);
            return;
        }

        // If the data is invalid, block the peer and try again. Blocking the
        // peer also removes any targets associated with it.
        commonware_p2p::block!(self.blocker, peer.clone(), "invalid data received");
        self.fetcher.block(peer);
        self.metrics.fetch.inc(Status::Failure);
        self.inflight.discard_response(&key);
        self.fetcher.add_retry(key);
    }

    /// Handle a network response from a peer that did not have the data.
    ///
    /// Resolves `id` back to its key via the fetcher; if found, records a fetch
    /// failure and schedules the key for retry. Unknown ids are ignored.
    fn handle_network_error_response(&mut self, peer: P, id: u64) {
        trace!(?peer, ?id, "peer response: error");

        // Get the key associated with the response, if any
        let Some(key) = self.fetcher.pop_by_id(id, &peer, false) else {
            // It's possible that the key does not exist if the request was pruned.
            return;
        };

        // The peer did not have the data, so we need to try again
        self.metrics.fetch.inc(Status::Failure);
        self.fetcher.add_retry(key);
    }
}