9535: internal: remove proc macro management thread r=jonas-schievink a=jonas-schievink

Communication with the proc macro server process has always happened one request at a time, so the additional thread isn't really needed (it just forwarded each request, and sent back the response). This removes some indirection that was a bit hard to understand (a channel was allocated and sent over another channel to return the response).

Hope I'm not missing anything here

Co-authored-by: Jonas Schievink <jonasschievink@gmail.com>
This commit is contained in:
bors[bot] 2021-07-12 13:23:26 +00:00 committed by GitHub
commit 091dbfe637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 80 deletions

View File

@ -15,19 +15,19 @@ use std::{
ffi::OsStr,
io,
path::{Path, PathBuf},
sync::Arc,
sync::{Arc, Mutex},
};
use tt::{SmolStr, Subtree};
use crate::process::{ProcMacroProcessSrv, ProcMacroProcessThread};
use crate::process::ProcMacroProcessSrv;
pub use rpc::{ExpansionResult, ExpansionTask, ListMacrosResult, ListMacrosTask, ProcMacroKind};
pub use version::{read_dylib_info, RustCInfo};
#[derive(Debug, Clone)]
struct ProcMacroProcessExpander {
process: Arc<ProcMacroProcessSrv>,
process: Arc<Mutex<ProcMacroProcessSrv>>,
dylib_path: PathBuf,
name: SmolStr,
}
@ -56,24 +56,34 @@ impl base_db::ProcMacroExpander for ProcMacroProcessExpander {
env: env.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(),
};
let result: ExpansionResult = self.process.send_task(msg::Request::ExpansionMacro(task))?;
let result: ExpansionResult = self
.process
.lock()
.unwrap_or_else(|e| e.into_inner())
.send_task(msg::Request::ExpansionMacro(task))?;
Ok(result.expansion)
}
}
#[derive(Debug)]
pub struct ProcMacroClient {
process: Arc<ProcMacroProcessSrv>,
thread: ProcMacroProcessThread,
/// Currently, the proc macro process expands all procedural macros sequentially.
///
/// That means that concurrent salsa requests may block each other when expanding proc macros,
/// which is unfortunate, but simple and good enough for the time being.
///
/// Therefore, we just wrap the `ProcMacroProcessSrv` in a mutex here.
process: Arc<Mutex<ProcMacroProcessSrv>>,
}
impl ProcMacroClient {
/// Spawns an external process as the proc macro server and returns a client connected to it.
pub fn extern_process(
process_path: PathBuf,
args: impl IntoIterator<Item = impl AsRef<OsStr>>,
) -> io::Result<ProcMacroClient> {
let (thread, process) = ProcMacroProcessSrv::run(process_path, args)?;
Ok(ProcMacroClient { process: Arc::new(process), thread })
let process = ProcMacroProcessSrv::run(process_path, args)?;
Ok(ProcMacroClient { process: Arc::new(Mutex::new(process)) })
}
pub fn by_dylib_path(&self, dylib_path: &Path) -> Vec<ProcMacro> {
@ -93,7 +103,12 @@ impl ProcMacroClient {
}
}
let macros = match self.process.find_proc_macros(dylib_path) {
let macros = match self
.process
.lock()
.unwrap_or_else(|e| e.into_inner())
.find_proc_macros(dylib_path)
{
Err(err) => {
eprintln!("Failed to find proc macros. Error: {:#?}", err);
return vec![];

View File

@ -5,11 +5,9 @@ use std::{
ffi::{OsStr, OsString},
io::{self, BufRead, BufReader, Write},
path::{Path, PathBuf},
process::{Child, Command, Stdio},
sync::{Arc, Weak},
process::{Child, ChildStdin, ChildStdout, Command, Stdio},
};
use crossbeam_channel::{bounded, Receiver, Sender};
use stdx::JodChild;
use crate::{
@ -17,42 +15,28 @@ use crate::{
rpc::{ListMacrosResult, ListMacrosTask, ProcMacroKind},
};
#[derive(Debug, Default)]
pub(crate) struct ProcMacroProcessSrv {
inner: Weak<Sender<Task>>,
}
#[derive(Debug)]
pub(crate) struct ProcMacroProcessThread {
// XXX: drop order is significant
sender: Arc<Sender<Task>>,
handle: jod_thread::JoinHandle<()>,
pub(crate) struct ProcMacroProcessSrv {
process: Process,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl ProcMacroProcessSrv {
pub(crate) fn run(
process_path: PathBuf,
args: impl IntoIterator<Item = impl AsRef<OsStr>>,
) -> io::Result<(ProcMacroProcessThread, ProcMacroProcessSrv)> {
let process = Process::run(process_path, args)?;
) -> io::Result<ProcMacroProcessSrv> {
let mut process = Process::run(process_path, args)?;
let (stdin, stdout) = process.stdio().expect("couldn't access child stdio");
let (task_tx, task_rx) = bounded(0);
let handle = jod_thread::Builder::new()
.name("ProcMacroClient".to_owned())
.spawn(move || {
client_loop(task_rx, process);
})
.expect("failed to spawn thread");
let srv = ProcMacroProcessSrv { process, stdin, stdout };
let task_tx = Arc::new(task_tx);
let srv = ProcMacroProcessSrv { inner: Arc::downgrade(&task_tx) };
let thread = ProcMacroProcessThread { handle, sender: task_tx };
Ok((thread, srv))
Ok(srv)
}
pub(crate) fn find_proc_macros(
&self,
&mut self,
dylib_path: &Path,
) -> Result<Vec<(String, ProcMacroKind)>, tt::ExpansionError> {
let task = ListMacrosTask { lib: dylib_path.to_path_buf() };
@ -61,22 +45,27 @@ impl ProcMacroProcessSrv {
Ok(result.macros)
}
pub(crate) fn send_task<R>(&self, req: Request) -> Result<R, tt::ExpansionError>
pub(crate) fn send_task<R>(&mut self, req: Request) -> Result<R, tt::ExpansionError>
where
R: TryFrom<Response, Error = &'static str>,
{
let (result_tx, result_rx) = bounded(0);
let sender = match self.inner.upgrade() {
None => return Err(tt::ExpansionError::Unknown("proc macro process is closed".into())),
Some(it) => it,
let mut buf = String::new();
let res = match send_request(&mut self.stdin, &mut self.stdout, req, &mut buf) {
Ok(res) => res,
Err(err) => {
let result = self.process.child.try_wait();
log::error!(
"proc macro server crashed, server process state: {:?}, server request error: {:?}",
result,
err
);
let res = Response::Error(ResponseError {
code: ErrorCode::ServerErrorEnd,
message: "proc macro server crashed".into(),
});
Some(res)
}
};
sender
.send(Task { req, result_tx })
.map_err(|_| tt::ExpansionError::Unknown("proc macro server crashed".into()))?;
let res = result_rx
.recv()
.map_err(|_| tt::ExpansionError::Unknown("proc macro server crashed".into()))?;
match res {
Some(Response::Error(err)) => Err(tt::ExpansionError::ExpansionError(err.message)),
@ -88,37 +77,7 @@ impl ProcMacroProcessSrv {
}
}
fn client_loop(task_rx: Receiver<Task>, mut process: Process) {
let (mut stdin, mut stdout) = process.stdio().expect("couldn't access child stdio");
let mut buf = String::new();
for Task { req, result_tx } in task_rx {
match send_request(&mut stdin, &mut stdout, req, &mut buf) {
Ok(res) => result_tx.send(res).unwrap(),
Err(err) => {
log::error!(
"proc macro server crashed, server process state: {:?}, server request error: {:?}",
process.child.try_wait(),
err
);
let res = Response::Error(ResponseError {
code: ErrorCode::ServerErrorEnd,
message: "proc macro server crashed".into(),
});
result_tx.send(res.into()).unwrap();
// Exit the thread.
break;
}
}
}
}
struct Task {
req: Request,
result_tx: Sender<Option<Response>>,
}
#[derive(Debug)]
struct Process {
child: JodChild,
}
@ -133,7 +92,7 @@ impl Process {
Ok(Process { child })
}
fn stdio(&mut self) -> Option<(impl Write, impl BufRead)> {
fn stdio(&mut self) -> Option<(ChildStdin, BufReader<ChildStdout>)> {
let stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
let read = BufReader::new(stdout);

View File

@ -111,6 +111,7 @@ pub fn defer<F: FnOnce()>(f: F) -> impl Drop {
}
#[cfg_attr(not(target_arch = "wasm32"), repr(transparent))]
#[derive(Debug)]
pub struct JodChild(pub std::process::Child);
impl ops::Deref for JodChild {