2020-06-25 15:35:42 +02:00
|
|
|
//! A thin wrapper around `ThreadPool` to make sure that we join all things
|
|
|
|
//! properly.
|
2023-05-20 22:29:32 +10:00
|
|
|
use std::sync::{Arc, Barrier};
|
|
|
|
|
2020-06-25 15:35:42 +02:00
|
|
|
use crossbeam_channel::Sender;
|
|
|
|
|
|
|
|
pub(crate) struct TaskPool<T> {
|
|
|
|
sender: Sender<T>,
|
|
|
|
inner: threadpool::ThreadPool,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> TaskPool<T> {
|
2022-12-09 21:11:46 +08:00
|
|
|
pub(crate) fn new_with_threads(sender: Sender<T>, threads: usize) -> TaskPool<T> {
|
2022-06-04 20:48:51 +03:00
|
|
|
const STACK_SIZE: usize = 8 * 1024 * 1024;
|
|
|
|
|
|
|
|
let inner = threadpool::Builder::new()
|
|
|
|
.thread_name("Worker".into())
|
|
|
|
.thread_stack_size(STACK_SIZE)
|
2022-12-09 21:11:46 +08:00
|
|
|
.num_threads(threads)
|
2022-06-04 20:48:51 +03:00
|
|
|
.build();
|
2023-05-20 22:29:32 +10:00
|
|
|
|
|
|
|
// Set QoS of all threads in threadpool.
|
|
|
|
let barrier = Arc::new(Barrier::new(threads + 1));
|
|
|
|
for _ in 0..threads {
|
|
|
|
let barrier = barrier.clone();
|
|
|
|
inner.execute(move || {
|
|
|
|
stdx::thread::set_current_thread_qos_class(stdx::thread::QoSClass::Utility);
|
|
|
|
barrier.wait();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
barrier.wait();
|
|
|
|
|
2022-06-04 20:48:51 +03:00
|
|
|
TaskPool { sender, inner }
|
2020-06-25 15:35:42 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn spawn<F>(&mut self, task: F)
|
|
|
|
where
|
|
|
|
F: FnOnce() -> T + Send + 'static,
|
|
|
|
T: Send + 'static,
|
|
|
|
{
|
|
|
|
self.inner.execute({
|
|
|
|
let sender = self.sender.clone();
|
2023-05-20 22:29:32 +10:00
|
|
|
move || {
|
|
|
|
if stdx::thread::IS_QOS_AVAILABLE {
|
|
|
|
debug_assert_eq!(
|
|
|
|
stdx::thread::get_current_thread_qos_class(),
|
|
|
|
Some(stdx::thread::QoSClass::Utility)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
sender.send(task()).unwrap()
|
|
|
|
}
|
2020-06-25 15:35:42 +02:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-10-06 17:58:03 +02:00
|
|
|
pub(crate) fn spawn_with_sender<F>(&mut self, task: F)
|
|
|
|
where
|
|
|
|
F: FnOnce(Sender<T>) + Send + 'static,
|
|
|
|
T: Send + 'static,
|
|
|
|
{
|
|
|
|
self.inner.execute({
|
|
|
|
let sender = self.sender.clone();
|
|
|
|
move || task(sender)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-06-25 15:35:42 +02:00
|
|
|
pub(crate) fn len(&self) -> usize {
|
|
|
|
self.inner.queued_count()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> Drop for TaskPool<T> {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.inner.join()
|
|
|
|
}
|
|
|
|
}
|