From 681c063ec02ce9fc6bdcd99b0b73f016a9839d59 Mon Sep 17 00:00:00 2001
From: Eric Holk <eholk@mozilla.com>
Date: Wed, 22 Jun 2011 15:44:47 -0700
Subject: [PATCH] Conservatively serialize nearly all upcalls

Successfully ran make check with RUST_THREADS=8, so we're probably fairly
safe now. In the future we can relax the synchronization to get better
performance.
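
The pattern applied to most upcalls is sketched below: hold the domain's
scheduler_lock for the duration of the upcall via the scoped_lock RAII
helper. This is only an illustrative sketch; upcall_noop is a hypothetical
placeholder, while scoped_lock, LOG_UPCALL_ENTRY and dom->scheduler_lock
are the helpers actually used in the diff:

    // Hypothetical upcall showing the serialization pattern used below.
    extern "C" CDECL void
    upcall_noop(rust_task *task) {
        LOG_UPCALL_ENTRY(task);
        // Hold the domain's scheduler lock for the whole upcall so the
        // runtime data structures are touched by one thread at a time.
        scoped_lock with(task->dom->scheduler_lock);
        // ... upcall body runs here, fully serialized ...
    }

Upcalls that end by rescheduling (upcall_exit, and the exit path in
task_start_wrapper) take the lock in an inner scope instead, so it is
released before the final task->yield().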

---
 src/rt/rust_builtin.cpp         |  5 +++
 src/rt/rust_chan.cpp            |  7 ++--
 src/rt/rust_dom.cpp             |  6 +--
 src/rt/rust_port.h              |  2 -
 src/rt/rust_task.cpp            | 30 +++++++++------
 src/rt/rust_upcall.cpp          | 68 ++++++++++++++++++++++-----------
 src/rt/sync/lock_and_signal.cpp |  4 +-
 7 files changed, 80 insertions(+), 42 deletions(-)

diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 74b1075f701..27fe45e42d7 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -391,12 +391,17 @@ task_yield(rust_task *task) {
 
 extern "C" CDECL void
 task_join(rust_task *task, rust_task *join_task) {
+    task->dom->scheduler_lock.lock();
     // If the other task is already dying, we don't have to wait for it.
     if (join_task->dead() == false) {
         join_task->tasks_waiting_to_join.push(task);
         task->block(join_task, "joining local task");
+        task->dom->scheduler_lock.unlock();
         task->yield(2);
     }
+    else {
+        task->dom->scheduler_lock.unlock();
+    }
 }
 
 /* Debug builtins for std.dbg. */
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 4349794e658..cc03c227acd 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -10,6 +10,7 @@ rust_chan::rust_chan(rust_task *task,
                      task(task),
                      port(port),
                      buffer(task->dom, unit_sz) {
+    ++task->ref_count;
     if (port) {
         associate(port);
     }
@@ -23,6 +24,7 @@ rust_chan::~rust_chan() {
 
     A(task->dom, is_associated() == false,
       "Channel must be disassociated before being freed.");
+    --task->ref_count;
 }
 
 /**
@@ -31,10 +33,10 @@ rust_chan::~rust_chan() {
 void rust_chan::associate(maybe_proxy<rust_port> *port) {
     this->port = port;
     if (port->is_proxy() == false) {
-        scoped_lock sync(port->referent()->lock);
         LOG(task, task,
             "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
             this, port);
+        ++this->ref_count;
         this->port->referent()->chans.push(this);
     }
 }
@@ -50,10 +52,10 @@ void rust_chan::disassociate() {
     A(task->dom, is_associated(), "Channel must be associated with a port.");
 
     if (port->is_proxy() == false) {
-        scoped_lock sync(port->referent()->lock);
         LOG(task, task,
             "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
             this, port->referent());
+        --this->ref_count;
         port->referent()->chans.swap_delete(this);
     }
 
@@ -83,7 +85,6 @@ void rust_chan::send(void *sptr) {
         buffer.dequeue(NULL);
     } else {
         rust_port *target_port = port->referent();
-        scoped_lock sync(target_port->lock);
         if (target_port->task->blocked_on(target_port)) {
             DLOG(dom, comm, "dequeued in rendezvous_ptr");
             buffer.dequeue(target_port->task->rendezvous_ptr);
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index ac258f5c973..5a2175f36a2 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -268,7 +268,7 @@ rust_dom::start_main_loop(int id) {
     scheduler_lock.lock();
 
     // Make sure someone is watching, to pull us out of infinite loops.
-    rust_timer timer(this);
+    //rust_timer timer(this);
 
     DLOG(this, dom, "started domain loop %d", id);
 
@@ -395,13 +395,13 @@ rust_dom::get_cache() {
 
 rust_task *
 rust_dom::create_task(rust_task *spawner, const char *name) {
-    scheduler_lock.lock();
+    //scheduler_lock.lock();
     rust_task *task =
         new (this) rust_task (this, &newborn_tasks, spawner, name);
     DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s",
                         task, spawner ? spawner->name : "null", name);
     newborn_tasks.append(task);
-    scheduler_lock.unlock();
+    //scheduler_lock.unlock();
     return task;
 }
 
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 144a3b45cec..7a58f839c44 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -13,8 +13,6 @@ public:
     // Data sent to this port from remote tasks is buffered in this channel.
     rust_chan *remote_channel;
 
-    lock_and_signal lock;
-
     rust_port(rust_task *task, size_t unit_sz);
     ~rust_port();
     void log_state();
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index c26e793b979..4b300c1a524 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -131,19 +131,24 @@ void task_start_wrapper(spawn_args *a)
     int rval = 42;
     
     a->f(&rval, task, a->a3, a->a4);
-
+
     LOG(task, task, "task exited with value %d", rval);
 
-    // TODO: the old exit glue does some magical argument copying stuff. This
-    // is probably still needed.
+    {
+        scoped_lock with(task->dom->scheduler_lock);
+
+        // TODO: the old exit glue does some magical argument copying
+        // stuff. This is probably still needed.
 
-    // This is duplicated from upcall_exit, which is probably dead code by
-    // now.
-    LOG(task, task, "task ref_count: %d", task->ref_count);
-    A(task->dom, task->ref_count >= 0,
-      "Task ref_count should not be negative on exit!");
-    task->die();
-    task->notify_tasks_waiting_to_join();
+        // This is duplicated from upcall_exit, which is probably dead code by
+        // now.
+        LOG(task, task, "task ref_count: %d", task->ref_count);
+        A(task->dom, task->ref_count >= 0,
+          "Task ref_count should not be negative on exit!");
+        task->die();
+        task->notify_tasks_waiting_to_join();
+
+    }
     task->yield(1);
 }
 
@@ -154,6 +159,9 @@ rust_task::start(uintptr_t spawnee_fn,
     LOGPTR(dom, "from spawnee", spawnee_fn);
 
     I(dom, stk->data != NULL);
+    I(dom, !dom->scheduler_lock.lock_held_by_current_thread());
+
+    scoped_lock with(dom->scheduler_lock);
 
     char *sp = (char *)rust_sp;
 
@@ -405,7 +413,7 @@ rust_task::free(void *p, bool is_gc)
 
 void
 rust_task::transition(rust_task_list *src, rust_task_list *dst) {
-    scoped_lock sync(dom->scheduler_lock);
+    I(dom, dom->scheduler_lock.lock_held_by_current_thread());
     DLOG(dom, task,
          "task %s " PTR " state change '%s' -> '%s' while in '%s'",
          name, (uintptr_t)this, src->name, dst->name, state->name);
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index ba29203ca1d..885f82ddac3 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -23,6 +23,7 @@ str_buf(rust_task *task, rust_str *s);
 
 extern "C" void
 upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
     task->grow(n_frame_bytes);
 }
@@ -74,6 +75,7 @@ extern "C" CDECL rust_port*
 upcall_new_port(rust_task *task, size_t unit_sz) {
     LOG_UPCALL_ENTRY(task);
     rust_dom *dom = task->dom;
+    scoped_lock with(dom->scheduler_lock);
     LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
         (uintptr_t) task, task->name, unit_sz);
     return new (dom) rust_port(task, unit_sz);
@@ -82,6 +84,7 @@ upcall_new_port(rust_task *task, size_t unit_sz) {
 extern "C" CDECL void
 upcall_del_port(rust_task *task, rust_port *port) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
     I(task->dom, !port->ref_count);
     delete port;
@@ -121,6 +124,7 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) {
 extern "C" CDECL
 void upcall_del_chan(rust_task *task, rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
 
     LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
 
@@ -143,7 +147,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) {
             //    here is that we can get ourselves in a deadlock if the
             //    parent task tries to join us.
             //
-            // 2. We can leave the channel in a "dormnat" state by not freeing
+            // 2. We can leave the channel in a "dormant" state by not freeing
             //    it and letting the receiver task delete it for us instead.
             if (chan->buffer.is_empty() == false) {
                 return;
@@ -162,6 +166,7 @@ extern "C" CDECL rust_chan *
 upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target,
                   rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     size_t unit_sz = chan->buffer.unit_sz;
     maybe_proxy<rust_port> *port = chan->port;
     rust_task *target_task = NULL;
@@ -203,28 +208,30 @@ upcall_sleep(rust_task *task, size_t time_in_us) {
 extern "C" CDECL void
 upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     chan->send(sptr);
     LOG(task, comm, "=== sent data ===>");
 }
 
 extern "C" CDECL void
 upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
-    LOG_UPCALL_ENTRY(task);
-    LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
-        ", size: 0x%" PRIxPTR ", chan_no: %d",
-        (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
-        port->chans.length());
-
-    if (port->receive(dptr)) {
-        return;
-    }
-
-    // No data was buffered on any incoming channel, so block this task
-    // on the port. Remember the rendezvous location so that any sender
-    // task can write to it before waking up this task.
-
     {
-        scoped_lock sync(port->lock);
+        LOG_UPCALL_ENTRY(task);
+        scoped_lock with(task->dom->scheduler_lock);
+
+        LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
+            ", size: 0x%" PRIxPTR ", chan_no: %d",
+            (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
+            port->chans.length());
+
+        if (port->receive(dptr)) {
+            return;
+        }
+
+        // No data was buffered on any incoming channel, so block this task
+        // on the port. Remember the rendezvous location so that any sender
+        // task can write to it before waking up this task.
+
         LOG(task, comm, "<=== waiting for rendezvous data ===");
         task->rendezvous_ptr = dptr;
         task->block(port, "waiting for rendezvous data");
@@ -248,6 +255,7 @@ upcall_fail(rust_task *task,
 extern "C" CDECL void
 upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     if (target->is_proxy()) {
         notify_message::
         send(notify_message::KILL, "kill", task->get_handle(),
@@ -264,18 +272,22 @@ upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
  */
 extern "C" CDECL void
 upcall_exit(rust_task *task) {
-    LOG_UPCALL_ENTRY(task);
-    LOG(task, task, "task ref_count: %d", task->ref_count);
-    A(task->dom, task->ref_count >= 0,
-      "Task ref_count should not be negative on exit!");
-    task->die();
-    task->notify_tasks_waiting_to_join();
+    {
+        LOG_UPCALL_ENTRY(task);
+        scoped_lock with(task->dom->scheduler_lock);
+        LOG(task, task, "task ref_count: %d", task->ref_count);
+        A(task->dom, task->ref_count >= 0,
+          "Task ref_count should not be negative on exit!");
+        task->die();
+        task->notify_tasks_waiting_to_join();
+    }
     task->yield(1);
 }
 
 extern "C" CDECL uintptr_t
 upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
 
     LOG(task, mem,
                    "upcall malloc(%" PRIdPTR ", 0x%" PRIxPTR ")"
@@ -296,6 +308,7 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) {
 extern "C" CDECL void
 upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_dom *dom = task->dom;
     DLOG(dom, mem,
              "upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")",
@@ -306,6 +319,7 @@ upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) {
 extern "C" CDECL uintptr_t
 upcall_mark(rust_task *task, void* ptr) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
 
     rust_dom *dom = task->dom;
     if (ptr) {
@@ -336,6 +350,7 @@ rust_str *make_str(rust_task *task, char const *s, size_t fill) {
 extern "C" CDECL rust_str *
 upcall_new_str(rust_task *task, char const *s, size_t fill) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     
     return make_str(task, s, fill);
 }
@@ -343,6 +358,7 @@ upcall_new_str(rust_task *task, char const *s, size_t fill) {
 extern "C" CDECL rust_str *
 upcall_dup_str(rust_task *task, rust_str *str) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
 
     return make_str(task, (char const *)str->data, str->fill);
 }
@@ -350,6 +366,7 @@ upcall_dup_str(rust_task *task, rust_str *str) {
 extern "C" CDECL rust_vec *
 upcall_new_vec(rust_task *task, size_t fill, type_desc *td) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_dom *dom = task->dom;
     DLOG(dom, mem, "upcall new_vec(%" PRIdPTR ")", fill);
     size_t alloc = next_power_of_two(sizeof(rust_vec) + fill);
@@ -454,6 +471,7 @@ upcall_vec_append(rust_task *task, type_desc *t, type_desc *elem_t,
                   rust_vec **dst_ptr, rust_vec *src, bool skip_null)
 {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_vec *dst = *dst_ptr;
     uintptr_t need_copy;
     size_t n_src_bytes = skip_null ? src->fill - 1 : src->fill;
@@ -483,6 +501,7 @@ upcall_get_type_desc(rust_task *task,
                      size_t n_descs,
                      type_desc const **descs) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     LOG(task, cache, "upcall get_type_desc with size=%" PRIdPTR
         ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align,
         n_descs);
@@ -496,6 +515,7 @@ extern "C" CDECL rust_task *
 upcall_new_task(rust_task *spawner, rust_vec *name) {
     // name is a rust string structure.
     LOG_UPCALL_ENTRY(spawner);
+    scoped_lock with(spawner->dom->scheduler_lock);
     rust_dom *dom = spawner->dom;
     rust_task *task = dom->create_task(spawner, (const char *)name->data);
     return task;
@@ -535,6 +555,7 @@ upcall_start_task(rust_task *spawner,
  */
 extern "C" CDECL maybe_proxy<rust_task> *
 upcall_new_thread(rust_task *task, const char *name) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
     rust_dom *parent_dom = task->dom;
     rust_kernel *kernel = parent_dom->kernel;
@@ -583,6 +604,7 @@ upcall_start_thread(rust_task *task,
                     rust_proxy<rust_task> *child_task_proxy,
                     uintptr_t spawnee_fn,
                     size_t callsz) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
 #if 0
     rust_dom *parenet_dom = task->dom;
@@ -615,6 +637,7 @@ extern "C" CDECL void
 upcall_ivec_resize(rust_task *task,
                    rust_ivec *v,
                    size_t newsz) {
+    scoped_lock with(task->dom->scheduler_lock);
     I(task->dom, !v->fill);
 
     size_t new_alloc = next_power_of_two(newsz);
@@ -633,6 +656,7 @@ extern "C" CDECL void
 upcall_ivec_spill(rust_task *task,
                   rust_ivec *v,
                   size_t newsz) {
+    scoped_lock with(task->dom->scheduler_lock);
     size_t new_alloc = next_power_of_two(newsz);
 
     rust_ivec_heap *heap_part = (rust_ivec_heap *)
diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp
index 337a1efa4bc..6934a796521 100644
--- a/src/rt/sync/lock_and_signal.cpp
+++ b/src/rt/sync/lock_and_signal.cpp
@@ -21,7 +21,9 @@ lock_and_signal::lock_and_signal() {
 }
 
 #else
-lock_and_signal::lock_and_signal() {
+lock_and_signal::lock_and_signal()
+    : _locked(false)
+{
     CHECKED(pthread_cond_init(&_cond, NULL));
     CHECKED(pthread_mutex_init(&_mutex, NULL));
 }