karyon_jsonrpc/server/
response_queue.rs

1use std::{collections::VecDeque, sync::Arc};
2
3use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
4
5/// A queue for handling responses
6pub(super) struct ResponseQueue<T> {
7    queue: Mutex<VecDeque<T>>,
8    condvar: CondVar,
9}
10
11impl<T: std::fmt::Debug> ResponseQueue<T> {
12    pub(super) fn new() -> Arc<Self> {
13        Arc::new(Self {
14            queue: Mutex::new(VecDeque::new()),
15            condvar: CondVar::new(),
16        })
17    }
18
19    /// Wait while the queue is empty, remove and return the item from the queue,
20    /// panicking if empty (shouldn't happen)
21    pub(super) async fn recv(&self) -> T {
22        let mut queue = self.queue.lock().await;
23
24        while queue.is_empty() {
25            queue = self.condvar.wait(queue).await;
26        }
27
28        match queue.pop_front() {
29            Some(v) => v,
30            None => unreachable!(),
31        }
32    }
33
34    /// Push an item into the queue, notify all waiting tasks that the
35    /// condvar has changed
36    pub(super) async fn push(&self, res: T) {
37        self.queue.lock().await.push_back(res);
38        self.condvar.signal();
39    }
40}