karyon_core/async_util/
async_queue.rs1use std::{collections::VecDeque, sync::Arc};
2
3use crate::async_runtime::lock::Mutex;
4use crate::async_util::CondVar;
5
6pub struct AsyncQueue<T> {
24 queue: Mutex<VecDeque<T>>,
25 capacity: usize,
26 not_empty: CondVar,
27 not_full: CondVar,
28}
29
30impl<T> AsyncQueue<T> {
31 pub fn new(capacity: usize) -> Arc<Self> {
34 assert!(capacity > 0, "AsyncQueue capacity must be > 0");
35 Arc::new(Self {
36 queue: Mutex::new(VecDeque::with_capacity(capacity)),
37 capacity,
38 not_empty: CondVar::new(),
39 not_full: CondVar::new(),
40 })
41 }
42
43 pub async fn push(&self, item: T) {
45 let mut queue = self.queue.lock().await;
46 while queue.len() >= self.capacity {
47 queue = self.not_full.wait(queue).await;
48 }
49 queue.push_back(item);
50 self.not_empty.signal();
51 }
52
53 pub async fn recv(&self) -> T {
55 let mut queue = self.queue.lock().await;
56 while queue.is_empty() {
57 queue = self.not_empty.wait(queue).await;
58 }
59 let item = queue.pop_front().expect("queue not empty");
60 self.not_full.signal();
61 item
62 }
63
64 pub async fn len(&self) -> usize {
66 self.queue.lock().await.len()
67 }
68
69 pub async fn is_empty(&self) -> bool {
71 self.queue.lock().await.is_empty()
72 }
73
74 pub fn capacity(&self) -> usize {
76 self.capacity
77 }
78}