diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 74b1075f701..27fe45e42d7 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -391,12 +391,17 @@ task_yield(rust_task *task) { extern "C" CDECL void task_join(rust_task *task, rust_task *join_task) { + task->dom->scheduler_lock.lock(); // If the other task is already dying, we don't have to wait for it. if (join_task->dead() == false) { join_task->tasks_waiting_to_join.push(task); task->block(join_task, "joining local task"); + task->dom->scheduler_lock.unlock(); task->yield(2); } + else { + task->dom->scheduler_lock.unlock(); + } } /* Debug builtins for std.dbg. */ diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 4349794e658..cc03c227acd 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -10,6 +10,7 @@ rust_chan::rust_chan(rust_task *task, task(task), port(port), buffer(task->dom, unit_sz) { + ++task->ref_count; if (port) { associate(port); } @@ -23,6 +24,7 @@ rust_chan::~rust_chan() { A(task->dom, is_associated() == false, "Channel must be disassociated before being freed."); + --task->ref_count; } /** @@ -31,10 +33,10 @@ rust_chan::~rust_chan() { void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { - scoped_lock sync(port->referent()->lock); LOG(task, task, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, this, port); + ++this->ref_count; this->port->referent()->chans.push(this); } } @@ -50,10 +52,10 @@ void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); if (port->is_proxy() == false) { - scoped_lock sync(port->referent()->lock); LOG(task, task, "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, this, port->referent()); + --this->ref_count; port->referent()->chans.swap_delete(this); } @@ -83,7 +85,6 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { rust_port *target_port = port->referent(); - scoped_lock sync(target_port->lock); if (target_port->task->blocked_on(target_port)) { DLOG(dom, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index ac258f5c973..5a2175f36a2 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -268,7 +268,7 @@ rust_dom::start_main_loop(int id) { scheduler_lock.lock(); // Make sure someone is watching, to pull us out of infinite loops. - rust_timer timer(this); + //rust_timer timer(this); DLOG(this, dom, "started domain loop %d", id); @@ -395,13 +395,13 @@ rust_dom::get_cache() { rust_task * rust_dom::create_task(rust_task *spawner, const char *name) { - scheduler_lock.lock(); + //scheduler_lock.lock(); rust_task *task = new (this) rust_task (this, &newborn_tasks, spawner, name); DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s", task, spawner ? spawner->name : "null", name); newborn_tasks.append(task); - scheduler_lock.unlock(); + //scheduler_lock.unlock(); return task; } diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 144a3b45cec..7a58f839c44 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -13,8 +13,6 @@ public: // Data sent to this port from remote tasks is buffered in this channel. rust_chan *remote_channel; - lock_and_signal lock; - rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index c26e793b979..4b300c1a524 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -131,19 +131,24 @@ void task_start_wrapper(spawn_args *a) int rval = 42; a->f(&rval, task, a->a3, a->a4); - + LOG(task, task, "task exited with value %d", rval); - // TODO: the old exit glue does some magical argument copying stuff. This - // is probably still needed. + { + scoped_lock with(task->dom->scheduler_lock); + + // TODO: the old exit glue does some magical argument copying + // stuff. This is probably still needed. - // This is duplicated from upcall_exit, which is probably dead code by - // now. - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->dom, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); - task->die(); - task->notify_tasks_waiting_to_join(); + // This is duplicated from upcall_exit, which is probably dead code by + // now. + LOG(task, task, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); + task->die(); + task->notify_tasks_waiting_to_join(); + + } task->yield(1); } @@ -154,6 +159,9 @@ rust_task::start(uintptr_t spawnee_fn, LOGPTR(dom, "from spawnee", spawnee_fn); I(dom, stk->data != NULL); + I(dom, !dom->scheduler_lock.lock_held_by_current_thread()); + + scoped_lock with(dom->scheduler_lock); char *sp = (char *)rust_sp; @@ -405,7 +413,7 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - scoped_lock sync(dom->scheduler_lock); + I(dom, dom->scheduler_lock.lock_held_by_current_thread()); DLOG(dom, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index ba29203ca1d..885f82ddac3 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -23,6 +23,7 @@ str_buf(rust_task *task, rust_str *s); extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); } @@ -74,6 +75,7 @@ extern "C" CDECL rust_port* upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; + scoped_lock with(dom->scheduler_lock); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); return new (dom) rust_port(task, unit_sz); @@ -82,6 +84,7 @@ upcall_new_port(rust_task *task, size_t unit_sz) { extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); I(task->dom, !port->ref_count); delete port; @@ -121,6 +124,7 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) { extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); @@ -143,7 +147,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { // here is that we can get ourselves in a deadlock if the // parent task tries to join us. // - // 2. We can leave the channel in a "dormnat" state by not freeing + // 2. We can leave the channel in a "dormant" state by not freeing // it and letting the receiver task delete it for us instead. if (chan->buffer.is_empty() == false) { return; @@ -162,6 +166,7 @@ extern "C" CDECL rust_chan * upcall_clone_chan(rust_task *task, maybe_proxy *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); size_t unit_sz = chan->buffer.unit_sz; maybe_proxy *port = chan->port; rust_task *target_task = NULL; @@ -203,28 +208,30 @@ upcall_sleep(rust_task *task, size_t time_in_us) { extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); chan->send(sptr); LOG(task, comm, "=== sent data ===>"); } extern "C" CDECL void upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { - LOG_UPCALL_ENTRY(task); - LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR - ", size: 0x%" PRIxPTR ", chan_no: %d", - (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, - port->chans.length()); - - if (port->receive(dptr)) { - return; - } - - // No data was buffered on any incoming channel, so block this task - // on the port. Remember the rendezvous location so that any sender - // task can write to it before waking up this task. - { - scoped_lock sync(port->lock); + LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); + + LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR + ", size: 0x%" PRIxPTR ", chan_no: %d", + (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, + port->chans.length()); + + if (port->receive(dptr)) { + return; + } + + // No data was buffered on any incoming channel, so block this task + // on the port. Remember the rendezvous location so that any sender + // task can write to it before waking up this task. + LOG(task, comm, "<=== waiting for rendezvous data ==="); task->rendezvous_ptr = dptr; task->block(port, "waiting for rendezvous data"); @@ -248,6 +255,7 @@ upcall_fail(rust_task *task, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); if (target->is_proxy()) { notify_message:: send(notify_message::KILL, "kill", task->get_handle(), @@ -264,18 +272,22 @@ upcall_kill(rust_task *task, maybe_proxy *target) { */ extern "C" CDECL void upcall_exit(rust_task *task) { - LOG_UPCALL_ENTRY(task); - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->dom, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); - task->die(); - task->notify_tasks_waiting_to_join(); + { + LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); + LOG(task, task, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); + task->die(); + task->notify_tasks_waiting_to_join(); + } task->yield(1); } extern "C" CDECL uintptr_t upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, mem, "upcall malloc(%" PRIdPTR ", 0x%" PRIxPTR ")" @@ -296,6 +308,7 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) { extern "C" CDECL void upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")", @@ -306,6 +319,7 @@ upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { extern "C" CDECL uintptr_t upcall_mark(rust_task *task, void* ptr) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; if (ptr) { @@ -336,6 +350,7 @@ rust_str *make_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_new_str(rust_task *task, char const *s, size_t fill) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); return make_str(task, s, fill); } @@ -343,6 +358,7 @@ upcall_new_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_dup_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); return make_str(task, (char const *)str->data, str->fill); } @@ -350,6 +366,7 @@ upcall_dup_str(rust_task *task, rust_str *str) { extern "C" CDECL rust_vec * upcall_new_vec(rust_task *task, size_t fill, type_desc *td) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall new_vec(%" PRIdPTR ")", fill); size_t alloc = next_power_of_two(sizeof(rust_vec) + fill); @@ -454,6 +471,7 @@ upcall_vec_append(rust_task *task, type_desc *t, type_desc *elem_t, rust_vec **dst_ptr, rust_vec *src, bool skip_null) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); rust_vec *dst = *dst_ptr; uintptr_t need_copy; size_t n_src_bytes = skip_null ? src->fill - 1 : src->fill; @@ -483,6 +501,7 @@ upcall_get_type_desc(rust_task *task, size_t n_descs, type_desc const **descs) { LOG_UPCALL_ENTRY(task); + scoped_lock with(task->dom->scheduler_lock); LOG(task, cache, "upcall get_type_desc with size=%" PRIdPTR ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align, n_descs); @@ -496,6 +515,7 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. LOG_UPCALL_ENTRY(spawner); + scoped_lock with(spawner->dom->scheduler_lock); rust_dom *dom = spawner->dom; rust_task *task = dom->create_task(spawner, (const char *)name->data); return task; @@ -535,6 +555,7 @@ upcall_start_task(rust_task *spawner, */ extern "C" CDECL maybe_proxy * upcall_new_thread(rust_task *task, const char *name) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); rust_dom *parent_dom = task->dom; rust_kernel *kernel = parent_dom->kernel; @@ -583,6 +604,7 @@ upcall_start_thread(rust_task *task, rust_proxy *child_task_proxy, uintptr_t spawnee_fn, size_t callsz) { + I(task->dom, false); LOG_UPCALL_ENTRY(task); #if 0 rust_dom *parenet_dom = task->dom; @@ -615,6 +637,7 @@ extern "C" CDECL void upcall_ivec_resize(rust_task *task, rust_ivec *v, size_t newsz) { + scoped_lock with(task->dom->scheduler_lock); I(task->dom, !v->fill); size_t new_alloc = next_power_of_two(newsz); @@ -633,6 +656,7 @@ extern "C" CDECL void upcall_ivec_spill(rust_task *task, rust_ivec *v, size_t newsz) { + scoped_lock with(task->dom->scheduler_lock); size_t new_alloc = next_power_of_two(newsz); rust_ivec_heap *heap_part = (rust_ivec_heap *) diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 337a1efa4bc..6934a796521 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -21,7 +21,9 @@ lock_and_signal::lock_and_signal() { } #else -lock_and_signal::lock_and_signal() { +lock_and_signal::lock_and_signal() + : _locked(false) +{ CHECKED(pthread_cond_init(&_cond, NULL)); CHECKED(pthread_mutex_init(&_mutex, NULL)); }