From 326ffcefe09906560a03d3184a2ce76841448702 Mon Sep 17 00:00:00 2001
From: Aleksey Kladov <aleksey.kladov@gmail.com>
Date: Sat, 8 Sep 2018 12:36:02 +0300
Subject: [PATCH] Deal with deadlocks in a more principaled way

---
 crates/server/src/lib.rs                   |  3 +--
 crates/server/src/main_loop/mod.rs         |  4 ++--
 crates/server/src/project_model.rs         | 15 +++++++--------
 crates/server/src/thread_watcher.rs        | 10 ++++++++++
 crates/server/src/vfs.rs                   | 16 ++++++++--------
 crates/server/tests/heavy_tests/support.rs | 18 +++++++++++-------
 6 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs
index 9e094af1092..c8aebc59c62 100644
--- a/crates/server/src/lib.rs
+++ b/crates/server/src/lib.rs
@@ -30,9 +30,8 @@ mod vfs;
 mod path_map;
 mod server_world;
 mod project_model;
-mod thread_watcher;
+pub mod thread_watcher;
 
 pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
 pub use caps::server_capabilities;
 pub use main_loop::main_loop;
-
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index 2ef1e2d17bc..b7f5efbb158 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -43,8 +43,8 @@ pub fn main_loop(
         .build()
         .unwrap();
     let (task_sender, task_receiver) = unbounded::<Task>();
-    let (fs_sender, fs_receiver, fs_watcher) = vfs::roots_loader();
-    let (ws_sender, ws_receiver, ws_watcher) = workspace_loader();
+    let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader();
+    let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader();
 
     info!("server initialized, serving requests");
     let mut state = ServerWorldState::new();
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs
index a712106d91e..b9d6872c8a4 100644
--- a/crates/server/src/project_model.rs
+++ b/crates/server/src/project_model.rs
@@ -3,12 +3,12 @@ use std::{
     path::{Path, PathBuf},
 };
 use cargo_metadata::{metadata_run, CargoOpt};
-use crossbeam_channel::{bounded, Sender, Receiver};
+use crossbeam_channel::{Sender, Receiver};
 use libsyntax2::SmolStr;
 
 use {
     Result,
-    thread_watcher::ThreadWatcher,
+    thread_watcher::{ThreadWatcher, worker_chan},
 };
 
 #[derive(Debug, Clone)]
@@ -162,15 +162,14 @@ impl TargetKind {
     }
 }
 
-pub fn workspace_loader() -> (Sender<PathBuf>, Receiver<Result<CargoWorkspace>>, ThreadWatcher) {
-    let (path_sender, path_receiver) = bounded::<PathBuf>(16);
-    let (ws_sender, ws_receiver) = bounded::<Result<CargoWorkspace>>(1);
+pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) {
+    let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1);
     let thread = ThreadWatcher::spawn("workspace loader", move || {
-        path_receiver
+        input_receiver
             .into_iter()
             .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
-            .for_each(|it| ws_sender.send(it))
+            .for_each(|it| output_sender.send(it))
     });
 
-    (path_sender, ws_receiver, thread)
+    (interface, thread)
 }
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs
index 98bcdfd6c2f..74a0a58b75a 100644
--- a/crates/server/src/thread_watcher.rs
+++ b/crates/server/src/thread_watcher.rs
@@ -1,4 +1,5 @@
 use std::thread;
+use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
 use drop_bomb::DropBomb;
 use Result;
 
@@ -31,3 +32,12 @@ impl ThreadWatcher {
         res
     }
 }
+
+/// Sets up worker channels in a deadlock-avoind way.
+/// If one sets both input and output buffers to a fixed size,
+/// a worker might get stuck.
+pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
+    let (input_sender, input_receiver) = bounded::<I>(buf);
+    let (output_sender, output_receiver) = unbounded::<O>();
+    ((input_sender, output_receiver), input_receiver, output_sender)
+}
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs
index 2699fc21e37..c228f0b0a1b 100644
--- a/crates/server/src/vfs.rs
+++ b/crates/server/src/vfs.rs
@@ -3,11 +3,11 @@ use std::{
     fs,
 };
 
-use crossbeam_channel::{Sender, Receiver, unbounded};
+use crossbeam_channel::{Sender, Receiver};
 use walkdir::WalkDir;
 
 use {
-    thread_watcher::ThreadWatcher,
+    thread_watcher::{ThreadWatcher, worker_chan},
 };
 
 
@@ -22,11 +22,11 @@ pub enum FileEventKind {
     Add(String),
 }
 
-pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>, ThreadWatcher) {
-    let (path_sender, path_receiver) = unbounded::<PathBuf>();
-    let (event_sender, event_receiver) = unbounded::<(PathBuf, Vec<FileEvent>)>();
+pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) {
+    let (interface, input_receiver, output_sender) =
+        worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128);
     let thread = ThreadWatcher::spawn("roots loader", move || {
-        path_receiver
+        input_receiver
             .into_iter()
             .map(|path| {
                 debug!("loading {} ...", path.as_path().display());
@@ -34,10 +34,10 @@ pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>,
                 debug!("... loaded {}", path.as_path().display());
                 (path, events)
             })
-            .for_each(|it| event_sender.send(it))
+            .for_each(|it| output_sender.send(it))
     });
 
-    (path_sender, event_receiver, thread)
+    (interface, thread)
 }
 
 fn load_root(path: &Path) -> Vec<FileEvent> {
diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs
index 297dcd9ae22..2710ab59bdc 100644
--- a/crates/server/tests/heavy_tests/support.rs
+++ b/crates/server/tests/heavy_tests/support.rs
@@ -8,7 +8,7 @@ use std::{
 };
 
 use tempdir::TempDir;
-use crossbeam_channel::{unbounded, after, Sender, Receiver};
+use crossbeam_channel::{after, Sender, Receiver};
 use flexi_logger::Logger;
 use languageserver_types::{
     Url,
@@ -22,7 +22,7 @@ use serde::Serialize;
 use serde_json::{Value, from_str, to_string_pretty};
 use gen_lsp_server::{RawMessage, RawRequest, RawNotification};
 
-use m::{Result, main_loop, req};
+use m::{Result, main_loop, req, thread_watcher::worker_chan};
 
 pub fn project(fixture: &str) -> Server {
     static INIT: Once = Once::new();
@@ -69,15 +69,19 @@ pub struct Server {
 impl Server {
     fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
         let path = dir.path().to_path_buf();
-        let (client_sender, mut server_receiver) = unbounded();
-        let (mut server_sender, client_receiver) = unbounded();
-        let server = thread::spawn(move || main_loop(true, path, &mut server_receiver, &mut server_sender));
+        let ((msg_sender, msg_receiver), server) = {
+            let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128);
+            let server = thread::spawn(move || {
+                main_loop(true, path, &mut msg_receiver, &mut msg_sender)
+            });
+            (api, server)
+        };
         let res = Server {
             req_id: Cell::new(1),
             dir,
             messages: Default::default(),
-            sender: Some(client_sender),
-            receiver: client_receiver,
+            sender: Some(msg_sender),
+            receiver: msg_receiver,
             server: Some(server),
         };