karyon_core/async_util/
condwait.rs1use super::CondVar;
2use crate::async_runtime::lock::Mutex;
3
4pub struct CondWait {
30 condvar: CondVar,
32 w: Mutex<bool>,
34}
35
36impl CondWait {
37 pub fn new() -> Self {
39 Self {
40 condvar: CondVar::new(),
41 w: Mutex::new(false),
42 }
43 }
44
45 pub async fn wait(&self) {
47 let mut w = self.w.lock().await;
48
49 while !*w {
51 w = self.condvar.wait(w).await;
52 }
53 }
54
55 pub async fn signal(&self) {
57 *self.w.lock().await = true;
58 self.condvar.signal();
59 }
60
61 pub async fn broadcast(&self) {
63 *self.w.lock().await = true;
64 self.condvar.broadcast();
65 }
66
67 pub async fn reset(&self) {
69 *self.w.lock().await = false;
70 }
71}
72
73impl Default for CondWait {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use crate::async_runtime::{block_on, spawn};

    use super::*;

    /// Exercises `signal` with one waiter, then `reset` followed by
    /// `broadcast` with two waiters, counting how many waiters got through.
    #[test]
    fn test_cond_wait() {
        block_on(async {
            let cond_wait = Arc::new(CondWait::new());
            let count = Arc::new(AtomicUsize::new(0));

            // Spawns a task that waits on the shared `CondWait` and then bumps
            // the shared counter by one.
            let spawn_waiter = || {
                let cond_wait = Arc::clone(&cond_wait);
                let count = Arc::clone(&count);
                spawn(async move {
                    cond_wait.wait().await;
                    count.fetch_add(1, Ordering::Relaxed);
                })
            };

            // Phase 1: a single waiter released by `signal`.
            let task = spawn_waiter();

            cond_wait.signal().await;

            let _ = task.await;

            // Clear the flag so the next waiters do not pass straight through.
            cond_wait.reset().await;

            assert_eq!(count.load(Ordering::Relaxed), 1);

            // Phase 2: two waiters released together by `broadcast`.
            let task1 = spawn_waiter();
            let task2 = spawn_waiter();

            cond_wait.broadcast().await;

            let _ = task1.await;
            let _ = task2.await;
            assert_eq!(count.load(Ordering::Relaxed), 3);
        });
    }
}