diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 1de1685692d..2a491b61150 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -111,7 +111,7 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { DLOG(dom, dom, "Using %d worker threads.", num_threads); - int ret = dom->start_main_loops(num_threads); + int ret = kernel->start_task_threads(num_threads); delete args; delete kernel; delete srv; diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 27fe45e42d7..2a724ee1791 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -391,16 +391,16 @@ task_yield(rust_task *task) { extern "C" CDECL void task_join(rust_task *task, rust_task *join_task) { - task->dom->scheduler_lock.lock(); + task->kernel->scheduler_lock.lock(); // If the other task is already dying, we don't have to wait for it. if (join_task->dead() == false) { join_task->tasks_waiting_to_join.push(task); task->block(join_task, "joining local task"); - task->dom->scheduler_lock.unlock(); + task->kernel->scheduler_lock.unlock(); task->yield(2); } else { - task->dom->scheduler_lock.unlock(); + task->kernel->scheduler_lock.unlock(); } } diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index c9c6b56c09e..65ccf158b64 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -51,9 +51,9 @@ rust_dom::activate(rust_task *task) { task->ctx.next = &ctx; DLOG(this, task, "descheduling..."); - scheduler_lock.unlock(); + kernel->scheduler_lock.unlock(); task->ctx.swap(ctx); - scheduler_lock.lock(); + kernel->scheduler_lock.lock(); DLOG(this, task, "task has returned"); } @@ -167,7 +167,7 @@ rust_dom::number_of_live_tasks() { */ void rust_dom::reap_dead_tasks() { - I(this, scheduler_lock.lock_held_by_current_thread()); + I(this, kernel->scheduler_lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; // Make sure this task isn't still running somewhere else... @@ -266,7 +266,7 @@ rust_dom::log_state() { */ int rust_dom::start_main_loop(int id) { - scheduler_lock.lock(); + kernel->scheduler_lock.lock(); // Make sure someone is watching, to pull us out of infinite loops. // @@ -296,9 +296,9 @@ rust_dom::start_main_loop(int id) { DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - scheduler_lock.unlock(); + kernel->scheduler_lock.unlock(); sync::sleep(100); - scheduler_lock.lock(); + kernel->scheduler_lock.lock(); DLOG(this, task, "scheduler resuming ..."); continue; @@ -349,9 +349,9 @@ rust_dom::start_main_loop(int id) { "scheduler yielding ...", dead_tasks.length()); log_state(); - scheduler_lock.unlock(); + kernel->scheduler_lock.unlock(); sync::yield(); - scheduler_lock.lock(); + kernel->scheduler_lock.lock(); } else { drain_incoming_message_queue(true); } @@ -360,28 +360,7 @@ rust_dom::start_main_loop(int id) { DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval); - scheduler_lock.unlock(); - return rval; -} - -int rust_dom::start_main_loops(int num_threads) -{ - dom_worker *worker = NULL; - - // -1, because this thread will also be a worker. - for(int i = 0; i < num_threads - 1; ++i) { - worker = new dom_worker(i + 1, this); - worker->start(); - threads.push(worker); - } - - start_main_loop(0); - - while(threads.pop(&worker)) { - worker->join(); - delete worker; - } - + kernel->scheduler_lock.unlock(); return rval; } @@ -392,26 +371,14 @@ rust_dom::get_cache() { rust_task * rust_dom::create_task(rust_task *spawner, const char *name) { - //scheduler_lock.lock(); rust_task *task = new (this) rust_task (this, &newborn_tasks, spawner, name); DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s", task, spawner ? spawner->name : "null", name); newborn_tasks.append(task); - //scheduler_lock.unlock(); return task; } -rust_dom::dom_worker::dom_worker(int id, rust_dom *owner) - : id(id), owner(owner) -{ -} - -void rust_dom::dom_worker::run() -{ - owner->start_main_loop(id); -} - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 7f9fa7a2901..dfc0960a9ea 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -97,24 +97,10 @@ struct rust_dom : public kernel_owned, rc_base rust_task *schedule_task(); int start_main_loop(int id); - int start_main_loops(int num_threads); void log_state(); rust_task *create_task(rust_task *spawner, const char *name); - - class dom_worker : public rust_thread { - int id; - rust_dom *owner; - - public: - dom_worker(int id, rust_dom *owner); - - virtual void run(); - }; - - lock_and_signal scheduler_lock; - array_list threads; }; inline rust_log & diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 9af5f9e2b41..5e495b8c822 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -224,6 +224,37 @@ rust_kernel::signal_kernel_lock() { _kernel_lock.unlock(); } +int rust_kernel::start_task_threads(int num_threads) +{ + rust_task_thread *thread = NULL; + + // -1, because this thread will also be a thread. + for(int i = 0; i < num_threads - 1; ++i) { + thread = new rust_task_thread(i + 1, this); + thread->start(); + threads.push(thread); + } + + dom->start_main_loop(0); + + while(threads.pop(&thread)) { + thread->join(); + delete thread; + } + + return dom->rval; +} + +rust_task_thread::rust_task_thread(int id, rust_kernel *owner) + : id(id), owner(owner) +{ +} + +void rust_task_thread::run() +{ + owner->dom->start_main_loop(id); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 70495d029bc..ee5cf99ef5d 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -34,6 +34,9 @@ public: } }; +class rust_task_thread; + + /** * A global object shared by all thread domains. Most of the data structures * in this class are synchronized since they are accessed from multiple @@ -44,8 +47,6 @@ class rust_kernel : public rust_thread { rust_log _log; rust_srv *_srv; - rust_dom *dom; - /** * Task proxy objects are kernel owned handles to Rust objects. */ @@ -69,7 +70,11 @@ class rust_kernel : public rust_thread { rust_dom *create_domain(const char *name); void destroy_domain(); + array_list threads; + public: + rust_dom *dom; + lock_and_signal scheduler_lock; /** * Message queues are kernel objects and are associated with domains. @@ -105,7 +110,10 @@ public: void *malloc(size_t size); void free(void *mem); + // TODO: this should go away inline rust_dom *get_domain() const { return dom; } + + int start_task_threads(int num_threads); }; inline void *operator new(size_t size, rust_kernel *kernel) { @@ -116,4 +124,15 @@ inline void *operator new(size_t size, rust_kernel &kernel) { return kernel.malloc(size); } + +class rust_task_thread : public rust_thread { + int id; + rust_kernel *owner; + +public: + rust_task_thread(int id, rust_kernel *owner); + + virtual void run(); +}; + #endif /* RUST_KERNEL_H */ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 21c0f593f9a..325bb560502 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -61,6 +61,7 @@ rust_task::rust_task(rust_dom *dom, rust_task_list *state, gc_alloc_chain(0), dom(dom), cache(NULL), + kernel(dom->kernel), name(name), state(state), cond(NULL), @@ -134,7 +135,7 @@ void task_start_wrapper(spawn_args *a) LOG(task, task, "task exited with value %d", rval); { - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); // FIXME: the old exit glue does some magical argument copying // stuff. This is probably still needed. @@ -158,9 +159,9 @@ rust_task::start(uintptr_t spawnee_fn, LOGPTR(dom, "from spawnee", spawnee_fn); I(dom, stk->data != NULL); - I(dom, !dom->scheduler_lock.lock_held_by_current_thread()); - - scoped_lock with(dom->scheduler_lock); + I(dom, !kernel->scheduler_lock.lock_held_by_current_thread()); + + scoped_lock with(kernel->scheduler_lock); char *sp = (char *)rust_sp; @@ -412,7 +413,7 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - I(dom, dom->scheduler_lock.lock_held_by_current_thread()); + I(dom, kernel->scheduler_lock.lock_held_by_current_thread()); DLOG(dom, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 3f9a0660300..62a725a98d5 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -22,6 +22,7 @@ rust_task : public maybe_proxy, rust_crate_cache *cache; // Fields known only to the runtime. + rust_kernel *kernel; const char *const name; rust_task_list *state; rust_cond *cond; diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index a769051b7eb..ccb35958a80 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -75,7 +75,7 @@ extern "C" CDECL rust_port* upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - scoped_lock with(dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); return new (dom) rust_port(task, unit_sz); @@ -84,7 +84,7 @@ upcall_new_port(rust_task *task, size_t unit_sz) { extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); I(task->dom, !port->ref_count); delete port; @@ -124,7 +124,7 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) { extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); @@ -166,7 +166,7 @@ extern "C" CDECL rust_chan * upcall_clone_chan(rust_task *task, maybe_proxy *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); size_t unit_sz = chan->buffer.unit_sz; maybe_proxy *port = chan->port; rust_task *target_task = NULL; @@ -208,7 +208,7 @@ upcall_sleep(rust_task *task, size_t time_in_us) { extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); chan->send(sptr); LOG(task, comm, "=== sent data ===>"); } @@ -217,7 +217,7 @@ extern "C" CDECL void upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR ", size: 0x%" PRIxPTR ", chan_no: %d", @@ -255,7 +255,7 @@ upcall_fail(rust_task *task, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); if (target->is_proxy()) { notify_message:: send(notify_message::KILL, "kill", task->get_handle(), @@ -274,7 +274,7 @@ extern "C" CDECL void upcall_exit(rust_task *task) { { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, task, "task ref_count: %d", task->ref_count); A(task->dom, task->ref_count >= 0, "Task ref_count should not be negative on exit!"); @@ -287,7 +287,7 @@ upcall_exit(rust_task *task) { extern "C" CDECL uintptr_t upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, mem, "upcall malloc(%" PRIdPTR ", 0x%" PRIxPTR ")" @@ -308,7 +308,7 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) { extern "C" CDECL void upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")", @@ -319,7 +319,7 @@ upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { extern "C" CDECL uintptr_t upcall_mark(rust_task *task, void* ptr) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); rust_dom *dom = task->dom; if (ptr) { @@ -350,7 +350,7 @@ rust_str *make_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_new_str(rust_task *task, char const *s, size_t fill) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); return make_str(task, s, fill); } @@ -358,7 +358,7 @@ upcall_new_str(rust_task *task, char const *s, size_t fill) { extern "C" CDECL rust_str * upcall_dup_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); return make_str(task, (char const *)str->data, str->fill); } @@ -366,7 +366,7 @@ upcall_dup_str(rust_task *task, rust_str *str) { extern "C" CDECL rust_vec * upcall_new_vec(rust_task *task, size_t fill, type_desc *td) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); rust_dom *dom = task->dom; DLOG(dom, mem, "upcall new_vec(%" PRIdPTR ")", fill); size_t alloc = next_power_of_two(sizeof(rust_vec) + fill); @@ -471,7 +471,7 @@ upcall_vec_append(rust_task *task, type_desc *t, type_desc *elem_t, rust_vec **dst_ptr, rust_vec *src, bool skip_null) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); rust_vec *dst = *dst_ptr; uintptr_t need_copy; size_t n_src_bytes = skip_null ? src->fill - 1 : src->fill; @@ -507,7 +507,7 @@ upcall_get_type_desc(rust_task *task, size_t n_descs, type_desc const **descs) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); LOG(task, cache, "upcall get_type_desc with size=%" PRIdPTR ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align, n_descs); @@ -521,7 +521,7 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. LOG_UPCALL_ENTRY(spawner); - scoped_lock with(spawner->dom->scheduler_lock); + scoped_lock with(spawner->kernel->scheduler_lock); rust_dom *dom = spawner->dom; rust_task *task = dom->create_task(spawner, (const char *)name->data); return task; @@ -563,7 +563,7 @@ extern "C" CDECL void upcall_ivec_resize(rust_task *task, rust_ivec *v, size_t newsz) { - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); I(task->dom, !v->fill); size_t new_alloc = next_power_of_two(newsz); @@ -582,7 +582,7 @@ extern "C" CDECL void upcall_ivec_spill(rust_task *task, rust_ivec *v, size_t newsz) { - scoped_lock with(task->dom->scheduler_lock); + scoped_lock with(task->kernel->scheduler_lock); size_t new_alloc = next_power_of_two(newsz); rust_ivec_heap *heap_part = (rust_ivec_heap *)