From 4bc773465fe95da37b8c867979786b190de6197c Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 20 Jun 2011 17:19:50 -0700 Subject: [PATCH] Basic multithreading support. The infinite loops test successfully maxes out the CPU. --- src/rt/rust.cpp | 2 +- src/rt/rust_dom.cpp | 85 +++++++++++++++++++++++------ src/rt/rust_dom.h | 16 +++++- src/rt/rust_task.cpp | 29 +++++++--- src/rt/rust_task.h | 6 ++ src/rt/sync/lock_and_signal.cpp | 23 ++++++++ src/rt/sync/lock_and_signal.h | 14 +++++ src/rt/test/rust_test_runtime.cpp | 2 +- src/rt/util/array_list.h | 1 + src/rt/util/indexed_list.h | 1 + src/test/run-pass/infinite-loops.rs | 29 ++++++++++ 11 files changed, 178 insertions(+), 30 deletions(-) create mode 100644 src/test/run-pass/infinite-loops.rs diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index c7cece2ef81..f1666b2e7a7 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -95,7 +95,7 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { dom->root_task->start(main_fn, (uintptr_t)args->args); - int ret = dom->start_main_loop(); + int ret = dom->start_main_loops(8); delete args; kernel->destroy_domain(dom); kernel->join_all_domains(); diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 1a5e1463509..7b5467ddeb1 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -47,16 +47,14 @@ rust_dom::~rust_dom() { void rust_dom::activate(rust_task *task) { - curr_task = task; - context ctx; task->ctx.next = &ctx; DLOG(this, task, "descheduling..."); + scheduler_lock.unlock(); task->ctx.swap(ctx); + scheduler_lock.lock(); DLOG(this, task, "task has returned"); - - curr_task = NULL; } void @@ -211,10 +209,13 @@ rust_dom::schedule_task() { // FIXME: in the face of failing tasks, this is not always right. 
// I(this, n_live_tasks() > 0); if (running_tasks.length() > 0) { - size_t i = rand(&rctx); - i %= running_tasks.length(); - if (running_tasks[i]->yield_timer.has_timed_out()) { - return (rust_task *)running_tasks[i]; + size_t k = rand(&rctx); + // Look around for a runnable task, starting at k. + for(size_t j = 0; j < running_tasks.length(); ++j) { + size_t i = (j + k) % running_tasks.length(); + if (running_tasks[i]->can_schedule()) { + return (rust_task *)running_tasks[i]; + } } } return NULL; @@ -261,15 +262,20 @@ rust_dom::log_state() { * drop to zero. */ int -rust_dom::start_main_loop() { +rust_dom::start_main_loop(int id) { + scheduler_lock.lock(); + // Make sure someone is watching, to pull us out of infinite loops. rust_timer timer(this); - DLOG(this, dom, "started domain loop"); + DLOG(this, dom, "started domain loop %d", id); while (number_of_live_tasks() > 0) { A(this, kernel->is_deadlocked() == false, "deadlock"); + DLOG(this, dom, "worker %d, number_of_live_tasks = %d", + id, number_of_live_tasks()); + drain_incoming_message_queue(true); rust_task *scheduled_task = schedule_task(); @@ -281,8 +287,11 @@ rust_dom::start_main_loop() { if (scheduled_task == NULL) { log_state(); DLOG(this, task, - "all tasks are blocked, scheduler yielding ..."); + "all tasks are blocked, scheduler id %d yielding ...", + id); + scheduler_lock.unlock(); sync::sleep(100); + scheduler_lock.lock(); DLOG(this, task, "scheduler resuming ..."); continue; @@ -303,15 +312,21 @@ rust_dom::start_main_loop() { interrupt_flag = 0; + DLOG(this, task, + "Running task %p on worker %d", + scheduled_task, id); + scheduled_task->active = true; activate(scheduled_task); + scheduled_task->active = false; DLOG(this, task, - "returned from task %s @0x%" PRIxPTR - " in state '%s', sp=0x%" PRIxPTR, - scheduled_task->name, - (uintptr_t)scheduled_task, - scheduled_task->state->name, - scheduled_task->rust_sp); + "returned from task %s @0x%" PRIxPTR + " in state '%s', sp=0x%" PRIxPTR ", worker id=%d", + 
scheduled_task->name, + (uintptr_t)scheduled_task, + scheduled_task->state->name, + scheduled_task->rust_sp, + id); /* // These invariants are no longer valid, as rust_sp is not @@ -341,10 +356,32 @@ rust_dom::start_main_loop() { reap_dead_tasks(); } - DLOG(this, dom, "finished main-loop (dom.rval = %d)", rval); + DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval); + + scheduler_lock.unlock(); return rval; } +int rust_dom::start_main_loops(int num_threads) +{ + dom_worker *worker = NULL; + + // -1, because this thread will also be a worker. + for(int i = 0; i < num_threads - 1; ++i) { + worker = new dom_worker(i + 1, this); + worker->start(); + threads.push(worker); + } + + start_main_loop(0); + + while(threads.pop(&worker)) { + worker->join(); + delete worker; + } + + return rval; +} rust_crate_cache * rust_dom::get_cache() { @@ -353,14 +390,26 @@ rust_dom::get_cache() { rust_task * rust_dom::create_task(rust_task *spawner, const char *name) { + scheduler_lock.lock(); rust_task *task = new (this) rust_task (this, &newborn_tasks, spawner, name); DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s", task, spawner ? 
spawner->name : "null", name); newborn_tasks.append(task); + scheduler_lock.unlock(); return task; } +rust_dom::dom_worker::dom_worker(int id, rust_dom *owner) + : id(id), owner(owner) +{ +} + +void rust_dom::dom_worker::run() +{ + owner->start_main_loop(id); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 2b88a2c713f..7f9fa7a2901 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -96,11 +96,25 @@ struct rust_dom : public kernel_owned, rc_base void reap_dead_tasks(); rust_task *schedule_task(); - int start_main_loop(); + int start_main_loop(int id); + int start_main_loops(int num_threads); void log_state(); rust_task *create_task(rust_task *spawner, const char *name); + + class dom_worker : public rust_thread { + int id; + rust_dom *owner; + + public: + dom_worker(int id, rust_dom *owner); + + virtual void run(); + }; + + lock_and_signal scheduler_lock; + array_list threads; }; inline rust_log & diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index bb8261bb78a..f42f40510bd 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -70,7 +70,8 @@ rust_task::rust_task(rust_dom *dom, rust_task_list *state, list_index(-1), rendezvous_ptr(0), alarm(this), - handle(NULL) + handle(NULL), + active(false) { LOGPTR(dom, "new task", (uintptr_t)this); DLOG(dom, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this); @@ -123,17 +124,12 @@ struct spawn_args { uintptr_t, uintptr_t); }; -// TODO: rewrite this in LLVM assembly so we can be sure the calling -// conventions will match. extern "C" CDECL void task_start_wrapper(spawn_args *a) { rust_task *task = a->task; int rval = 42; - // This is used by the context switching code. LLVM generates fastcall - // functions, but ucontext needs cdecl functions. This massages the - // calling conventions into the right form. 
a->f(&rval, task, a->a3, a->a4); LOG(task, task, "task exited with value %d", rval); @@ -174,7 +170,10 @@ rust_task::start(uintptr_t spawnee_fn, ctx.call((void *)task_start_wrapper, a, sp); yield_timer.reset(0); - transition(&dom->newborn_tasks, &dom->running_tasks); + { + scoped_lock sync(dom->scheduler_lock); + transition(&dom->newborn_tasks, &dom->running_tasks); + } } void @@ -425,7 +424,10 @@ rust_task::block(rust_cond *on, const char* name) { A(dom, cond == NULL, "Cannot block an already blocked task."); A(dom, on != NULL, "Cannot block on a NULL object."); - transition(&dom->running_tasks, &dom->blocked_tasks); + { + scoped_lock sync(dom->scheduler_lock); + transition(&dom->running_tasks, &dom->blocked_tasks); + } cond = on; cond_name = name; } @@ -437,7 +439,10 @@ rust_task::wakeup(rust_cond *from) { (uintptr_t) cond, (uintptr_t) from); A(dom, cond == from, "Cannot wake up blocked task on wrong condition."); - transition(&dom->blocked_tasks, &dom->running_tasks); + { + scoped_lock sync(dom->scheduler_lock); + transition(&dom->blocked_tasks, &dom->running_tasks); + } I(dom, cond == from); cond = NULL; cond_name = "none"; @@ -445,6 +450,7 @@ rust_task::wakeup(rust_cond *from) { void rust_task::die() { + scoped_lock sync(dom->scheduler_lock); transition(&dom->running_tasks, &dom->dead_tasks); } @@ -482,6 +488,11 @@ rust_task::get_handle() { return handle; } +bool rust_task::can_schedule() +{ + return yield_timer.has_timed_out() && !active; +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index a022a348667..db3c0367adb 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -50,6 +50,10 @@ rust_task : public maybe_proxy, rust_handle *handle; context ctx; + + // This flag indicates that a worker is either currently running the task + // or is about to run this task. + bool active; // Only a pointer to 'name' is kept, so it must live as long as this task. 
rust_task(rust_dom *dom, @@ -111,6 +115,8 @@ rust_task : public maybe_proxy, frame_glue_fns *get_frame_glue_fns(uintptr_t fp); rust_crate_cache * get_crate_cache(); + + bool can_schedule(); }; // diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 4f262285826..337a1efa4bc 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -41,6 +41,8 @@ void lock_and_signal::lock() { EnterCriticalSection(&_cs); #else CHECKED(pthread_mutex_lock(&_mutex)); + _holding_thread = pthread_self(); + _locked = true; #endif } @@ -48,6 +50,7 @@ void lock_and_signal::unlock() { #if defined(__WIN32__) LeaveCriticalSection(&_cs); #else + _locked = false; CHECKED(pthread_mutex_unlock(&_mutex)); #endif } @@ -100,6 +103,26 @@ void lock_and_signal::signal_all() { #endif } +bool lock_and_signal::lock_held_by_current_thread() +{ +#if defined(__WIN32__) + // TODO: implement this functionality for win32. + return false; +#else + return _locked && _holding_thread == pthread_self(); +#endif +} + +scoped_lock::scoped_lock(lock_and_signal &lock) + : lock(lock) +{ + lock.lock(); +} + +scoped_lock::~scoped_lock() +{ + lock.unlock(); +} // // Local Variables: diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 1f1cfb4124b..2794027d5c7 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef LOCK_AND_SIGNAL_H #define LOCK_AND_SIGNAL_H @@ -8,6 +9,9 @@ class lock_and_signal { #else pthread_cond_t _cond; pthread_mutex_t _mutex; + + pthread_t _holding_thread; + bool _locked; #endif public: lock_and_signal(); @@ -19,6 +23,16 @@ public: void timed_wait(size_t timeout_in_ns); void signal(); void signal_all(); + + bool lock_held_by_current_thread(); +}; + +class scoped_lock { + lock_and_signal &lock; + +public: + scoped_lock(lock_and_signal &lock); + ~scoped_lock(); }; #endif /* LOCK_AND_SIGNAL_H */ diff --git a/src/rt/test/rust_test_runtime.cpp 
b/src/rt/test/rust_test_runtime.cpp index 0fed35dad82..18a957edd60 100644 --- a/src/rt/test/rust_test_runtime.cpp +++ b/src/rt/test/rust_test_runtime.cpp @@ -53,7 +53,7 @@ rust_task_test::worker::run() { kernel->create_domain("test"); rust_dom *domain = handle->referent(); domain->root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - domain->start_main_loop(); + domain->start_main_loop(0); kernel->destroy_domain(domain); } diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index 9ad4b208041..495594cf7e6 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef ARRAY_LIST_H #define ARRAY_LIST_H diff --git a/src/rt/util/indexed_list.h b/src/rt/util/indexed_list.h index 173e9ede2ba..0c36604328f 100644 --- a/src/rt/util/indexed_list.h +++ b/src/rt/util/indexed_list.h @@ -1,3 +1,4 @@ +// -*- c++ -*- #ifndef INDEXED_LIST_H #define INDEXED_LIST_H diff --git a/src/test/run-pass/infinite-loops.rs b/src/test/run-pass/infinite-loops.rs new file mode 100644 index 00000000000..be9fed9ea1c --- /dev/null +++ b/src/test/run-pass/infinite-loops.rs @@ -0,0 +1,29 @@ +/* + A simple way to make sure threading works. This should use all the + CPU cycles on any machines that we're likely to see for a while. +*/ + +// xfail-stage0 +// xfail-stage1 +// xfail-stage2 +// xfail-stage3 + +use std; +import std::task::join; + +fn loop(int n) { + let task t1; + let task t2; + + if(n > 0) { + t1 = spawn loop(n - 1); + t2 = spawn loop(n - 1); + } + + while(true) {} +} + +fn main() { + let task t = spawn loop(5); + join(t); +}