Skip to main content

karyon_core/async_util/
async_queue.rs

1use std::{collections::VecDeque, sync::Arc};
2
3use crate::async_runtime::lock::Mutex;
4use crate::async_util::CondVar;
5
6/// Bounded async queue. Producers `push` (blocks while full),
7/// consumers `recv` (blocks while empty).
8///
9/// The mutex covers only the `VecDeque` push/pop, so slow work the
10/// consumer does after `recv` is never inside a lock.
11///
12/// # Example
13///
14/// ```no_run
15/// use karyon_core::async_util::AsyncQueue;
16///
17/// async {
18///     let q = AsyncQueue::<u32>::new(8);
19///     q.push(1).await;
20///     let _ = q.recv().await;
21/// };
22/// ```
23pub struct AsyncQueue<T> {
24    queue: Mutex<VecDeque<T>>,
25    capacity: usize,
26    not_empty: CondVar,
27    not_full: CondVar,
28}
29
30impl<T> AsyncQueue<T> {
31    /// Create an `AsyncQueue` with the given capacity. Panics if
32    /// `capacity` is zero.
33    pub fn new(capacity: usize) -> Arc<Self> {
34        assert!(capacity > 0, "AsyncQueue capacity must be > 0");
35        Arc::new(Self {
36            queue: Mutex::new(VecDeque::with_capacity(capacity)),
37            capacity,
38            not_empty: CondVar::new(),
39            not_full: CondVar::new(),
40        })
41    }
42
43    /// Push an item. Awaits while the queue is full.
44    pub async fn push(&self, item: T) {
45        let mut queue = self.queue.lock().await;
46        while queue.len() >= self.capacity {
47            queue = self.not_full.wait(queue).await;
48        }
49        queue.push_back(item);
50        self.not_empty.signal();
51    }
52
53    /// Pop the next item. Awaits while the queue is empty.
54    pub async fn recv(&self) -> T {
55        let mut queue = self.queue.lock().await;
56        while queue.is_empty() {
57            queue = self.not_empty.wait(queue).await;
58        }
59        let item = queue.pop_front().expect("queue not empty");
60        self.not_full.signal();
61        item
62    }
63
64    /// Current number of items.
65    pub async fn len(&self) -> usize {
66        self.queue.lock().await.len()
67    }
68
69    /// True when the queue has no items.
70    pub async fn is_empty(&self) -> bool {
71        self.queue.lock().await.is_empty()
72    }
73
74    /// Configured capacity.
75    pub fn capacity(&self) -> usize {
76        self.capacity
77    }
78}