Skip to main content

karyon_core/async_runtime/
executor.rs

1use std::{future::Future, panic::catch_unwind, sync::Arc, thread};
2
3use once_cell::sync::OnceCell;
4
5#[cfg(feature = "smol")]
6pub use smol::Executor as SmolEx;
7
8#[cfg(feature = "tokio")]
9pub use tokio::runtime::Runtime;
10
11use super::Task;
12
13/// A handle to a multi-threaded async executor.
14///
15/// karyon does not run the executor for you. The caller owns the runtime and
16/// is responsible for driving it. On `tokio` this is just a `Runtime`. On
17/// `smol` you must spin up worker threads yourself, e.g. via
18/// [`easy_parallel`](https://docs.rs/easy-parallel):
19///
20/// ```ignore
21/// use std::sync::Arc;
22/// use async_channel::unbounded;
23/// use easy_parallel::Parallel;
24/// use smol::{future, Executor as SmolEx};
25///
26/// let ex = Arc::new(SmolEx::new());
27/// let (signal, shutdown) = unbounded::<()>();
28///
29/// let num_threads = std::thread::available_parallelism()
30///     .map(|n| n.get())
31///     .unwrap_or(1);
32///
33/// Parallel::new()
34///     .each(0..num_threads, |_| future::block_on(ex.run(shutdown.recv())))
35///     .finish(|| future::block_on(async {
36///         // your async main here
37///         drop(signal);
38///     }));
39/// ```
40///
41/// Pass `ex.clone().into()` to karyon APIs that take an [`Executor`].
42#[derive(Clone)]
43pub struct Executor {
44    #[cfg(feature = "smol")]
45    inner: Arc<SmolEx<'static>>,
46    #[cfg(feature = "tokio")]
47    inner: Arc<Runtime>,
48}
49
50impl Executor {
51    pub fn spawn<T: Send + 'static>(
52        &self,
53        future: impl Future<Output = T> + Send + 'static,
54    ) -> Task<T> {
55        self.inner.spawn(future).into()
56    }
57
58    #[cfg(feature = "tokio")]
59    pub fn handle(&self) -> &tokio::runtime::Handle {
60        self.inner.handle()
61    }
62}
63
64static GLOBAL_EXECUTOR: OnceCell<Executor> = OnceCell::new();
65
66/// Returns a single-threaded global executor
67pub fn global_executor() -> Executor {
68    #[cfg(feature = "smol")]
69    fn init_executor() -> Executor {
70        let ex = smol::Executor::new();
71        thread::Builder::new()
72            .name("smol-executor".to_string())
73            .spawn(|| loop {
74                catch_unwind(|| {
75                    smol::block_on(global_executor().inner.run(std::future::pending::<()>()))
76                })
77                .ok();
78            })
79            .expect("cannot spawn executor thread");
80        // Prevent spawning another thread by running the process driver on this
81        // thread. see https://github.com/smol-rs/smol/blob/master/src/spawn.rs
82        ex.spawn(async_process::driver()).detach();
83        Executor {
84            inner: Arc::new(ex),
85        }
86    }
87
88    #[cfg(feature = "tokio")]
89    fn init_executor() -> Executor {
90        let ex = Arc::new(tokio::runtime::Runtime::new().expect("cannot build tokio runtime"));
91        thread::Builder::new()
92            .name("tokio-executor".to_string())
93            .spawn({
94                let ex = ex.clone();
95                move || {
96                    catch_unwind(|| ex.block_on(std::future::pending::<()>())).ok();
97                }
98            })
99            .expect("cannot spawn tokio runtime thread");
100        Executor { inner: ex }
101    }
102
103    GLOBAL_EXECUTOR.get_or_init(init_executor).clone()
104}
105
106#[cfg(feature = "smol")]
107impl From<Arc<smol::Executor<'static>>> for Executor {
108    fn from(ex: Arc<smol::Executor<'static>>) -> Executor {
109        Executor { inner: ex }
110    }
111}
112
113#[cfg(feature = "tokio")]
114impl From<Arc<tokio::runtime::Runtime>> for Executor {
115    fn from(rt: Arc<tokio::runtime::Runtime>) -> Executor {
116        Executor { inner: rt }
117    }
118}
119
120#[cfg(feature = "smol")]
121impl From<smol::Executor<'static>> for Executor {
122    fn from(ex: smol::Executor<'static>) -> Executor {
123        Executor {
124            inner: Arc::new(ex),
125        }
126    }
127}
128
129#[cfg(feature = "tokio")]
130impl From<tokio::runtime::Runtime> for Executor {
131    fn from(rt: tokio::runtime::Runtime) -> Executor {
132        Executor {
133            inner: Arc::new(rt),
134        }
135    }
136}