Add mandatory panic contexts to all threadpool tasks

Lukas Wirth 2023-06-04 09:09:25 +02:00
parent 4fb1df6b7a
commit 2d0510e226
5 changed files with 174 additions and 138 deletions
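
The pattern the commit enforces, shown as a minimal self-contained sketch: the pool's spawn function takes a context string as a mandatory argument and enters it around the task, so a panic anywhere in the task is reported together with the name of the work that was running, instead of each call site opting in with its own catch_unwind wrapper. The panic_context module and the free-standing spawn helper below are simplified stand-ins for rust-analyzer's stdx::panic_context and TaskPool::spawn (assumed shapes for illustration), not code from this commit.

    // Simplified stand-in for stdx::panic_context: a thread-local stack of
    // context strings plus a panic hook that prints them before the panic message.
    mod panic_context {
        use std::{cell::RefCell, panic, sync::Once};

        thread_local! {
            static CONTEXT: RefCell<Vec<String>> = RefCell::new(Vec::new());
        }

        pub struct PanicContext {
            _priv: (),
        }

        impl Drop for PanicContext {
            fn drop(&mut self) {
                // Pop the context when the guard goes out of scope.
                CONTEXT.with(|ctx| {
                    ctx.borrow_mut().pop();
                });
            }
        }

        /// Pushes `context` for the current thread; it is popped when the guard drops.
        pub fn enter(context: String) -> PanicContext {
            static HOOK: Once = Once::new();
            HOOK.call_once(|| {
                let default_hook = panic::take_hook();
                panic::set_hook(Box::new(move |info| {
                    // Print every active context frame, then defer to the default hook.
                    CONTEXT.with(|ctx| {
                        for frame in ctx.borrow().iter() {
                            eprintln!("> {frame}");
                        }
                    });
                    default_hook(info);
                }));
            });
            CONTEXT.with(|ctx| ctx.borrow_mut().push(context));
            PanicContext { _priv: () }
        }
    }

    // Mirrors the new TaskPool::spawn shape: the context is a required argument,
    // and the pool (not each caller) enters it before running the task.
    fn spawn(
        panic_context: impl Into<String>,
        task: impl FnOnce() + Send + 'static,
    ) -> std::thread::JoinHandle<()> {
        let panic_context = panic_context.into();
        std::thread::spawn(move || {
            let _pctx = panic_context::enter(panic_context);
            task()
        })
    }

    fn main() {
        // A panic inside the task is now prefixed with "> flycheck".
        let _ = spawn("flycheck", || panic!("boom")).join();
    }
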


@@ -104,13 +104,10 @@ pub(crate) fn on_no_retry<R>(
             None => return self,
         };
-        self.global_state.task_pool.handle.spawn(ThreadIntent::Worker, {
+        self.global_state.task_pool.handle.spawn(ThreadIntent::Worker, panic_context, {
             let world = self.global_state.snapshot();
             move || {
-                let result = panic::catch_unwind(move || {
-                    let _pctx = stdx::panic_context::enter(panic_context);
-                    f(world, params)
-                });
+                let result = panic::catch_unwind(move || f(world, params));
                 match thread_result_to_response::<R>(req.id.clone(), result) {
                     Ok(response) => Task::Response(response),
                     Err(_) => Task::Response(lsp_server::Response::new_err(
@@ -178,13 +175,10 @@ fn on_with_thread_intent<R>(
             None => return self,
         };
-        self.global_state.task_pool.handle.spawn(intent, {
+        self.global_state.task_pool.handle.spawn(intent, panic_context, {
             let world = self.global_state.snapshot();
             move || {
-                let result = panic::catch_unwind(move || {
-                    let _pctx = stdx::panic_context::enter(panic_context);
-                    f(world, params)
-                });
+                let result = panic::catch_unwind(move || f(world, params));
                 match thread_result_to_response::<R>(req.id.clone(), result) {
                     Ok(response) => Task::Response(response),
                     Err(_) => Task::Retry(req),


@@ -291,11 +291,15 @@ fn run_flycheck(state: &mut GlobalState, vfs_path: VfsPath) -> bool {
             }
             Ok(())
         };
-        state.task_pool.handle.spawn_with_sender(stdx::thread::ThreadIntent::Worker, move |_| {
-            if let Err(e) = std::panic::catch_unwind(task) {
-                tracing::error!("flycheck task panicked: {e:?}")
-            }
-        });
+        state.task_pool.handle.spawn_with_sender(
+            stdx::thread::ThreadIntent::Worker,
+            "flycheck",
+            move |_| {
+                if let Err(e) = std::panic::catch_unwind(task) {
+                    tracing::error!("flycheck task panicked: {e:?}")
+                }
+            },
+        );
         true
     } else {
         false


@@ -397,19 +397,25 @@ fn prime_caches(&mut self, cause: String) {
         tracing::debug!(%cause, "will prime caches");
         let num_worker_threads = self.config.prime_caches_num_threads();
 
-        self.task_pool.handle.spawn_with_sender(stdx::thread::ThreadIntent::Worker, {
-            let analysis = self.snapshot().analysis;
-            move |sender| {
-                sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
-                let res = analysis.parallel_prime_caches(num_worker_threads, |progress| {
-                    let report = PrimeCachesProgress::Report(progress);
-                    sender.send(Task::PrimeCaches(report)).unwrap();
-                });
-                sender
-                    .send(Task::PrimeCaches(PrimeCachesProgress::End { cancelled: res.is_err() }))
-                    .unwrap();
-            }
-        });
+        self.task_pool.handle.spawn_with_sender(
+            stdx::thread::ThreadIntent::Worker,
+            "prime_caches",
+            {
+                let analysis = self.snapshot().analysis;
+                move |sender| {
+                    sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
+                    let res = analysis.parallel_prime_caches(num_worker_threads, |progress| {
+                        let report = PrimeCachesProgress::Report(progress);
+                        sender.send(Task::PrimeCaches(report)).unwrap();
+                    });
+                    sender
+                        .send(Task::PrimeCaches(PrimeCachesProgress::End {
+                            cancelled: res.is_err(),
+                        }))
+                        .unwrap();
+                }
+            },
+        );
     }
 
     fn update_status_or_notify(&mut self) {
@@ -796,56 +802,62 @@ fn update_diagnostics(&mut self) {
         // Diagnostics are triggered by the user typing
         // so we run them on a latency sensitive thread.
-        self.task_pool.handle.spawn(stdx::thread::ThreadIntent::LatencySensitive, move || {
-            let _p = profile::span("publish_diagnostics");
-            let diagnostics = subscriptions
-                .into_iter()
-                .filter_map(|file_id| {
-                    let line_index = snapshot.file_line_index(file_id).ok()?;
-                    Some((
-                        file_id,
-                        line_index,
-                        snapshot
-                            .analysis
-                            .diagnostics(
-                                &snapshot.config.diagnostics(),
-                                ide::AssistResolveStrategy::None,
-                                file_id,
-                            )
-                            .ok()?,
-                    ))
-                })
-                .map(|(file_id, line_index, it)| {
-                    (
-                        file_id,
-                        it.into_iter()
-                            .map(move |d| lsp_types::Diagnostic {
-                                range: crate::to_proto::range(&line_index, d.range),
-                                severity: Some(crate::to_proto::diagnostic_severity(d.severity)),
-                                code: Some(lsp_types::NumberOrString::String(
-                                    d.code.as_str().to_string(),
-                                )),
-                                code_description: Some(lsp_types::CodeDescription {
-                                    href: lsp_types::Url::parse(&format!(
-                                        "https://rust-analyzer.github.io/manual.html#{}",
-                                        d.code.as_str()
-                                    ))
-                                    .unwrap(),
-                                }),
-                                source: Some("rust-analyzer".to_string()),
-                                message: d.message,
-                                related_information: None,
-                                tags: if d.unused {
-                                    Some(vec![lsp_types::DiagnosticTag::UNNECESSARY])
-                                } else {
-                                    None
-                                },
-                                data: None,
-                            })
-                            .collect::<Vec<_>>(),
-                    )
-                });
+        self.task_pool.handle.spawn(
+            stdx::thread::ThreadIntent::LatencySensitive,
+            "publish_diagnostics",
+            move || {
+                let _p = profile::span("publish_diagnostics");
+                let diagnostics = subscriptions
+                    .into_iter()
+                    .filter_map(|file_id| {
+                        let line_index = snapshot.file_line_index(file_id).ok()?;
+                        Some((
+                            file_id,
+                            line_index,
+                            snapshot
+                                .analysis
+                                .diagnostics(
+                                    &snapshot.config.diagnostics(),
+                                    ide::AssistResolveStrategy::None,
+                                    file_id,
+                                )
+                                .ok()?,
+                        ))
+                    })
+                    .map(|(file_id, line_index, it)| {
+                        (
+                            file_id,
+                            it.into_iter()
+                                .map(move |d| lsp_types::Diagnostic {
+                                    range: crate::to_proto::range(&line_index, d.range),
+                                    severity: Some(crate::to_proto::diagnostic_severity(
+                                        d.severity,
+                                    )),
+                                    code: Some(lsp_types::NumberOrString::String(
+                                        d.code.as_str().to_string(),
+                                    )),
+                                    code_description: Some(lsp_types::CodeDescription {
+                                        href: lsp_types::Url::parse(&format!(
+                                            "https://rust-analyzer.github.io/manual.html#{}",
+                                            d.code.as_str()
+                                        ))
+                                        .unwrap(),
+                                    }),
+                                    source: Some("rust-analyzer".to_string()),
+                                    message: d.message,
+                                    related_information: None,
+                                    tags: if d.unused {
+                                        Some(vec![lsp_types::DiagnosticTag::UNNECESSARY])
+                                    } else {
+                                        None
+                                    },
+                                    data: None,
+                                })
+                                .collect::<Vec<_>>(),
+                        )
+                    });
 
-            Task::Diagnostics(diagnostics.collect())
-        });
+                Task::Diagnostics(diagnostics.collect())
+            },
+        );
     }
 }


@@ -185,7 +185,7 @@ pub(crate) fn current_status(&self) -> lsp_ext::ServerStatusParams {
 
     pub(crate) fn fetch_workspaces(&mut self, cause: Cause) {
         tracing::info!(%cause, "will fetch workspaces");
-        self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, {
+        self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, "fetch_workspaces", {
             let linked_projects = self.config.linked_projects();
             let detached_files = self.config.detached_files().to_vec();
             let cargo_config = self.config.cargo();
@@ -260,19 +260,25 @@ pub(crate) fn fetch_build_data(&mut self, cause: Cause) {
         tracing::info!(%cause, "will fetch build data");
         let workspaces = Arc::clone(&self.workspaces);
         let config = self.config.cargo();
-        self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| {
-            sender.send(Task::FetchBuildData(BuildDataProgress::Begin)).unwrap();
+        self.task_pool.handle.spawn_with_sender(
+            ThreadIntent::Worker,
+            "fetch_build_data",
+            move |sender| {
+                sender.send(Task::FetchBuildData(BuildDataProgress::Begin)).unwrap();
 
                 let progress = {
                     let sender = sender.clone();
                     move |msg| {
                         sender.send(Task::FetchBuildData(BuildDataProgress::Report(msg))).unwrap()
                     }
                 };
                 let res = ProjectWorkspace::run_all_build_scripts(&workspaces, &config, &progress);
 
-            sender.send(Task::FetchBuildData(BuildDataProgress::End((workspaces, res)))).unwrap();
-        });
+                sender
+                    .send(Task::FetchBuildData(BuildDataProgress::End((workspaces, res))))
+                    .unwrap();
+            },
+        );
     }
 
     pub(crate) fn fetch_proc_macros(&mut self, cause: Cause, paths: Vec<ProcMacroPaths>) {
@@ -280,50 +286,54 @@ pub(crate) fn fetch_proc_macros(&mut self, cause: Cause, paths: Vec<ProcMacroPat
         let dummy_replacements = self.config.dummy_replacements().clone();
         let proc_macro_clients = self.proc_macro_clients.clone();
-        self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |sender| {
-            sender.send(Task::LoadProcMacros(ProcMacroProgress::Begin)).unwrap();
+        self.task_pool.handle.spawn_with_sender(
+            ThreadIntent::Worker,
+            "fetch_proc_macros",
+            move |sender| {
+                sender.send(Task::LoadProcMacros(ProcMacroProgress::Begin)).unwrap();
 
                 let dummy_replacements = &dummy_replacements;
                 let progress = {
                     let sender = sender.clone();
                     &move |msg| {
                         sender.send(Task::LoadProcMacros(ProcMacroProgress::Report(msg))).unwrap()
                     }
                 };
 
                 let mut res = FxHashMap::default();
                 let chain = proc_macro_clients
                     .iter()
                     .map(|res| res.as_ref().map_err(|e| e.to_string()))
                     .chain(iter::repeat_with(|| Err("Proc macros servers are not running".into())));
                 for (client, paths) in chain.zip(paths) {
                     res.extend(paths.into_iter().map(move |(crate_id, res)| {
                         (
                             crate_id,
                             res.map_or_else(
                                 |_| Err("proc macro crate is missing dylib".to_owned()),
                                 |(crate_name, path)| {
                                     progress(path.display().to_string());
                                     client.as_ref().map_err(Clone::clone).and_then(|client| {
                                         load_proc_macro(
                                             client,
                                             &path,
                                             crate_name
                                                 .as_deref()
                                                 .and_then(|crate_name| {
                                                     dummy_replacements.get(crate_name).map(|v| &**v)
                                                 })
                                                 .unwrap_or_default(),
                                         )
                                     })
                                 },
                             ),
                         )
                     }));
                 }
 
-            sender.send(Task::LoadProcMacros(ProcMacroProgress::End(res))).unwrap();
-        });
+                sender.send(Task::LoadProcMacros(ProcMacroProgress::End(res))).unwrap();
+            },
+        );
     }
 
     pub(crate) fn set_proc_macros(&mut self, proc_macros: ProcMacros) {


@@ -14,25 +14,41 @@ pub(crate) fn new_with_threads(sender: Sender<T>, threads: usize) -> TaskPool<T>
         TaskPool { sender, pool: Pool::new(threads) }
     }
 
-    pub(crate) fn spawn<F>(&mut self, intent: ThreadIntent, task: F)
-    where
+    pub(crate) fn spawn<F>(
+        &mut self,
+        intent: ThreadIntent,
+        panic_context: impl Into<String>,
+        task: F,
+    ) where
         F: FnOnce() -> T + Send + 'static,
         T: Send + 'static,
     {
+        let panic_context = panic_context.into();
         self.pool.spawn(intent, {
             let sender = self.sender.clone();
-            move || sender.send(task()).unwrap()
+            move || {
+                let _pctx = stdx::panic_context::enter(panic_context);
+                sender.send(task()).unwrap()
+            }
         })
     }
 
-    pub(crate) fn spawn_with_sender<F>(&mut self, intent: ThreadIntent, task: F)
-    where
+    pub(crate) fn spawn_with_sender<F>(
+        &mut self,
+        intent: ThreadIntent,
+        panic_context: impl Into<String>,
+        task: F,
+    ) where
         F: FnOnce(Sender<T>) + Send + 'static,
         T: Send + 'static,
     {
+        let panic_context = panic_context.into();
         self.pool.spawn(intent, {
             let sender = self.sender.clone();
-            move || task(sender)
+            move || {
+                let _pctx = stdx::panic_context::enter(panic_context);
+                task(sender)
+            }
         })
     }