//! Waiter identity and lifecycle state for tracked io_uring requests.
//!
//! This module manages waiter IDs, request lifecycle transitions, and
//! outstanding-operation tracking. It is the source of truth for which logical
//! requests are still tracked and whether each currently has an operation SQE
//! outstanding.

use super::{request::Request, Tick, UserData};
use io_uring::squeue::Entry as SqueueEntry;
use tracing::warn;

/// Stable waiter identity packed into SQE/CQE `user_data`.
///
/// Layout:
/// - bits 0..31: slot index
/// - bits 32..62: generation (31 bits, wraps at 2^31)
/// - bit 63: reserved as cancel-tag in completion `user_data`
///
/// The generation counter detects stale CQEs that arrive after a slot has been
/// recycled. In normal (non-cancel) operation this cannot happen: a slot is
/// only freed after its CQE is processed, so the slot cannot be reused before
/// the CQE is consumed. With cancellation, the original op CQE can arrive
/// before the cancel CQE. When this happens the slot is freed and may be
/// recycled while the cancel CQE is still pending. The generation check
/// discards that stale cancel CQE. The 31-bit generation wraps after ~2 billion
/// reuses of the same slot, but cancellation is run synchronously on the kernel
/// side (a CQE is always generated by the time the cancel request has been
/// submitted; see the kernel documentation for `IORING_OP_ASYNC_CANCEL`), so a
/// wrap-around collision is not feasible in practice.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WaiterId(UserData);

impl WaiterId {
    /// Number of low-order bits reserved for the waiter slot index.
    const INDEX_BITS: u32 = 32;

    /// Number of bits reserved for the generation field.
    const GENERATION_BITS: u32 = 31;

    /// Bitmask that extracts the waiter slot index from packed user data.
    const INDEX_MASK: UserData = (1u64 << Self::INDEX_BITS) - 1;

    /// Bitmask that extracts the 31-bit generation from packed user data.
    const GENERATION_MASK: UserData = (1u64 << Self::GENERATION_BITS) - 1;

    /// High-bit tag used to mark cancellation CQE user data.
    const CANCEL_TAG: UserData = 1u64 << 63;

    /// Build a waiter id from slot index and generation components.
    ///
    /// The generation is truncated to its 31-bit field before packing.
    pub const fn new(index: u32, generation: u32) -> Self {
        let packed_generation =
            (generation as UserData & Self::GENERATION_MASK) << Self::INDEX_BITS;
        Self(packed_generation | index as UserData)
    }

    /// Return the slot index component of this waiter id.
    pub const fn index(self) -> u32 {
        (self.0 & Self::INDEX_MASK) as u32
    }

    /// Return the generation component of this waiter id.
    const fn generation(self) -> u32 {
        ((self.0 >> Self::INDEX_BITS) & Self::GENERATION_MASK) as u32
    }

    /// Return the waiter id for the same slot with incremented generation.
    ///
    /// The increment wraps within the 31-bit generation field; the slot index
    /// is preserved unchanged.
    const fn next_generation(self) -> Self {
        let bumped = (self.generation() as UserData).wrapping_add(1) & Self::GENERATION_MASK;
        Self::new(self.index(), bumped as u32)
    }

    /// Encode this waiter id as `user_data` for the operation SQE/CQE.
    ///
    /// This value contains only the packed waiter identity (slot + generation),
    /// with the cancel tag bit clear.
    pub const fn user_data(self) -> UserData {
        self.0
    }

    /// Encode this waiter id as `user_data` for the cancel SQE/CQE.
    ///
    /// This preserves the waiter identity and sets the high cancel-tag bit so
    /// completion handling can distinguish cancel CQEs from operation CQEs.
    pub const fn cancel_user_data(self) -> UserData {
        self.0 | Self::CANCEL_TAG
    }

    /// Decode `user_data` into waiter identity and cancel-tag state.
    ///
    /// The returned waiter id always has the cancel-tag bit stripped. The
    /// boolean reports whether that bit was set in the input value.
    const fn from_user_data(user_data: UserData) -> (Self, bool) {
        let is_cancel = (user_data & Self::CANCEL_TAG) != 0;
        (Self(user_data & !Self::CANCEL_TAG), is_cancel)
    }
}

