karyon_core/async_util/backoff.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
use std::{
cmp::min,
sync::atomic::{AtomicBool, AtomicU32, Ordering},
time::Duration,
};
use super::sleep;
/// Exponential backoff
/// <https://en.wikipedia.org/wiki/Exponential_backoff>
///
/// Each call to `sleep` waits for `base_delay * 2^retries` milliseconds,
/// capped at `max_delay`. Once the cap has been reached every further call
/// waits for `max_delay` until `reset` is called.
///
/// # Examples
///
/// ```
/// use karyon_core::async_util::Backoff;
///
/// async {
///     let backoff = Backoff::new(300, 3000);
///
///     loop {
///         backoff.sleep().await;
///
///         // do something
///         break;
///     }
///
///     backoff.reset();
///
///     // ....
/// };
/// ```
#[derive(Debug)]
pub struct Backoff {
    /// The base delay in milliseconds for the initial retry.
    base_delay: u64,
    /// The max delay in milliseconds allowed for a retry.
    max_delay: u64,
    /// Number of sleeps performed since creation or the last reset; used as
    /// the exponent when computing the next delay.
    retries: AtomicU32,
    /// Set once the computed delay has reached `max_delay`; while set, the
    /// delay is no longer recomputed and `max_delay` is used directly.
    stop: AtomicBool,
}
impl Backoff {
    /// Creates a new Backoff.
    ///
    /// `base_delay` is the delay in milliseconds used for the first sleep and
    /// `max_delay` is the upper bound in milliseconds for any single sleep.
    /// The retry counter starts at 0 and the stop flag is cleared.
    pub fn new(base_delay: u64, max_delay: u64) -> Self {
        Self {
            base_delay,
            max_delay,
            retries: AtomicU32::new(0),
            stop: AtomicBool::new(false),
        }
    }

    /// Sleep based on the current retry count and delay values.
    /// Returns the delay value, in milliseconds, that was slept.
    ///
    /// The delay is `base_delay * 2^retries`, capped at `max_delay`. Once the
    /// cap has been reached the stop flag is set and every following call
    /// sleeps for `max_delay` without touching the retry counter, until
    /// [`Backoff::reset`] is called.
    pub async fn sleep(&self) -> u64 {
        // The cap was already reached on an earlier call: skip the
        // computation and keep sleeping for the maximum delay.
        if self.stop.load(Ordering::SeqCst) {
            sleep(Duration::from_millis(self.max_delay)).await;
            return self.max_delay;
        }

        let retries = self.retries.load(Ordering::SeqCst);

        // Saturate instead of overflowing: `2^retries` no longer fits in a
        // u64 once `retries >= 64`, and the product can overflow earlier for
        // large base delays. A plain `*`/`pow` would panic in debug builds
        // and wrap around to a bogus small delay in release builds.
        let factor = 2_u64.checked_pow(retries).unwrap_or(u64::MAX);
        let delay = min(self.base_delay.saturating_mul(factor), self.max_delay);

        if delay == self.max_delay {
            self.stop.store(true, Ordering::SeqCst);
        }

        // With a zero base delay the cap may never be reached, so the counter
        // keeps growing; saturate rather than overflow the u32.
        self.retries
            .store(retries.saturating_add(1), Ordering::SeqCst);

        sleep(Duration::from_millis(delay)).await;
        delay
    }

    /// Reset the retry counter to 0 and clear the stop flag, so the next
    /// call to [`Backoff::sleep`] starts again from `base_delay`.
    pub fn reset(&self) {
        self.retries.store(0, Ordering::SeqCst);
        self.stop.store(false, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::async_runtime::{block_on, spawn};

    use super::*;

    /// Checks that the delay doubles from the base value up to the cap, and
    /// that `reset` brings it back to the base value.
    #[test]
    fn test_backoff() {
        block_on(async move {
            let shared = Arc::new(Backoff::new(5, 15));

            // First task: base 5 ms, doubled to 10 ms, then capped at 15 ms.
            let grower = Arc::clone(&shared);
            let growth_task = spawn(async move {
                for expected in [5, 10, 15] {
                    let delay = grower.sleep().await;
                    assert_eq!(delay, expected);
                }
            });
            growth_task.await.unwrap();

            // Second task: after a reset the sequence starts over at the base.
            let reset_task = spawn(async move {
                shared.reset();
                let delay = shared.sleep().await;
                assert_eq!(delay, 5);
            });
            reset_task.await.unwrap();
        });
    }
}