diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 070da8ffd4a..d9b5eb15a71 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1661,7 +1661,8 @@ extern mod rustrt { fn rust_get_sched_id() -> sched_id; fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id; - fn sched_threads() -> libc::size_t; + fn rust_max_sched_threads() -> libc::size_t; + fn rust_sched_threads() -> libc::size_t; fn rust_num_threads() -> libc::uintptr_t; fn get_task_id() -> task_id; @@ -2435,10 +2436,36 @@ fn test_sched_thread_per_core() { do spawn_sched(ThreadPerCore) { let cores = rustrt::rust_num_threads(); - let reported_threads = rustrt::sched_threads(); + let reported_threads = rustrt::rust_max_sched_threads(); assert(cores as uint == reported_threads as uint); chan.send(()); } port.recv(); } + +#[test] +fn test_spawn_thread_on_demand() { + let (chan, port) = pipes::stream(); + + do spawn_sched(ManualThreads(2)) { + let max_threads = rustrt::rust_max_sched_threads(); + assert(max_threads as int == 2); + let running_threads = rustrt::rust_sched_threads(); + assert(running_threads as int == 1); + + let (chan2, port2) = pipes::stream(); + + do spawn() { + chan2.send(()); + } + + let running_threads2 = rustrt::rust_sched_threads(); + assert(running_threads2 as int == 2); + + port2.recv(); + chan.send(()); + } + + port.recv(); +} diff --git a/src/libstd/test.rs b/src/libstd/test.rs index 8692a9a440a..ad0003934e4 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -26,7 +26,7 @@ export run_tests_console; #[abi = "cdecl"] extern mod rustrt { - fn sched_threads() -> libc::size_t; + fn rust_max_sched_threads() -> libc::size_t; } // The name of a test. By convention this follows the rules for rust @@ -327,7 +327,7 @@ const sched_overcommit : uint = 1u; const sched_overcommit : uint = 4u; fn get_concurrency() -> uint { - let threads = rustrt::sched_threads() as uint; + let threads = rustrt::rust_max_sched_threads() as uint; if threads == 1u { 1u } else { threads * sched_overcommit } } diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a601908359c..f1c2afc0f4b 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -627,11 +627,17 @@ start_task(rust_task *target, fn_env_pair *f) { } extern "C" CDECL size_t -sched_threads() { +rust_sched_threads() { rust_task *task = rust_get_current_task(); return task->sched->number_of_threads(); } +extern "C" CDECL size_t +rust_max_sched_threads() { + rust_task *task = rust_get_current_task(); + return task->sched->max_number_of_threads(); +} + extern "C" CDECL rust_port* rust_port_take(rust_port_id id) { rust_task *task = rust_get_current_task(); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 698ee866728..669ebd55a7c 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -31,9 +31,10 @@ rust_kernel::rust_kernel(rust_env *env) : // Create the single threaded scheduler that will run on the platform's // main thread - rust_manual_sched_launcher_factory launchfac; - osmain_scheduler = create_scheduler(&launchfac, 1, false); - osmain_driver = launchfac.get_driver(); + rust_manual_sched_launcher_factory *launchfac = + new rust_manual_sched_launcher_factory(); + osmain_scheduler = create_scheduler(launchfac, 1, false); + osmain_driver = launchfac->get_driver(); sched_reaper.start(); } @@ -79,8 +80,9 @@ void rust_kernel::free(void *mem) { rust_sched_id rust_kernel::create_scheduler(size_t num_threads) { - rust_thread_sched_launcher_factory launchfac; - return create_scheduler(&launchfac, num_threads, true); + rust_thread_sched_launcher_factory *launchfac = + new rust_thread_sched_launcher_factory(); + return create_scheduler(launchfac, num_threads, true); } rust_sched_id diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 5dd1a261c0e..aa288cf3b94 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -6,34 +6,39 @@ #include "rust_sched_launcher.h" rust_scheduler::rust_scheduler(rust_kernel *kernel, - size_t num_threads, + size_t max_num_threads, rust_sched_id id, bool allow_exit, bool killed, rust_sched_launcher_factory *launchfac) : ref_count(1), kernel(kernel), - live_threads(num_threads), + live_threads(0), live_tasks(0), cur_thread(0), may_exit(allow_exit), - num_threads(num_threads), + killed(killed), + launchfac(launchfac), + max_num_threads(max_num_threads), id(id) { - create_task_threads(launchfac, killed); + // Create the first thread + threads.push(create_task_thread(0)); } void rust_scheduler::delete_this() { destroy_task_threads(); + delete launchfac; delete this; } rust_sched_launcher * -rust_scheduler::create_task_thread(rust_sched_launcher_factory *launchfac, - int id, bool killed) { +rust_scheduler::create_task_thread(int id) { + live_threads++; rust_sched_launcher *thread = launchfac->create(this, id, killed); - KLOG(kernel, kern, "created task thread: " PTR ", id: %d", - thread, id); + KLOG(kernel, kern, "created task thread: " PTR + ", id: %d, live_threads: %d", + thread, id, live_threads); return thread; } @@ -43,19 +48,9 @@ rust_scheduler::destroy_task_thread(rust_sched_launcher *thread) { delete thread; } -void -rust_scheduler::create_task_threads(rust_sched_launcher_factory *launchfac, - bool killed) { - KLOG(kernel, kern, "Using %d scheduler threads.", num_threads); - - for(size_t i = 0; i < num_threads; ++i) { - threads.push(create_task_thread(launchfac, i, killed)); - } -} - void rust_scheduler::destroy_task_threads() { - for(size_t i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < threads.size(); ++i) { destroy_task_thread(threads[i]); } } @@ -63,7 +58,7 @@ rust_scheduler::destroy_task_threads() { void rust_scheduler::start_task_threads() { - for(size_t i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < threads.size(); ++i) { rust_sched_launcher *thread = threads[i]; thread->start(); } @@ -72,7 +67,7 @@ rust_scheduler::start_task_threads() void rust_scheduler::join_task_threads() { - for(size_t i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < threads.size(); ++i) { rust_sched_launcher *thread = threads[i]; thread->join(); } @@ -80,7 +75,7 @@ rust_scheduler::join_task_threads() void rust_scheduler::kill_all_tasks() { - for(size_t i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < threads.size(); ++i) { rust_sched_launcher *thread = threads[i]; thread->get_loop()->kill_all_tasks(); } @@ -92,10 +87,29 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) { { scoped_lock with(lock); live_tasks++; - thread_no = cur_thread++; - if (cur_thread >= num_threads) - cur_thread = 0; + + // Find unoccupied thread + for (thread_no = 0; thread_no < threads.size(); ++thread_no) { + if (threads[thread_no]->get_loop()->number_of_live_tasks() == 0) + break; + } + + if (thread_no == threads.size()) { + if (threads.size() < max_num_threads) { + // Else create new thread + thread_no = threads.size(); + rust_sched_launcher *thread = create_task_thread(thread_no); + thread->start(); + threads.push(thread); + } else { + // Or use round robin allocation + thread_no = cur_thread++; + if (cur_thread >= max_num_threads) + cur_thread = 0; + } + } } + KLOG(kernel, kern, "Creating task %s, on thread %d.", name, thread_no); kernel->register_task(); rust_sched_launcher *thread = threads[thread_no]; return thread->get_loop()->create_task(spawner, name); @@ -119,17 +133,22 @@ rust_scheduler::release_task() { void rust_scheduler::exit() { - // Take a copy of num_threads. After the last thread exits this + // Take a copy of the number of threads. After the last thread exits this // scheduler will get destroyed, and our fields will cease to exist. - size_t current_num_threads = num_threads; + size_t current_num_threads = threads.size(); for(size_t i = 0; i < current_num_threads; ++i) { threads[i]->get_loop()->exit(); } } +size_t +rust_scheduler::max_number_of_threads() { + return max_num_threads; +} + size_t rust_scheduler::number_of_threads() { - return num_threads; + return threads.size(); } void diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index 767ecaf7d1e..019f69f7a31 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -30,19 +30,17 @@ private: uintptr_t live_tasks; size_t cur_thread; bool may_exit; + bool killed; + rust_sched_launcher_factory *launchfac; array_list threads; - const size_t num_threads; + const size_t max_num_threads; rust_sched_id id; - void create_task_threads(rust_sched_launcher_factory *launchfac, - bool killed); void destroy_task_threads(); - rust_sched_launcher * - create_task_thread(rust_sched_launcher_factory *launchfac, int id, - bool killed); + rust_sched_launcher *create_task_thread(int id); void destroy_task_thread(rust_sched_launcher *thread); void exit(); @@ -51,7 +49,7 @@ private: void delete_this(); public: - rust_scheduler(rust_kernel *kernel, size_t num_threads, + rust_scheduler(rust_kernel *kernel, size_t max_num_threads, rust_sched_id id, bool allow_exit, bool killed, rust_sched_launcher_factory *launchfac); @@ -62,6 +60,7 @@ public: void release_task(); + size_t max_number_of_threads(); size_t number_of_threads(); // Called by each thread when it terminates. When all threads // terminate the scheduler does as well. diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index cb2f36fe31b..e0930dbf753 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -30,6 +30,7 @@ rand_new_seeded rand_next rand_seed rust_get_sched_id +rust_max_sched_threads rust_new_sched rust_new_task_in_sched rust_num_threads @@ -48,6 +49,7 @@ rust_port_size rust_process_wait rust_ptr_eq rust_run_program +rust_sched_threads rust_set_exit_status rust_start rust_getcwd @@ -58,7 +60,6 @@ rust_get_task rust_get_stack_segment rust_task_weaken rust_task_unweaken -sched_threads shape_log_str start_task vec_reserve_shared_actual diff --git a/src/test/run-pass/morestack6.rs b/src/test/run-pass/morestack6.rs index 3036d4c201f..b10c18ffd3e 100644 --- a/src/test/run-pass/morestack6.rs +++ b/src/test/run-pass/morestack6.rs @@ -8,7 +8,7 @@ extern mod rustrt { fn last_os_error() -> ~str; fn rust_getcwd() -> ~str; fn get_task_id() -> libc::intptr_t; - fn sched_threads(); + fn rust_max_sched_threads(); fn rust_get_task(); } @@ -16,7 +16,7 @@ fn calllink01() { rustrt::rust_get_sched_id(); } fn calllink02() { rustrt::last_os_error(); } fn calllink03() { rustrt::rust_getcwd(); } fn calllink08() { rustrt::get_task_id(); } -fn calllink09() { rustrt::sched_threads(); } +fn calllink09() { rustrt::rust_max_sched_threads(); } fn calllink10() { rustrt::rust_get_task(); } fn runtest(f: fn~(), frame_backoff: u32) {