/// Lifecycle state of a tracked request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WaiterState {
    /// Request is still tracked and has not transitioned to cancellation.
    Active {
        /// Absolute wheel tick by which the request must complete.
        ///
        /// If completion has not been observed by this tick, cancellation is
        /// requested. `None` means this request has no timeout deadline.
        target_tick: Option<Tick>,
    },
    /// Cancellation was requested.
    ///
    /// If the request still has an operation SQE in flight, the loop stages an
    /// async cancel. If the request is only parked in the ready queue, the loop
    /// completes it locally with timeout when that entry is revisited.
    CancelRequested,
}

/// State for one tracked logical request.
struct Waiter {
    /// Stable identity of this waiter slot instance.
    id: WaiterId,
    /// Lifecycle state for the logical request stored in this slot.
    state: WaiterState,
    /// Whether the logical request currently has an operation SQE in flight.
    in_flight: bool,
    /// The active request state machine.
    request: Request,
}

/// Outcome produced when staging the next SQE for a waiter.
pub enum StageOutcome {
    /// The waiter was canceled while parked in the ready queue and should
    /// complete locally with timeout rather than issuing another SQE.
    Timeout(Request),
    /// The original caller dropped its wait handle before this SQE could be
    /// staged, so the waiter was retired locally.
    Orphaned {
        /// Active deadline tracking to remove from the timeout wheel, if any.
        target_tick: Option<Tick>,
    },
    /// The waiter is still active and produced an SQE for submission.
    Submit(SqueueEntry),
}

/// Outcome produced when handling an operation CQE for a waiter.
#[allow(clippy::large_enum_variant)]
pub enum CompletionOutcome {
    /// The CQE belonged to an async cancel SQE and was handled internally.
    Cancel,
    /// The logical request needs another SQE and should be placed back in the
    /// ready queue.
    Requeue(WaiterId),
    /// The logical request completed and was removed from the waiter table.
    Complete {
        /// The completed request, ready to deliver its cached result.
        request: Request,
        /// Active deadline tracking to remove from the timeout wheel, when
        /// completion happened before cancellation was requested.
        target_tick: Option<Tick>,
    },
}

/// Tracks logical requests and the state needed to complete them.
pub struct Waiters {
    /// Waiters indexed by slot index.
    ///
    /// Free slots have no waiter (`None`).
    entries: Vec<Option<Waiter>>,
    /// Stack of reusable waiter ids.
    free: Vec<WaiterId>,
    /// Number of tracked waiters currently stored in `entries`.
    len: usize,
}

impl Waiters {
    /// Create an empty waiter set that can track at most `capacity` logical
    /// requests at once.
    pub fn new(capacity: usize) -> Self {
        let mut entries = Vec::with_capacity(capacity);
        entries.resize_with(capacity, || None);
        let mut free = Vec::with_capacity(capacity);
        // Push ids in reverse so the lowest slot index is popped first.
        free.extend((0..capacity).rev().map(|index| {
            let index = u32::try_from(index).expect("slot index overflow");
            WaiterId::new(index, 0)
        }));
        Self {
            entries,
            free,
            len: 0,
        }
    }

    /// Return the number of currently tracked waiters.
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Return whether there are no tracked waiters.
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Insert a request and return its assigned id.
    ///
    /// Panics if no free slot is available.
    pub fn insert(&mut self, request: Request, target_tick: Option<Tick>) -> WaiterId {
        let id = self
            .free
            .pop()
            .expect("waiters should not exceed configured capacity");
        let index = id.index() as usize;
        let replaced = self.entries[index].replace(Waiter {
            id,
            state: WaiterState::Active { target_tick },
            in_flight: false,
            request,
        });
        assert!(replaced.is_none(), "free slot should not contain waiter");
        self.len += 1;
        id
    }

