karyon_core/async_runtime/
executor.rs

1use std::{future::Future, panic::catch_unwind, sync::Arc, thread};
2
3use once_cell::sync::OnceCell;
4
5#[cfg(feature = "smol")]
6pub use smol::Executor as SmolEx;
7
8#[cfg(feature = "tokio")]
9pub use tokio::runtime::Runtime;
10
11use super::Task;
12
/// Cloneable handle to the async backend selected at compile time.
///
/// The backend is stored behind an [`Arc`], so cloning this handle only
/// clones the pointer: every clone refers to the same smol executor
/// (`smol` feature) or tokio runtime (`tokio` feature).
#[derive(Clone)]
pub struct Executor {
    /// Shared smol executor backing this handle.
    #[cfg(feature = "smol")]
    inner: Arc<smol::Executor<'static>>,
    /// Shared tokio runtime backing this handle.
    #[cfg(feature = "tokio")]
    inner: Arc<tokio::runtime::Runtime>,
}
20
21impl Executor {
22    pub fn spawn<T: Send + 'static>(
23        &self,
24        future: impl Future<Output = T> + Send + 'static,
25    ) -> Task<T> {
26        self.inner.spawn(future).into()
27    }
28
29    #[cfg(feature = "tokio")]
30    pub fn handle(&self) -> &tokio::runtime::Handle {
31        self.inner.handle()
32    }
33}
34
35static GLOBAL_EXECUTOR: OnceCell<Executor> = OnceCell::new();
36
37/// Returns a single-threaded global executor
38pub fn global_executor() -> Executor {
39    #[cfg(feature = "smol")]
40    fn init_executor() -> Executor {
41        let ex = smol::Executor::new();
42        thread::Builder::new()
43            .name("smol-executor".to_string())
44            .spawn(|| loop {
45                catch_unwind(|| {
46                    smol::block_on(global_executor().inner.run(std::future::pending::<()>()))
47                })
48                .ok();
49            })
50            .expect("cannot spawn executor thread");
51        // Prevent spawning another thread by running the process driver on this
52        // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs
53        ex.spawn(async_process::driver()).detach();
54        Executor {
55            inner: Arc::new(ex),
56        }
57    }
58
59    #[cfg(feature = "tokio")]
60    fn init_executor() -> Executor {
61        let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime"));
62        thread::Builder::new()
63            .name("tokio-executor".to_string())
64            .spawn({
65                let ex = ex.clone();
66                move || {
67                    catch_unwind(|| ex.block_on(std::future::pending::<()>())).ok();
68                }
69            })
70            .expect("cannot spawn tokio runtime thread");
71        Executor { inner: ex }
72    }
73
74    GLOBAL_EXECUTOR.get_or_init(init_executor).clone()
75}
76
/// Wraps an already shared smol executor, reusing the given `Arc` as-is.
#[cfg(feature = "smol")]
impl From<Arc<smol::Executor<'static>>> for Executor {
    fn from(shared: Arc<smol::Executor<'static>>) -> Self {
        Self { inner: shared }
    }
}
83
/// Wraps an already shared tokio runtime, reusing the given `Arc` as-is.
#[cfg(feature = "tokio")]
impl From<Arc<tokio::runtime::Runtime>> for Executor {
    fn from(shared: Arc<tokio::runtime::Runtime>) -> Self {
        Self { inner: shared }
    }
}
90
/// Takes ownership of a smol executor and places it behind a fresh `Arc`.
#[cfg(feature = "smol")]
impl From<smol::Executor<'static>> for Executor {
    fn from(owned: smol::Executor<'static>) -> Self {
        let inner = Arc::new(owned);
        Self { inner }
    }
}
99
/// Takes ownership of a tokio runtime and places it behind a fresh `Arc`.
#[cfg(feature = "tokio")]
impl From<tokio::runtime::Runtime> for Executor {
    fn from(owned: tokio::runtime::Runtime) -> Self {
        let inner = Arc::new(owned);
        Self { inner }
    }
}