use super::{
    ingress::{Mailbox, Message},
    Config,
};
use crate::{
    threshold_simplex::{
        actors::voter,
        types::{Backfiller, Notarization, Nullification, Request, Response, View, Voter},
    },
    ThresholdSupervisor, Viewable,
};
use commonware_cryptography::{bls12381::primitives::variant::Variant, Digest, PublicKey};
use commonware_macros::select;
use commonware_p2p::{
    utils::{
        codec::{wrap, WrappedSender},
        requester,
    },
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{Clock, Handle, Metrics, Spawner};
use futures::{channel::mpsc, future::Either, StreamExt};
use governor::clock::Clock as GClock;
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use rand::{seq::IteratorRandom, Rng};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    time::{Duration, SystemTime},
};
use tracing::{debug, warn};

/// Task in the required set.
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
enum Task {
    Notarization,
    Nullification,
}

/// Entry in the required set.
#[derive(Clone, Eq, PartialEq)]
struct Entry {
    task: Task,
    view: View,
}

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        match self.view.cmp(&other.view) {
            Ordering::Equal => self.task.cmp(&other.task),
            ordering => ordering,
        }
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Tracks the contents of inflight requests to avoid duplicate work.
struct Inflight {
    all: BTreeSet<Entry>,
    requests: BTreeMap<requester::ID, Vec<Entry>>,
}

impl Inflight {
    fn new() -> Self {
        Self {
            all: BTreeSet::new(),
            requests: BTreeMap::new(),
        }
    }

    /// Check if the entry is already inflight.
    fn contains(&self, entry: &Entry) -> bool {
        self.all.contains(entry)
    }

    /// Add a new request to the inflight set.
    fn add(&mut self, request: requester::ID, entries: Vec<Entry>) {
        for entry in entries.iter() {
            self.all.insert(entry.clone());
        }
        self.requests.insert(request, entries);
    }

    /// Clear a request from the inflight set.
    fn clear(&mut self, request: requester::ID) {
        if let Some(entries) = self.requests.remove(&request) {
            for entry in entries {
                self.all.remove(&entry);
            }
        }
    }
}
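
// Minimal sanity checks for the helpers above (an illustrative sketch added here, not
// part of the upstream file): `Entry` ordering is view-major, and `Inflight` membership
// reflects exactly the entries attached to uncleared requests.
#[cfg(test)]
mod helper_tests {
    use super::*;

    #[test]
    fn entry_orders_by_view_then_task() {
        let early = Entry { task: Task::Nullification, view: 1 };
        let late = Entry { task: Task::Notarization, view: 2 };
        // A lower view sorts first regardless of task...
        assert!(early < late);
        // ...and within a view, notarizations sort before nullifications.
        let peer = Entry { task: Task::Notarization, view: 1 };
        assert!(peer < early);
    }

    #[test]
    fn inflight_tracks_request_contents() {
        let mut inflight = Inflight::new();
        let entry = Entry { task: Task::Notarization, view: 10 };
        assert!(!inflight.contains(&entry));
        inflight.add(0, vec![entry.clone()]);
        assert!(inflight.contains(&entry));
        inflight.clear(0);
        assert!(!inflight.contains(&entry));
    }
}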
/// Fetches missing notarizations and nullifications from other participants.
/// Requests are made concurrently to multiple peers.
pub struct Actor<
    E: Clock + GClock + Rng + Metrics + Spawner,
    C: PublicKey,
    B: Blocker<PublicKey = C>,
    V: Variant,
    D: Digest,
    S: ThresholdSupervisor<
        Index = View,
        PublicKey = C,
        Identity = V::Public,
        Polynomial = Vec<V::Public>,
    >,
> {
    context: E,
    blocker: B,
    supervisor: S,
    namespace: Vec<u8>,
    notarizations: BTreeMap<View, Notarization<V, D>>,
    nullifications: BTreeMap<View, Nullification<V>>,
    activity_timeout: u64,
    required: BTreeSet<Entry>,
    inflight: Inflight,
    retry: Option<SystemTime>,
    mailbox_receiver: mpsc::Receiver<Message<V, D>>,
    fetch_timeout: Duration,
    max_fetch_count: usize,
    fetch_concurrent: usize,
    requester: requester::Requester<E, C>,
    unfulfilled: Gauge,
    outstanding: Gauge,
    served: Counter,
}

impl<
        E: Clock + GClock + Rng + Metrics + Spawner,
        C: PublicKey,
        B: Blocker<PublicKey = C>,
        V: Variant,
        D: Digest,
        S: ThresholdSupervisor<
            Index = View,
            PublicKey = C,
            Identity = V::Public,
            Polynomial = Vec<V::Public>,
        >,
    > Actor<E, C, B, V, D, S>
{
    pub fn new(context: E, cfg: Config<C, B, V, S>) -> (Self, Mailbox<V, D>) {
        // Initialize requester
        let config = requester::Config {
            public_key: cfg.crypto,
            rate_limit: cfg.fetch_rate_per_peer,
            initial: cfg.fetch_timeout / 2,
            timeout: cfg.fetch_timeout,
        };
        let requester = requester::Requester::new(context.clone(), config);

        // Initialize metrics
        let unfulfilled = Gauge::default();
        let outstanding = Gauge::default();
        let served = Counter::default();
        context.register(
            "unfulfilled",
            "unfulfilled notarizations/nullifications",
            unfulfilled.clone(),
        );
        context.register("outstanding", "outstanding requests", outstanding.clone());
        context.register(
            "served",
            "served notarizations/nullifications",
            served.clone(),
        );

        // Initialize mailbox
        let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
        (
            Self {
                context,
                blocker: cfg.blocker,
                supervisor: cfg.supervisor,
                namespace: cfg.namespace,
                notarizations: BTreeMap::new(),
                nullifications: BTreeMap::new(),
                activity_timeout: cfg.activity_timeout,
                required: BTreeSet::new(),
                inflight: Inflight::new(),
                retry: None,
                mailbox_receiver: receiver,
                fetch_timeout: cfg.fetch_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                requester,
                unfulfilled,
                outstanding,
                served,
            },
            Mailbox::new(sender),
        )
    }

    /// Send requests for unfulfilled notarizations and nullifications, up to the
    /// configured concurrency limit. If `shuffle` is true, recipient selection is
    /// randomized rather than preferring the best-performing peer (used after a
    /// timeout or an unhelpful response).
    async fn send<Sr: Sender<PublicKey = C>>(
        &mut self,
        shuffle: bool,
        sender: &mut WrappedSender<Sr, Backfiller<V, D>>,
    ) {
        // Clear retry
        self.retry = None;

        // We try to send as many requests as possible at the same time for unfulfilled
        // notarizations and nullifications.
        loop {
            // If we have too many requests outstanding, return
            if self.requester.len() >= self.fetch_concurrent {
                return;
            }

            // We assume nothing about the usefulness (or existence) of any given entry, so we sample
            // the iterator to ensure we eventually try to fetch everything requested.
            let entries = self
                .required
                .iter()
                .filter(|entry| !self.inflight.contains(entry))
                .choose_multiple(&mut self.context, self.max_fetch_count);
            if entries.is_empty() {
                return;
            }

            // Select entries up to configured limits
            let mut notarizations = Vec::new();
            let mut nullifications = Vec::new();
            let mut inflight = Vec::new();
            for entry in entries {
                inflight.push(entry.clone());
                match entry.task {
                    Task::Notarization => notarizations.push(entry.view),
                    Task::Nullification => nullifications.push(entry.view),
                }
                if notarizations.len() + nullifications.len() >= self.max_fetch_count {
                    break;
                }
            }

            // If nothing to do, return
            if notarizations.is_empty() && nullifications.is_empty() {
                return;
            }

            // Select next recipient
            let mut msg = Request::new(0, notarizations.clone(), nullifications.clone());
            loop {
                // Get next best
                let Some((recipient, request)) = self.requester.request(shuffle) else {
                    // If we have outstanding items but there are no recipients available, set
                    // a deadline to retry and return.
                    //
                    // We return instead of waiting to continue serving requests and in case we
                    // learn of new notarizations or nullifications in the meantime.
                    warn!("failed to send request to any validator");
                    let deadline = self
                        .context
                        .current()
                        .checked_add(self.fetch_timeout)
                        .expect("time overflowed");
                    self.retry = Some(deadline);
                    return;
                };

                // Create new message
                msg.id = request;
                let encoded = Backfiller::<V, D>::Request(msg.clone());

                // Try to send
                if sender
                    .send(Recipients::One(recipient.clone()), encoded, false)
                    .await
                    .unwrap()
                    .is_empty()
                {
                    // Try again (treating past request as timeout)
                    let request = self.requester.cancel(request).unwrap();
                    self.requester.timeout(request);
                    debug!(peer = ?recipient, "failed to send request");
                    continue;
                }

                // Exit if sent
                self.inflight.add(request, inflight);
                debug!(
                    peer = ?recipient,
                    ?notarizations,
                    ?nullifications,
                    "sent request"
                );
                break;
            }
        }
    }

    pub fn start(
        mut self,
        voter: voter::Mailbox<V, D>,
        sender: impl Sender<PublicKey = C>,
        receiver: impl Receiver<PublicKey = C>,
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(voter, sender, receiver))
    }
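
    // Added summary (commentary, not in the upstream file): the event loop below
    // multiplexes four sources: a self-imposed retry deadline (set when no recipient
    // was available), the requester's per-request timeout, the local mailbox (new
    // fetch requirements and view progress), and the network (serving peer requests
    // and ingesting responses).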
    async fn run(
        mut self,
        mut voter: voter::Mailbox<V, D>,
        sender: impl Sender<PublicKey = C>,
        receiver: impl Receiver<PublicKey = C>,
    ) {
        // Wrap channel
        let (mut sender, mut receiver) = wrap(self.max_fetch_count, sender, receiver);

        // Wait for an event
        let mut current_view = 0;
        let mut finalized_view = 0;
        let identity = *self.supervisor.identity();
        loop {
            // Record outstanding metric
            self.unfulfilled.set(self.required.len() as i64);
            self.outstanding.set(self.requester.len() as i64);

            // Set timeout for retry
            let retry = match self.retry {
                Some(retry) => Either::Left(self.context.sleep_until(retry)),
                None => Either::Right(futures::future::pending()),
            };

            // Set timeout for next request
            let (request, timeout) = if let Some((request, timeout)) = self.requester.next() {
                (request, Either::Left(self.context.sleep_until(timeout)))
            } else {
                (0, Either::Right(futures::future::pending()))
            };

            // Wait for an event
            select! {
                _ = retry => {
                    // Retry sending after rate limiting
                    self.send(false, &mut sender).await;
                },
                _ = timeout => {
                    // Penalize peer for timeout
                    let request = self.requester.cancel(request).expect("request not found");
                    self.inflight.clear(request.id);
                    self.requester.timeout(request);

                    // Send message
                    self.send(true, &mut sender).await;
                },
                mailbox = self.mailbox_receiver.next() => {
                    let msg = match mailbox {
                        Some(msg) => msg,
                        None => break,
                    };
                    match msg {
                        Message::Fetch { notarizations, nullifications } => {
                            // Add to all outstanding required
                            for view in notarizations {
                                self.required.insert(Entry { task: Task::Notarization, view });
                                debug!(?view, "notarization required");
                            }
                            for view in nullifications {
                                self.required.insert(Entry { task: Task::Nullification, view });
                                debug!(?view, "nullification required");
                            }

                            // Trigger fetch of new notarizations and nullifications as soon as possible
                            self.send(false, &mut sender).await;
                        }
                        Message::Notarized { notarization } => {
                            // Update current view
                            let view = notarization.view();
                            if view > current_view {
                                current_view = view;
                            } else {
                                continue;
                            }

                            // Update stored validators
                            let validators = self.supervisor.participants(view).unwrap();
                            self.requester.reconcile(validators);

                            // If waiting for this notarization, remove it
                            self.required.remove(&Entry { task: Task::Notarization, view });

                            // Add notarization to cache
                            self.notarizations.insert(view, notarization);
                        }
                        Message::Nullified { nullification } => {
                            // Update current view
                            let view = nullification.view;
                            if view > current_view {
                                current_view = view;
                            } else {
                                continue;
                            }

                            // Update stored validators
                            let validators = self.supervisor.participants(view).unwrap();
                            self.requester.reconcile(validators);

                            // If waiting for this nullification, remove it
                            self.required.remove(&Entry { task: Task::Nullification, view });

                            // Add nullification to cache
                            self.nullifications.insert(view, nullification);
                        }
                        Message::Finalized { view } => {
                            // Update current view
                            if view > current_view {
                                current_view = view;
                            }
                            if view > finalized_view {
                                finalized_view = view;
                            } else {
                                continue;
                            }

                            // Remove outstanding
                            self.required.retain(|entry| entry.view >= view);

                            // Set prune depth
                            if view < self.activity_timeout {
                                continue;
                            }
                            let min_view = view - self.activity_timeout;

                            // Remove unneeded cache
                            //
                            // We keep some buffer of old messages around in case it helps other
                            // peers.
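                            //
                            // For example (illustrative numbers): with activity_timeout = 100 and
                            // a finalization at view 1_000, min_view = 900, so cached artifacts
                            // below view 900 are dropped while views 900..=1_000 remain available
                            // to serve lagging peers.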
                            self.notarizations.retain(|k, _| *k >= min_view);
                            self.nullifications.retain(|k, _| *k >= min_view);
                        }
                    }
                },
                network = receiver.recv() => {
                    // Break if there is an internal error
                    let Ok((s, msg)) = network else {
                        break;
                    };

                    // Skip if there is a decoding error
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            warn!(?err, sender = ?s, "blocking peer");
                            self.requester.block(s.clone());
                            self.blocker.block(s).await;
                            continue;
                        },
                    };
                    match msg {
                        Backfiller::Request(request) => {
                            let mut notarizations = Vec::new();
                            let mut missing_notarizations = Vec::new();
                            let mut notarizations_found = Vec::new();
                            let mut nullifications = Vec::new();
                            let mut missing_nullifications = Vec::new();
                            let mut nullifications_found = Vec::new();

                            // Populate notarizations first
                            for view in request.notarizations {
                                if let Some(notarization) = self.notarizations.get(&view) {
                                    notarizations.push(view);
                                    notarizations_found.push(notarization.clone());
                                    self.served.inc();
                                } else {
                                    missing_notarizations.push(view);
                                }
                            }

                            // Populate nullifications next
                            for view in request.nullifications {
                                if let Some(nullification) = self.nullifications.get(&view) {
                                    nullifications.push(view);
                                    nullifications_found.push(nullification.clone());
                                    self.served.inc();
                                } else {
                                    missing_nullifications.push(view);
                                }
                            }

                            // Send response
                            debug!(
                                sender = ?s,
                                ?notarizations,
                                ?missing_notarizations,
                                ?nullifications,
                                ?missing_nullifications,
                                "sending response"
                            );
                            let response =
                                Response::new(request.id, notarizations_found, nullifications_found);
                            let response = Backfiller::Response(response);
                            sender
                                .send(Recipients::One(s), response, false)
                                .await
                                .unwrap();
                        },
                        Backfiller::Response(response) => {
                            // Ensure we were waiting for this response
                            let Some(request) = self.requester.handle(&s, response.id) else {
                                debug!(sender = ?s, "unexpected message");
                                continue;
                            };
                            self.inflight.clear(request.id);

                            // Verify message
                            if !response.verify(&self.namespace, &identity) {
                                warn!(sender = ?s, "blocking peer");
                                self.requester.block(s.clone());
                                self.blocker.block(s).await;
                                continue;
                            }

                            // Update cache
                            let mut voters = Vec::with_capacity(
                                response.notarizations.len() + response.nullifications.len(),
                            );
                            let mut notarizations_found = BTreeSet::new();
                            for notarization in response.notarizations {
                                let view = notarization.view();
                                let entry = Entry { task: Task::Notarization, view };
                                if !self.required.remove(&entry) {
                                    debug!(view, sender = ?s, "unnecessary notarization");
                                    continue;
                                }
                                self.notarizations.insert(view, notarization.clone());
                                voters.push(Voter::Notarization(notarization));
                                notarizations_found.insert(view);
                            }
                            let mut nullifications_found = BTreeSet::new();
                            for nullification in response.nullifications {
                                let view = nullification.view;
                                let entry = Entry { task: Task::Nullification, view };
                                if !self.required.remove(&entry) {
                                    debug!(view, sender = ?s, "unnecessary nullification");
                                    continue;
                                }
                                self.nullifications.insert(view, nullification.clone());
                                voters.push(Voter::Nullification(nullification));
                                nullifications_found.insert(view);
                            }

                            // Send voters
                            voter.verified(voters).await;

                            // Update performance
                            let mut shuffle = false;
                            if !notarizations_found.is_empty() || !nullifications_found.is_empty() {
                                self.requester.resolve(request);
                                debug!(
                                    sender = ?s,
                                    notarizations = ?notarizations_found,
                                    nullifications = ?nullifications_found,
                                    "response useful",
                                );
                            } else {
                                // We don't reward a peer for sending us a response that doesn't help us
                                shuffle = true;
                                debug!(sender = ?s, "response not useful");
                            }

                            // If still work to do, send another request
                            self.send(shuffle, &mut sender).await;
                        },
                    }
                },
            }
        }
    }
}
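
// An illustrative sketch (added commentary, not part of the upstream file) of the wire
// request assembled in `send` above: a `Request` carries the requester-assigned id plus
// the views whose notarizations and nullifications are wanted.
#[cfg(test)]
mod request_tests {
    use super::*;

    #[test]
    fn request_carries_id_and_views() {
        // Mirror `send`: the id starts as a placeholder and is overwritten with the
        // requester-assigned id before encoding.
        let mut request = Request::new(0, vec![1, 2], vec![3]);
        request.id = 42;
        assert_eq!(request.id, 42);
        assert_eq!(request.notarizations, vec![1, 2]);
        assert_eq!(request.nullifications, vec![3]);
    }
}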