    /// Remove the waiter stored at `index`, returning its owned request.
    ///
    /// Panics if `index` is out of bounds or the slot is empty. Callers must
    /// already have validated that the slot still belongs to the expected
    /// waiter.
fn take(&mut self, index: usize) -> Request { let slot = self.entries[index].take().expect("tracked waiter missing"); self.free.push(slot.id.next_generation()); self.len -= 1; slot.request } /// Request cancellation for an active waiter. /// /// Returns `true` when the waiter was successfully transitioned to /// cancel-requested. Returns `false` when the waiter id is stale, not /// present, or already cancel-requested. pub fn cancel(&mut self, waiter_id: WaiterId) -> bool { let Some(slot) = self.entries.get_mut(waiter_id.index() as usize) else { return false; }; let Some(slot) = slot.as_mut() else { return false; }; if slot.id != waiter_id { // Slot was reused, this CQE belongs to an older waiter generation. return false; } match slot.state { WaiterState::Active { .. } => { slot.state = WaiterState::CancelRequested; true } WaiterState::CancelRequested => false, } } /// Stage the next SQE for a waiter. /// /// This either returns the next SQE to issue, or removes the waiter from /// the table and returns the local action that should happen instead. /// /// - [`StageOutcome::Submit`] leaves the waiter tracked and yields the next SQE. /// - [`StageOutcome::Timeout`] removes the waiter and completes it locally with /// timeout. /// - [`StageOutcome::Orphaned`] removes the waiter because the caller dropped its /// wait handle before restaging. /// /// When this returns [`StageOutcome::Submit`], the waiter is marked as having an /// operation SQE outstanding immediately, so [`Waiters::is_in_flight`] will return /// `true` for that waiter. /// /// Panics if `waiter_id` does not refer to a currently tracked waiter or if /// the waiter already has an operation SQE outstanding. 
    pub fn stage(&mut self, waiter_id: WaiterId) -> StageOutcome {
        let index = waiter_id.index() as usize;
        let slot = self
            .entries
            .get_mut(index)
            .and_then(Option::as_mut)
            .expect("stage called for untracked waiter");
        assert_eq!(slot.id, waiter_id, "stage called with stale waiter id");
        match slot.state {
            // Canceled while parked in the ready queue: retire locally with a
            // timeout instead of issuing another SQE.
            WaiterState::CancelRequested => StageOutcome::Timeout(self.take(index)),
            WaiterState::Active { target_tick } if slot.request.is_orphaned() => {
                // The current request still owns all resources, but there is no
                // caller left to observe more progress, so retire it locally
                // instead of issuing another SQE.
                let _ = self.take(index);
                StageOutcome::Orphaned { target_tick }
            }
            WaiterState::Active { .. } => {
                assert!(
                    !slot.in_flight,
                    "stage called for waiter with op already in flight"
                );
                // Mark the op outstanding before handing back the SQE so
                // `is_in_flight` is accurate even before submission.
                slot.in_flight = true;
                StageOutcome::Submit(slot.request.build_sqe(waiter_id))
            }
        }
    }

