use super::Variant; use crate::types::Height; use commonware_utils::{futures::OptionFuture, Acknowledgement}; use futures::FutureExt; use pin_project::pin_project; use std::{ collections::VecDeque, future::Future, pin::Pin, task::{Context, Poll}, }; /// A pending acknowledgement from the application for a block at the contained height/commitment. #[pin_project] pub(super) struct PendingAck { pub(super) height: Height, pub(super) commitment: V::Commitment, #[pin] pub(super) receiver: A::Waiter, } impl Future for PendingAck { type Output = ::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.project().receiver.poll(cx) } } /// Tracks in-flight application acknowledgements with FIFO semantics. pub(super) struct PendingAcks { current: OptionFuture>, queue: VecDeque>, max: usize, } impl PendingAcks { /// Creates a new pending-ack tracker with a maximum in-flight capacity. pub(super) fn new(max: usize) -> Self { Self { current: None.into(), queue: VecDeque::with_capacity(max), max, } } /// Drops the current ack and all queued acks. pub(super) fn clear(&mut self) { self.current = None.into(); self.queue.clear(); } /// Returns the currently armed ack future (if any) for `select_loop!`. pub(super) const fn current(&mut self) -> &mut OptionFuture> { &mut self.current } /// Returns whether we can dispatch another block without exceeding capacity. pub(super) fn has_capacity(&self) -> bool { let reserved = usize::from(self.current.is_some()); self.queue.len() < self.max - reserved } /// Returns the next height to dispatch while preserving sequential order. pub(super) fn next_dispatch_height(&self, start_height: Height) -> Height { self.queue .back() .map(|ack| ack.height.next()) .or_else(|| self.current.as_ref().map(|ack| ack.height.next())) .unwrap_or(start_height) } /// Enqueues a newly dispatched ack, arming it immediately when idle. pub(super) fn enqueue(&mut self, ack: PendingAck) { if self.current.is_none() { self.current.replace(ack); return; } self.queue.push_back(ack); } /// Returns metadata for a completed current ack and arms the next queued ack. pub(super) fn complete_current( &mut self, result: ::Output, ) -> (Height, V::Commitment, ::Output) { let PendingAck { height, commitment, .. } = self.current.take().expect("ack state must be present"); if let Some(next) = self.queue.pop_front() { self.current.replace(next); } (height, commitment, result) } /// If the current ack is already resolved, takes it and arms the next ack. pub(super) fn pop_ready( &mut self, ) -> Option<(Height, V::Commitment, ::Output)> { let pending = self.current.as_mut()?; let result = Pin::new(&mut pending.receiver).now_or_never()?; Some(self.complete_current(result)) } } #[cfg(test)] mod tests { use super::*; use crate::{ marshal::{mocks::block::Block, standard::Standard}, types::Height, }; use commonware_cryptography::sha256::{Digest, Sha256}; use commonware_utils::acknowledgement::Exact; type TestBlock = Block; type TestVariant = Standard; fn digest(byte: u8) -> Digest { Sha256::fill(byte) } fn pending_ack(height: u64, byte: u8) -> (PendingAck, Exact) { let (ack, receiver) = Exact::handle(); ( PendingAck { height: Height::new(height), commitment: digest(byte), receiver, }, ack, ) } #[test] fn enqueue_tracks_capacity_and_fifo_ready_order() { let mut pending = PendingAcks::::new(2); assert!(pending.has_capacity()); assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(8)); let (first, first_ack) = pending_ack(8, 1); pending.enqueue(first); assert!(pending.has_capacity()); assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(9)); let (second, second_ack) = pending_ack(9, 2); pending.enqueue(second); assert!(!pending.has_capacity()); assert_eq!( pending.next_dispatch_height(Height::new(8)), Height::new(10) ); second_ack.acknowledge(); assert!(pending.pop_ready().is_none()); first_ack.acknowledge(); let (height, commitment, result) = pending.pop_ready().expect("first ack should be ready"); assert_eq!(height, Height::new(8)); assert_eq!(commitment, digest(1)); assert!(result.is_ok()); let (height, commitment, result) = pending .pop_ready() .expect("queued ready ack should be armed next"); assert_eq!(height, Height::new(9)); assert_eq!(commitment, digest(2)); assert!(result.is_ok()); assert!(pending.has_capacity()); } #[test] fn clear_drops_all_pending_acks() { let mut pending = PendingAcks::::new(2); let (first, first_ack) = pending_ack(3, 1); let (second, second_ack) = pending_ack(4, 2); pending.enqueue(first); pending.enqueue(second); assert!(!pending.has_capacity()); pending.clear(); first_ack.acknowledge(); second_ack.acknowledge(); assert!(pending.pop_ready().is_none()); assert!(pending.has_capacity()); assert_eq!( pending.next_dispatch_height(Height::new(10)), Height::new(10) ); } }