    /// Process one CQE for a waiter.
    ///
    /// Cancel CQEs are handled internally. Operation CQEs drive the request
    /// state machine and return a high-level loop action. If the current SQE
    /// completed but the original caller already dropped its wait handle, the
    /// waiter is retired locally instead of requeueing another SQE.
    ///
    /// Panics if a non-cancel CQE does not refer to a currently tracked waiter,
    /// if it uses a stale waiter generation, or if the waiter has no operation
    /// SQE outstanding.
    pub fn on_completion(&mut self, user_data: UserData, result: i32) -> CompletionOutcome {
        let (waiter_id, is_cancel) = WaiterId::from_user_data(user_data);
        let index = waiter_id.index() as usize;
        // A missing or reused slot is tolerated only for cancel CQEs, which can
        // legitimately arrive after the op CQE already freed the slot.
        let Some(slot) = self.entries.get_mut(index).and_then(Option::as_mut) else {
            assert!(is_cancel, "operation CQE for untracked waiter");
            return CompletionOutcome::Cancel;
        };
        if slot.id != waiter_id {
            assert!(is_cancel, "operation CQE for stale waiter generation");
            return CompletionOutcome::Cancel;
        }
        if is_cancel {
            if result == 0 {
                // Cancellation successful.
            } else if result == -libc::EALREADY {
                // Cancellation is no longer possible at this stage. The target
                // operation CQE should follow shortly.
            } else if result == -libc::ENOENT {
                // Not found can mean the target already completed (common race) or
                // stale/invalid user_data.
            } else if result == -libc::EINVAL {
                panic!("async cancel SQE rejected by kernel: EINVAL");
            } else {
                warn!(result, "unexpected async cancel CQE result");
            }
            // Cancel CQEs acknowledge cancel requests but do not complete waiters.
            return CompletionOutcome::Cancel;
        }
        // The operation CQE retires the currently in-flight SQE, regardless of
        // whether the request completes or is requeued for another one.
        assert!(slot.in_flight);
        slot.in_flight = false;
        // Capture the pre-CQE state so `target_tick` reflects the deadline that
        // was being tracked when this SQE was outstanding.
        let state = slot.state;
        let completed = slot.request.on_cqe(slot.state, result);
        if completed || slot.request.is_orphaned() {
            // Either the request reached a terminal state, or the current SQE
            // made non-terminal progress for a caller that is already gone. In
            // both cases, remove the waiter now instead of requeueing another SQE.
            let target_tick = match state {
                WaiterState::Active { target_tick } => target_tick,
                WaiterState::CancelRequested => None,
            };
            CompletionOutcome::Complete {
                request: self.take(index),
                target_tick,
            }
        } else {
            CompletionOutcome::Requeue(waiter_id)
        }
    }

    /// Return whether a waiter currently has an operation SQE in flight.
pub fn is_in_flight(&self, waiter_id: WaiterId) -> bool { let index = waiter_id.index() as usize; self.entries .get(index) .and_then(Option::as_ref) .is_some_and(|slot| slot.id == waiter_id && slot.in_flight) } } #[cfg(test)] mod tests { use super::*; use crate::{ iouring::request::{ReadAtRequest, RecvRequest, Request, SendRequest, SyncRequest}, IoBuf, IoBufMut, IoBufs, }; use commonware_utils::channel::oneshot; use std::{ os::fd::{FromRawFd, IntoRawFd}, panic::{catch_unwind, AssertUnwindSafe}, sync::Arc, }; /// Build a `Sync` request backed by a socket fd so waiter tests can /// exercise slot lifecycle without touching the filesystem. fn make_sync_request() -> (Request, oneshot::Receiver>) { let (sock_left, _sock_right) = std::os::unix::net::UnixStream::pair().expect("failed to create unix socket pair"); // SAFETY: sock_left is a valid fd that we own. let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) }; let (tx, rx) = oneshot::channel(); let request = Request::Sync(SyncRequest { file: Arc::new(file), result: None, sender: tx, }); (request, rx) } fn waiter_state(waiters: &Waiters, waiter_id: WaiterId) -> Option { let index = waiter_id.index() as usize; let slot = waiters.entries.get(index)?.as_ref()?; (slot.id == waiter_id).then_some(slot.state) } fn remove_waiter(waiters: &mut Waiters, waiter_id: WaiterId) -> Request { let index = waiter_id.index() as usize; let slot = waiters .entries .get(index) .and_then(Option::as_ref) .expect("remove_waiter called for untracked waiter"); assert_eq!( slot.id, waiter_id, "remove_waiter called with stale waiter id" ); waiters.take(index) } #[test] fn test_waiter_id_encoding_and_generation_wrap() { // Verify waiter ids round-trip through user_data encoding and wrap their generation field // without corrupting the slot index bits. 
let wrapped = WaiterId::new(7, (WaiterId::GENERATION_MASK as u32).wrapping_add(5)); assert_eq!(wrapped.generation(), 4); let max = WaiterId::new(7, WaiterId::GENERATION_MASK as u32); assert_eq!(max.next_generation().generation(), 0); let waiter_id = WaiterId::new(7, 3); assert_eq!(waiter_id.index(), 7); assert_eq!(waiter_id.generation(), 3); let (decoded_op, is_cancel_op) = WaiterId::from_user_data(waiter_id.user_data()); assert_eq!(decoded_op, waiter_id); assert!(!is_cancel_op); let (decoded_cancel, is_cancel) = WaiterId::from_user_data(waiter_id.cancel_user_data()); assert_eq!(decoded_cancel, waiter_id); assert!(is_cancel); } #[test] fn test_waiters_lifecycle_and_slot_reuse() { // Verify waiter insertion, completion, removal, and slot reuse all preserve generations. let mut waiters = Waiters::new(3); assert_eq!(waiters.entries.len(), 3); assert_eq!(waiters.len(), 0); assert!(waiters.is_empty()); // Populate two slots so the test can later free and reuse one of them. let (req0, _rx0) = make_sync_request(); let (req1, _rx1) = make_sync_request(); let id0 = waiters.insert(req0, Some(5)); let id1 = waiters.insert(req1, Some(9)); assert_eq!((id0.index(), id1.index()), (0, 1)); assert_eq!(waiters.len(), 2); // A stale operation CQE should panic because only cancel CQEs are // expected to arrive after slot reuse. let stale = WaiterId::new(id1.index(), id1.generation().wrapping_add(1)); let stale_completion = catch_unwind(AssertUnwindSafe(|| { let _ = waiters.on_completion(stale.user_data(), 0); })); assert!(stale_completion.is_err()); // Complete id1. assert!(matches!(waiters.stage(id1), StageOutcome::Submit(_))); assert!(matches!( waiters.on_completion(id1.user_data(), 0), CompletionOutcome::Complete { target_tick: Some(9), .. } )); assert_eq!(waiters.len(), 1); // Next allocation reuses the freed slot with incremented generation. 
let (req2, _rx2) = make_sync_request(); let id2 = waiters.insert(req2, Some(11)); assert_eq!(id2.index(), id1.index()); assert_eq!( id2.generation(), id1.generation().wrapping_add(1) & (WaiterId::GENERATION_MASK as u32) ); // All live waiters should still complete and remove cleanly after slot reuse. assert!(matches!(waiters.stage(id0), StageOutcome::Submit(_))); let _ = waiters.on_completion(id0.user_data(), 0); assert!(matches!(waiters.stage(id2), StageOutcome::Submit(_))); let _ = waiters.on_completion(id2.user_data(), 0); assert!(waiters.is_empty()); } #[test] fn test_waiters_cancel_paths() { // Verify cancel requests transition waiter state, ignore cancel CQEs for completion, and // discard late cancel CQEs once the original operation has already completed. let mut waiters = Waiters::new(3); let (req, _rx) = make_sync_request(); let waiter_id = waiters.insert(req, Some(2)); let stale = WaiterId::new(waiter_id.index(), waiter_id.generation().wrapping_add(1)); assert!(!waiters.cancel(stale)); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); assert!( waiters.cancel(waiter_id), "cancel should transition active waiter" ); // Cancel CQE does not complete the waiter. assert!(matches!( waiters.on_completion(waiter_id.cancel_user_data(), -libc::ECANCELED), CompletionOutcome::Cancel )); // Op CQE completes the waiter. assert!(matches!( waiters.on_completion(waiter_id.user_data(), 0), CompletionOutcome::Complete { target_tick: None, .. } )); assert!(waiters.is_empty()); // Late cancel CQE for the already-completed waiter should be ignored. 
assert!(matches!( waiters.on_completion(waiter_id.cancel_user_data(), -libc::ECANCELED), CompletionOutcome::Cancel )); let missing_op_cqe = catch_unwind(AssertUnwindSafe(|| { let _ = waiters.on_completion(0, 1); })); assert!(missing_op_cqe.is_err()); } #[test] fn test_waiters_track_in_flight_state() { // Verify `stage` tracks a staged operation and that the bit is // cleared again when the matching op CQE is processed. let mut waiters = Waiters::new(1); let (req, _rx) = make_sync_request(); let waiter_id = waiters.insert(req, Some(4)); assert!(!waiters.is_in_flight(waiter_id)); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); assert!(waiters.is_in_flight(waiter_id)); assert!(matches!( waiters.on_completion(waiter_id.user_data(), 0), CompletionOutcome::Complete { .. } )); assert!(!waiters.is_in_flight(waiter_id)); } #[test] fn test_waiters_reject_stale_in_flight_queries() { // Verify stale waiter ids cannot observe in-flight state after // their slot has been recycled to a new generation. let mut waiters = Waiters::new(1); let (req0, _rx0) = make_sync_request(); let stale_id = waiters.insert(req0, Some(1)); let _ = remove_waiter(&mut waiters, stale_id); let (req1, _rx1) = make_sync_request(); let active_id = waiters.insert(req1, Some(2)); assert_ne!(active_id, stale_id); assert!(!waiters.is_in_flight(stale_id)); assert!(matches!(waiters.stage(active_id), StageOutcome::Submit(_))); assert!(waiters.is_in_flight(active_id)); } #[test] fn test_waiters_stage_panics_for_out_of_range_and_empty_slots() { // Verify `stage` treats impossible waiter ids as invariant failures, // while the tolerant query/cancel paths still reject them cleanly. 
let mut waiters = Waiters::new(1); let out_of_range = WaiterId::new(7, 0); assert!(!waiters.cancel(out_of_range)); let out_of_range_stage = catch_unwind(AssertUnwindSafe(|| { let _ = waiters.stage(out_of_range); })); assert!(out_of_range_stage.is_err()); let empty_slot = WaiterId::new(0, 0); assert!(!waiters.cancel(empty_slot)); let empty_slot_stage = catch_unwind(AssertUnwindSafe(|| { let _ = waiters.stage(empty_slot); })); assert!(empty_slot_stage.is_err()); } #[test] fn test_waiters_cancel_and_in_flight_reject_out_of_range_and_empty_slots() { // Verify cancel and in-flight tracking reject waiter ids that point // outside the table or at currently empty slots. let mut waiters = Waiters::new(1); let out_of_range = WaiterId::new(7, 0); assert!(!waiters.cancel(out_of_range)); assert!(!waiters.is_in_flight(out_of_range)); let empty_slot = WaiterId::new(0, 0); assert!(!waiters.cancel(empty_slot)); assert!(!waiters.is_in_flight(empty_slot)); } #[test] fn test_waiters_cancel_stage_only_when_in_flight() { // Verify timeout processing can distinguish between: // - a waiter whose current SQE is still in flight // - a waiter that is only parked in the ready queue let mut waiters = Waiters::new(2); // First build a waiter that still has an operation SQE outstanding. let (active_req, _active_rx) = make_sync_request(); let active = waiters.insert(active_req, Some(2)); assert!(matches!(waiters.stage(active), StageOutcome::Submit(_))); assert!(waiters.cancel(active)); assert!(waiters.is_in_flight(active)); let active_state = waiter_state(&waiters, active).expect("active waiter missing"); assert!(matches!(active_state, WaiterState::CancelRequested)); // Then build a waiter that has been canceled before any SQE was staged. 
let (ready_req, _ready_rx) = make_sync_request(); let ready = waiters.insert(ready_req, Some(3)); assert!(waiters.cancel(ready)); assert!(!waiters.is_in_flight(ready)); let ready_state = waiter_state(&waiters, ready).expect("ready waiter missing"); assert!(matches!(ready_state, WaiterState::CancelRequested)); } #[test] fn test_waiters_stage_orphans_closed_requests() { // Verify closed send and read-at requests are removed locally before // their first SQE is ever staged. { // Send request orphaned before first submit. let mut waiters = Waiters::new(1); let (tx, rx) = oneshot::channel(); drop(rx); let waiter_id = waiters.insert( Request::Send(SendRequest { fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()), write: IoBufs::from(IoBuf::from(b"hello")).into(), deadline: None, result: None, sender: tx, }), Some(7), ); match waiters.stage(waiter_id) { StageOutcome::Orphaned { target_tick: Some(7), } => {} _ => panic!("closed send waiter should be orphaned before staging"), } assert!(waiters.is_empty()); } { // Read-at request orphaned before first submit. let mut waiters = Waiters::new(1); let (sock_left, _sock_right) = std::os::unix::net::UnixStream::pair().expect("failed to create unix socket pair"); // SAFETY: sock_left is a valid fd that we own. 
let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) }; let (tx, rx) = oneshot::channel(); drop(rx); let waiter_id = waiters.insert( Request::ReadAt(ReadAtRequest { file: Arc::new(file), offset: 0, len: 8, read: 0, buf: IoBufMut::with_capacity(8), result: None, sender: tx, }), Some(8), ); match waiters.stage(waiter_id) { StageOutcome::Orphaned { target_tick: Some(8), } => {} _ => panic!("closed read waiter should be orphaned before staging"), } assert!(waiters.is_empty()); } } #[test] fn test_waiters_orphan_closed_requests_after_nonterminal_completion() { // Verify retryable and partial-progress send, recv, and read-at CQEs // remove the waiter instead of requeueing once the caller is gone. { // Send request orphaned after a retryable CQE. let mut waiters = Waiters::new(1); let (tx, rx) = oneshot::channel(); let waiter_id = waiters.insert( Request::Send(SendRequest { fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()), write: IoBufs::from(IoBuf::from(b"hello")).into(), deadline: None, result: None, sender: tx, }), Some(5), ); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); drop(rx); match waiters.on_completion(waiter_id.user_data(), -libc::EAGAIN) { CompletionOutcome::Complete { request, target_tick: Some(5), } => request.complete(), _ => panic!("closed send waiter should be orphaned after retry CQE"), } assert!(waiters.is_empty()); } { // Send request orphaned after a partial-progress CQE. 
let mut waiters = Waiters::new(1); let (tx, rx) = oneshot::channel(); let waiter_id = waiters.insert( Request::Send(SendRequest { fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()), write: IoBufs::from(IoBuf::from(b"hello")).into(), deadline: None, result: None, sender: tx, }), Some(5), ); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); drop(rx); match waiters.on_completion(waiter_id.user_data(), 2) { CompletionOutcome::Complete { request, target_tick: Some(5), } => request.complete(), _ => panic!("closed send waiter should be orphaned after partial CQE"), } assert!(waiters.is_empty()); } { // Exact recv orphaned after a retryable CQE. let mut waiters = Waiters::new(1); let (tx, rx) = oneshot::channel(); let waiter_id = waiters.insert( Request::Recv(RecvRequest { fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()), buf: IoBufMut::with_capacity(5), offset: 0, len: 5, exact: true, deadline: None, result: None, sender: tx, }), Some(6), ); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); drop(rx); match waiters.on_completion(waiter_id.user_data(), -libc::EAGAIN) { CompletionOutcome::Complete { request, target_tick: Some(6), } => request.complete(), _ => panic!("closed recv waiter should be orphaned after retry CQE"), } assert!(waiters.is_empty()); } { // Exact recv orphaned after a partial-progress CQE. 
let mut waiters = Waiters::new(1); let (tx, rx) = oneshot::channel(); let waiter_id = waiters.insert( Request::Recv(RecvRequest { fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()), buf: IoBufMut::with_capacity(5), offset: 0, len: 5, exact: true, deadline: None, result: None, sender: tx, }), Some(6), ); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); drop(rx); match waiters.on_completion(waiter_id.user_data(), 3) { CompletionOutcome::Complete { request, target_tick: Some(6), } => request.complete(), _ => panic!("closed recv waiter should be orphaned after partial CQE"), } assert!(waiters.is_empty()); } { // Read-at request orphaned after a partial-progress CQE. let mut waiters = Waiters::new(1); let (sock_left, _sock_right) = std::os::unix::net::UnixStream::pair().expect("failed to create unix socket pair"); // SAFETY: sock_left is a valid fd that we own. let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) }; let (tx, rx) = oneshot::channel(); let waiter_id = waiters.insert( Request::ReadAt(ReadAtRequest { file: Arc::new(file), offset: 0, len: 8, read: 0, buf: IoBufMut::with_capacity(8), result: None, sender: tx, }), Some(9), ); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); drop(rx); match waiters.on_completion(waiter_id.user_data(), 3) { CompletionOutcome::Complete { request, target_tick: Some(9), } => request.complete(), _ => panic!("closed read waiter should be orphaned after partial CQE"), } assert!(waiters.is_empty()); } } #[test] fn test_waiters_accept_expected_cancel_cqe_results() { // Verify the expected kernel cancel CQE results leave the waiter alive // for the original operation CQE to finish it later. 
for result in [0, -libc::EALREADY, -libc::ENOENT] { let mut waiters = Waiters::new(1); let (req, _rx) = make_sync_request(); let waiter_id = waiters.insert(req, Some(2)); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); assert!(waiters.cancel(waiter_id)); assert!(matches!( waiters.on_completion(waiter_id.cancel_user_data(), result), CompletionOutcome::Cancel )); let state = waiter_state(&waiters, waiter_id).expect("waiter should remain tracked"); assert!(matches!(state, WaiterState::CancelRequested)); } } #[test] fn test_waiters_tolerate_unexpected_negative_cancel_result() { // Verify unexpected negative cancel CQEs are ignored rather than // corrupting waiter state. let mut waiters = Waiters::new(1); let (req, _rx) = make_sync_request(); let waiter_id = waiters.insert(req, Some(2)); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); assert!(waiters.cancel(waiter_id)); assert!(matches!( waiters.on_completion(waiter_id.cancel_user_data(), -libc::EPERM), CompletionOutcome::Cancel )); let state = waiter_state(&waiters, waiter_id).expect("waiter should remain tracked"); assert!(matches!(state, WaiterState::CancelRequested)); } #[test] fn test_waiters_cancel_cqe_einval_panics() { // Verify `EINVAL` remains a hard invariant failure because the kernel // rejected our async cancel SQE. let mut waiters = Waiters::new(1); let (req, _rx) = make_sync_request(); let waiter_id = waiters.insert(req, Some(2)); assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_))); assert!(waiters.cancel(waiter_id)); let result = catch_unwind(AssertUnwindSafe(|| { let _ = waiters.on_completion(waiter_id.cancel_user_data(), -libc::EINVAL); })); assert!(result.is_err()); } #[test] fn test_waiters_stale_ids_cannot_remove_reused_slots() { // Verify stale waiter ids cannot observe state or remove a reused slot. 
let mut waiters = Waiters::new(1); let (req0, _rx0) = make_sync_request(); let waiter_id = waiters.insert(req0, Some(1)); let _ = remove_waiter(&mut waiters, waiter_id); let (req1, _rx1) = make_sync_request(); let reused_id = waiters.insert(req1, Some(2)); assert_ne!(reused_id, waiter_id); assert!(waiter_state(&waiters, waiter_id).is_none()); let stale_remove = catch_unwind(AssertUnwindSafe(|| { let _ = remove_waiter(&mut waiters, waiter_id); })); assert!(stale_remove.is_err()); } #[test] fn test_waiters_insert_and_cancel_invariants() { // Verify waiter capacity is enforced and that cancel remains valid even for waiters that // were inserted without a deadline. let mut waiters = Waiters::new(2); // Inserting beyond configured capacity should panic. let (req0, _rx0) = make_sync_request(); let (req1, _rx1) = make_sync_request(); let _ = waiters.insert(req0, None); let _ = waiters.insert(req1, None); let insert_overflow = catch_unwind(AssertUnwindSafe(|| { let (req2, _rx2) = make_sync_request(); let _ = waiters.insert(req2, None); })); assert!(insert_overflow.is_err()); // Cancellation is allowed even when no deadline is tracked. let mut waiters = Waiters::new(2); let (req, _rx) = make_sync_request(); let no_deadline = waiters.insert(req, None); assert!( waiters.cancel(no_deadline), "cancel should support active waiter without deadline" ); // Repeated cancel on the same waiter must be ignored. let (req, _rx) = make_sync_request(); let active = waiters.insert(req, Some(3)); assert!(waiters.cancel(active)); assert!(!waiters.cancel(active)); } }