// Copyright 2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use either::{Left, Right}; use option::{Option, Some, None}; use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use unstable::raw; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool}; use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::{Task, AnySched, Sched}; use super::message_queue::MessageQueue; use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; use cell::Cell; use rand::{XorShiftRng, RngUtil}; use iterator::{range}; use vec::{OwnedVector}; use rt::uv::idle::IdleWatcher; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by /// thread local storage and the running task is owned by the /// scheduler. /// /// XXX: This creates too many callbacks to run_sched_once, resulting /// in too much allocation and too many events. pub struct Scheduler { /// There are N work queues, one per scheduler. priv work_queue: WorkQueue<~Task>, /// Work queues for the other schedulers. These are created by /// cloning the core work queues. work_queues: ~[WorkQueue<~Task>], /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. priv message_queue: MessageQueue, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, /// Indicates that we have previously pushed a handle onto the /// SleeperList but have not yet received the Wake message. /// Being `true` does not necessarily mean that the scheduler is /// not active since there are multiple event sources that may /// wake the scheduler. It just prevents the scheduler from pushing /// multiple handles onto the sleeper list. priv sleepy: bool, /// A flag to indicate we've received the shutdown message and should /// no longer try to go to sleep, but exit instead. no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, /// The scheduler runs on a special task. sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option, metrics: SchedMetrics, /// Should this scheduler run any task, or only pinned tasks? run_anything: bool, /// If the scheduler shouldn't run some tasks, a friend to send /// them to. friend_handle: Option, /// A fast XorShift rng for scheduler use rng: XorShiftRng, /// An IdleWatcher idle_watcher: IdleWatcher, /// A flag to indicate whether or not the idle callback is active. idle_flag: bool } pub struct SchedHandle { priv remote: ~RemoteCallbackObject, priv queue: MessageQueue, sched_id: uint } pub enum SchedMessage { Wake, Shutdown, PinnedTask(~Task), TaskFromFriend(~Task) } enum CleanupJob { DoNothing, GiveTask(~Task, UnsafeTaskReceiver) } impl Scheduler { pub fn sched_id(&self) -> uint { to_uint(self) } pub fn new(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList) -> Scheduler { Scheduler::new_special(event_loop, work_queue, work_queues, sleeper_list, true, None) } // When you create a scheduler it isn't yet "in" a task, so the // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList, run_anything: bool, friend: Option) -> Scheduler { let mut event_loop = event_loop; let idle_watcher = IdleWatcher::new(event_loop.uvio.uv_loop()); Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), sleepy: false, no_sleep: false, event_loop: event_loop, work_queue: work_queue, work_queues: work_queues, stack_pool: StackPool::new(), sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything, friend_handle: friend, rng: XorShiftRng::new(), idle_watcher: idle_watcher, idle_flag: true } } // XXX: This may eventually need to be refactored so that // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. // Take a main task to run, and a scheduler to run it in. Create a // scheduler task and bootstrap into it. pub fn bootstrap(~self, task: ~Task) { let mut this = self; // Initialize the TLS key. local_ptr::init_tls_key(); // Create a task for the scheduler with an empty context. let sched_task = ~Task::new_sched_task(); // Now that we have an empty task struct for the scheduler // task, put it in TLS. Local::put::(sched_task); // Before starting our first task, make sure the idle callback // is active. As we do not start in the sleep state this is // important. do this.idle_watcher.start |_idle_watcher, _status| { Scheduler::run_sched_once(); } // Now, as far as all the scheduler state is concerned, we are // inside the "scheduler" context. So we can act like the // scheduler and resume the provided task. this.resume_task_immediately(task); // Now we are back in the scheduler context, having // successfully run the input task. Start by running the // scheduler. Grab it out of TLS - performing the scheduler // action will have given it away. let sched = Local::take::(); rtdebug!("starting scheduler %u", sched.sched_id()); sched.run(); // Now that we are done with the scheduler, clean up the // scheduler task. Do so by removing it from TLS and manually // cleaning up the memory it uses. As we didn't actually call // task.run() on the scheduler task we never get through all // the cleanup code it runs. let mut stask = Local::take::(); rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id()); // Should not have any messages let message = stask.sched.get_mut_ref().message_queue.pop(); assert!(message.is_none()); stask.destroyed = true; } // This does not return a scheduler, as the scheduler is placed // inside the task. pub fn run(~self) { let mut self_sched = self; // Always run through the scheduler loop at least once so that // we enter the sleep state and can then be woken up by other // schedulers. // self_sched.event_loop.callback(Scheduler::run_sched_once); // This is unsafe because we need to place the scheduler, with // the event_loop inside, inside our task. But we still need a // mutable reference to the event_loop to give it the "run" // command. unsafe { let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; // Our scheduler must be in the task before the event loop // is started. let self_sched = Cell::new(self_sched); do Local::borrow:: |stask| { stask.sched = Some(self_sched.take()); }; (*event_loop).run(); } } // One iteration of the scheduler loop, always run at least once. // The model for this function is that you continue through it // until you either use the scheduler while performing a schedule // action, in which case you give it away and do not return, or // you reach the end and sleep. In the case that a scheduler // action is performed the loop is evented such that this function // is called again. fn run_sched_once() { // When we reach the scheduler context via the event loop we // already have a scheduler stored in our local task, so we // start off by taking it. This is the only path through the // scheduler where we get the scheduler this way. let mut sched = Local::take::(); // Assume that we need to continue idling unless we reach the // end of this function without performing an action. sched.activate_idle(); // Our first task is to read mail to see if we have important // messages. // 1) A wake message is easy, mutate sched struct and return // it. // 2) A shutdown is also easy, shutdown. // 3) A pinned task - we resume immediately and do not return // here. // 4) A message from another scheduler with a non-homed task // to run here. let result = sched.interpret_message_queue(); let sched = match result { Some(sched) => { // We did not resume a task, so we returned. sched } None => { return; } }; // Second activity is to try resuming a task from the queue. let result = sched.do_work(); let mut sched = match result { Some(sched) => { // Failed to dequeue a task, so we return. sched } None => { return; } }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. sched.metrics.wasted_turns += 1; if !sched.sleepy && !sched.no_sleep { rtdebug!("scheduler has no work to do, going to sleep"); sched.metrics.sleepy_times += 1; sched.sleepy = true; let handle = sched.make_handle(); sched.sleeper_list.push(handle); // Since we are sleeping, deactivate the idle callback. sched.pause_idle(); } else { rtdebug!("not sleeping, already doing so or no_sleep set"); // We may not be sleeping, but we still need to deactivate // the idle callback. sched.pause_idle(); } // Finished a cycle without using the Scheduler. Place it back // in TLS. Local::put(sched); } fn activate_idle(&mut self) { if self.idle_flag { rtdebug!("idle flag already set, not reactivating idle watcher"); } else { rtdebug!("idle flag was false, reactivating idle watcher"); self.idle_flag = true; self.idle_watcher.restart(); } } fn pause_idle(&mut self) { if !self.idle_flag { rtdebug!("idle flag false, not stopping idle watcher"); } else { rtdebug!("idle flag true, stopping idle watcher"); self.idle_flag = false; self.idle_watcher.stop(); } } pub fn make_handle(&mut self) -> SchedHandle { let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); return SchedHandle { remote: remote, queue: self.message_queue.clone(), sched_id: self.sched_id() }; } /// Schedule a task to be executed later. /// /// Pushes the task onto the work stealing queue and tells the /// event loop to run it later. Always use this instead of pushing /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { let this = self; // We push the task onto our local queue clone. this.work_queue.push(task); // this.event_loop.callback(Scheduler::run_sched_once); // We've made work available. Notify a // sleeping scheduler. // XXX: perf. Check for a sleeper without // synchronizing memory. It's not critical // that we always find it. // XXX: perf. If there's a sleeper then we // might as well just send it the task // directly instead of pushing it to the // queue. That is essentially the intent here // and it is less work. match this.sleeper_list.pop() { Some(handle) => { let mut handle = handle; handle.send(Wake) } None => { (/* pass */) } }; } /// As enqueue_task, but with the possibility for the blocked task to /// already have been killed. pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) { do blocked_task.wake().map_move |task| { self.enqueue_task(task); }; } // * Scheduler-context operations // This function returns None if the scheduler is "used", or it // returns the still-available scheduler. Note: currently // considers *any* message receive a use and returns None. fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { // this.event_loop.callback(Scheduler::run_sched_once); let mut task = task; task.give_home(Sched(this.make_handle())); this.resume_task_immediately(task); return None; } Some(TaskFromFriend(task)) => { // this.event_loop.callback(Scheduler::run_sched_once); rtdebug!("got a task from a friend. lovely!"); this.sched_schedule_task(task).map_move(Local::put); return None; } Some(Wake) => { // this.event_loop.callback(Scheduler::run_sched_once); this.sleepy = false; Local::put(this); return None; // return Some(this); } Some(Shutdown) => { // this.event_loop.callback(Scheduler::run_sched_once); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's // not the case. loop { match this.sleeper_list.pop() { Some(handle) => { let mut handle = handle; handle.send(Wake); } None => break } } } // No more sleeping. After there are no outstanding // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; Local::put(this); return None; // return Some(this); } None => { return Some(this); } } } /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; let mut home = task.take_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); } AnySched => { rtabort!("error: cannot send anysched task home"); } } } /// Take a non-homed task we aren't allowed to run here and send /// it to the designated friend scheduler to execute. fn send_to_friend(&mut self, task: ~Task) { rtdebug!("sending a task to friend"); match self.friend_handle { Some(ref mut handle) => { handle.send(TaskFromFriend(task)); } None => { rtabort!("tried to send task to a friend but scheduler has no friends"); } } } // Workstealing: In this iteration of the runtime each scheduler // thread has a distinct work queue. When no work is available // locally, make a few attempts to steal work from the queues of // other scheduler threads. If a few steals fail we end up in the // old "no work" path which is fine. // First step in the process is to find a task. This function does // that by first checking the local queue, and if there is no work // there, trying to steal from the remote work queues. fn find_work(&mut self) -> Option<~Task> { rtdebug!("scheduler looking for work"); match self.work_queue.pop() { Some(task) => { rtdebug!("found a task locally"); return Some(task) } None => { // Our naive stealing, try kinda hard. rtdebug!("scheduler trying to steal"); let _len = self.work_queues.len(); return self.try_steals(2); } } } // With no backoff try stealing n times from the queues the // scheduler knows about. This naive implementation can steal from // our own queue or from other special schedulers. fn try_steals(&mut self, n: uint) -> Option<~Task> { for _ in range(0, n) { let index = self.rng.gen_uint_range(0, self.work_queues.len()); let work_queues = &mut self.work_queues; match work_queues[index].steal() { Some(task) => { rtdebug!("found task by stealing"); return Some(task) } None => () } }; rtdebug!("giving up on stealing"); return None; } // Given a task, execute it correctly. fn process_task(~self, task: ~Task) -> Option<~Scheduler> { let mut this = self; let mut task = task; rtdebug!("processing a task"); let home = task.take_unwrap_home(); match home { Sched(home_handle) => { if home_handle.sched_id != this.sched_id() { rtdebug!("sending task home"); task.give_home(Sched(home_handle)); Scheduler::send_task_home(task); return Some(this); } else { rtdebug!("running task here"); task.give_home(Sched(home_handle)); this.resume_task_immediately(task); return None; } } AnySched if this.run_anything => { rtdebug!("running anysched task here"); task.give_home(AnySched); this.resume_task_immediately(task); return None; } AnySched => { rtdebug!("sending task to friend"); task.give_home(AnySched); this.send_to_friend(task); return Some(this); } } } // Bundle the helpers together. fn do_work(~self) -> Option<~Scheduler> { let mut this = self; rtdebug!("scheduler calling do work"); match this.find_work() { Some(task) => { rtdebug!("found some work! processing the task"); return this.process_task(task); } None => { rtdebug!("no work was found, returning the scheduler struct"); return Some(this); } } } /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { // Similar to deschedule running task and then, but cannot go through // the task-blocking path. The task is already dying. let mut this = self; let stask = this.sched_task.take_unwrap(); do this.change_task_context(stask) |sched, mut dead_task| { let coroutine = dead_task.coroutine.take_unwrap(); coroutine.recycle(&mut sched.stack_pool); } } // Scheduling a task requires a few checks to make sure the task // ends up in the appropriate location. The run_anything flag on // the scheduler and the home on the task need to be checked. This // helper performs that check. It takes a function that specifies // how to queue the the provided task if that is the correct // action. This is a "core" function that requires handling the // returned Option correctly. pub fn schedule_task(~self, task: ~Task, schedule_fn: ~fn(sched: ~Scheduler, task: ~Task)) -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); // does the task have a home? let homed = task.homed(); let mut this = self; if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care rtdebug!("task: %u is on ok sched, executing", to_uint(task)); schedule_fn(this, task); return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.send_to_friend(task); return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); return Some(this); } } // There are two contexts in which schedule_task can be called: // inside the scheduler, and inside a task. These contexts handle // executing the task slightly differently. In the scheduler // context case we want to receive the scheduler as an input, and // manually deal with the option. In the task context case we want // to use TLS to find the scheduler, and deal with the option // inside the helper. pub fn sched_schedule_task(~self, task: ~Task) -> Option<~Scheduler> { do self.schedule_task(task) |sched, next_task| { sched.resume_task_immediately(next_task); } } // Task context case - use TLS. pub fn run_task(task: ~Task) { let sched = Local::take::(); let opt = do sched.schedule_task(task) |sched, next_task| { do sched.switch_running_tasks_and_then(next_task) |sched, last_task| { sched.enqueue_blocked_task(last_task); } }; opt.map_move(Local::put); } // The primary function for changing contexts. In the current // design the scheduler is just a slightly modified GreenTask, so // all context swaps are from Task to Task. The only difference // between the various cases is where the inputs come from, and // what is done with the resulting task. That is specified by the // cleanup function f, which takes the scheduler and the // old task as inputs. pub fn change_task_context(~self, next_task: ~Task, f: &fn(&mut Scheduler, ~Task)) { let mut this = self; // The current task is grabbed from TLS, not taken as an input. let current_task: ~Task = Local::take::(); // Check that the task is not in an atomically() section (e.g., // holding a pthread mutex, which could deadlock the scheduler). current_task.death.assert_may_sleep(); // These transmutes do something fishy with a closure. let f_fake_region = unsafe { transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f) }; let f_opaque = ClosureConverter::from_fn(f_fake_region); // The current task is placed inside an enum with the cleanup // function. This enum is then placed inside the scheduler. this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); // The scheduler is then placed inside the next task. let mut next_task = next_task; next_task.sched = Some(this); // However we still need an internal mutable pointer to the // original task. The strategy here was "arrange memory, then // get pointers", so we crawl back up the chain using // transmute to eliminate borrowck errors. unsafe { let sched: &mut Scheduler = transmute_mut_region(*next_task.sched.get_mut_ref()); let current_task: &mut Task = match sched.cleanup_job { Some(GiveTask(ref task, _)) => { transmute_mut_region(*transmute_mut_unsafe(task)) } Some(DoNothing) => { rtabort!("no next task"); } None => { rtabort!("no cleanup job"); } }; let (current_task_context, next_task_context) = Scheduler::get_contexts(current_task, next_task); // Done with everything - put the next task in TLS. This // works because due to transmute the borrow checker // believes that we have no internal pointers to // next_task. Local::put(next_task); // The raw context swap operation. The next action taken // will be running the cleanup job from the context of the // next task. Context::swap(current_task_context, next_task_context); } // When the context swaps back to this task we immediately // run the cleanup job, as expected by the previously called // swap_contexts function. unsafe { let sched = Local::unsafe_borrow::(); (*sched).run_cleanup_job(); // Must happen after running the cleanup job (of course). let task = Local::unsafe_borrow::(); (*task).death.check_killed((*task).unwinder.unwinding); } } // Old API for task manipulation implemented over the new core // function. pub fn resume_task_immediately(~self, task: ~Task) { do self.change_task_context(task) |sched, stask| { sched.sched_task = Some(stask); } } pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) { match blocked_task.wake() { Some(task) => self.resume_task_immediately(task), None => Local::put(self), }; } /// Block a running task, context switch to the scheduler, then pass the /// blocked task to a closure. /// /// # Safety note /// /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. /// /// This passes a Scheduler pointer to the fn after the context switch /// in order to prevent that fn from performing further scheduling operations. /// Doing further scheduling could easily result in infinite recursion. pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, BlockedTask)) { // Trickier - we need to get the scheduler task out of self // and use it as the destination. let mut this = self; let stask = this.sched_task.take_unwrap(); // Otherwise this is the same as below. this.switch_running_tasks_and_then(stask, f); } pub fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(&mut Scheduler, BlockedTask)) { // This is where we convert the BlockedTask-taking closure into one // that takes just a Task, and is aware of the block-or-killed protocol. do self.change_task_context(next_task) |sched, task| { // Task might need to receive a kill signal instead of blocking. // We can call the "and_then" only if it blocks successfully. match BlockedTask::try_block(task) { Left(killed_task) => sched.enqueue_task(killed_task), Right(blocked_task) => f(sched, blocked_task), } } } // A helper that looks up the scheduler and runs a task later by // enqueuing it. pub fn run_task_later(next_task: ~Task) { // We aren't performing a scheduler operation, so we want to // put the Scheduler back when we finish. let next_task = Cell::new(next_task); do Local::borrow:: |sched| { sched.enqueue_task(next_task.take()); }; } // Returns a mutable reference to both contexts involved in this // swap. This is unsafe - we are getting mutable internal // references to keep even when we don't own the tasks. It looks // kinda safe because we are doing transmutes before passing in // the arguments. pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> (&'a mut Context, &'a mut Context) { let current_task_context = &mut current_task.coroutine.get_mut_ref().saved_context; let next_task_context = &mut next_task.coroutine.get_mut_ref().saved_context; unsafe { (transmute_mut_region(current_task_context), transmute_mut_region(next_task_context)) } } pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } GiveTask(task, f) => f.to_fn()(self, task) } } } // The cases for the below function. enum ResumeAction { SendHome, Requeue, ResumeNow, Homeless } impl SchedHandle { pub fn send(&mut self, msg: SchedMessage) { self.queue.push(msg); self.remote.fire(); } } // XXX: Some hacks to put a &fn in Scheduler without borrowck // complaining type UnsafeTaskReceiver = raw::Closure; trait ClosureConverter { fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; fn to_fn(self) -> &fn(&mut Scheduler, ~Task); } impl ClosureConverter for UnsafeTaskReceiver { fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } #[cfg(test)] mod test { extern mod extra; use prelude::*; use rt::test::*; use unstable::run_in_bare_thread; use borrow::to_uint; use rt::local::*; use rt::sched::{Scheduler}; use cell::Cell; use rt::thread::Thread; use rt::task::{Task, Sched}; use option::{Some}; #[test] fn trivial_run_in_newsched_task_test() { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; do run_in_newsched_task || { unsafe { *task_ran_ptr = true }; rtdebug!("executed from the new scheduler") } assert!(task_ran); } #[test] fn multiple_task_test() { let total = 10; let mut task_run_count = 0; let task_run_count_ptr: *mut uint = &mut task_run_count; do run_in_newsched_task || { for _ in range(0u, total) { do spawntask || { unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; } } } assert!(task_run_count == total); } #[test] fn multiple_task_nested_test() { let mut task_run_count = 0; let task_run_count_ptr: *mut uint = &mut task_run_count; do run_in_newsched_task || { do spawntask || { unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; do spawntask || { unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; do spawntask || { unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; } } } } assert!(task_run_count == 3); } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. #[test] fn simple_sched_id_test() { do run_in_bare_thread { let sched = ~new_test_uv_sched(); assert!(to_uint(sched) == sched.sched_id()); } } // Compare two scheduler ids that are different, this should never // fail but may catch a mistake someday. #[test] fn compare_sched_id_test() { do run_in_bare_thread { let sched_one = ~new_test_uv_sched(); let sched_two = ~new_test_uv_sched(); assert!(sched_one.sched_id() != sched_two.sched_id()); } } // A very simple test that confirms that a task executing on the // home scheduler notices that it is home. #[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; let mut sched = ~new_test_uv_sched(); let sched_handle = sched.make_handle(); let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None, Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; assert!(Task::on_appropriate_sched()); }; let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); task.death.on_exit = Some(on_exit); sched.bootstrap(task); } } // An advanced test that checks all four possible states that a // (task,sched) can be in regarding homes. #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; use rt::sched::Shutdown; use borrow; use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let normal_queue = WorkQueue::new(); let special_queue = WorkQueue::new(); let queues = ~[normal_queue.clone(), special_queue.clone()]; // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), normal_queue, queues.clone(), sleepers.clone()); let normal_handle = Cell::new(normal_sched.make_handle()); let friend_handle = normal_sched.make_handle(); // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), special_queue.clone(), queues.clone(), sleepers.clone(), false, Some(friend_handle)); let special_handle = Cell::new(special_sched.make_handle()); let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); // Four test tasks: // 1) task is home on special // 2) task not homed, sched doesn't care // 3) task not homed, sched requeues // 4) task not home, send home let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None, Sched(t1_handle)) || { rtassert!(Task::on_appropriate_sched()); }; rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtassert!(Task::on_appropriate_sched()); }; let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtassert!(Task::on_appropriate_sched()); }; let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None, Sched(t4_handle)) { rtassert!(Task::on_appropriate_sched()); }; rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); let task1 = Cell::new(task1); let task2 = Cell::new(task2); let task3 = Cell::new(task3); let task4 = Cell::new(task4); // Signal from the special task that we are done. let (port, chan) = oneshot::<()>(); let port = Cell::new(port); let chan = Cell::new(chan); let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtdebug!("*about to submit task2*"); Scheduler::run_task(task2.take()); rtdebug!("*about to submit task4*"); Scheduler::run_task(task4.take()); rtdebug!("*normal_task done*"); port.take().recv(); let mut nh = normal_handle.take(); nh.send(Shutdown); let mut sh = special_handle.take(); sh.send(Shutdown); }; rtdebug!("normal task: %u", borrow::to_uint(normal_task)); let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) { rtdebug!("*about to submit task1*"); Scheduler::run_task(task1.take()); rtdebug!("*about to submit task3*"); Scheduler::run_task(task3.take()); rtdebug!("*done with special_task*"); chan.take().send(()); }; rtdebug!("special task: %u", borrow::to_uint(special_task)); let special_sched = Cell::new(special_sched); let normal_sched = Cell::new(normal_sched); let special_task = Cell::new(special_task); let normal_task = Cell::new(normal_task); let normal_thread = do Thread::start { normal_sched.take().bootstrap(normal_task.take()); rtdebug!("finished with normal_thread"); }; let special_thread = do Thread::start { special_sched.take().bootstrap(special_task.take()); rtdebug!("finished with special_sched"); }; normal_thread.join(); special_thread.join(); } } #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; for _ in range(0, n as int) { test_schedule_home_states(); } } #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { do spawntask { let sched = Local::take::(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); let mut sched = Local::take::(); sched.enqueue_blocked_task(task.take()); Local::put(sched); } } } } } #[test] fn handle() { use rt::comm::*; do run_in_bare_thread { let (port, chan) = oneshot::<()>(); let port = Cell::new(port); let chan = Cell::new(chan); let thread_one = do Thread::start { let chan = Cell::new(chan.take()); do run_in_newsched_task_core { chan.take().send(()); } }; let thread_two = do Thread::start { let port = Cell::new(port.take()); do run_in_newsched_task_core { port.take().recv(); } }; thread_two.join(); thread_one.join(); } } #[test] fn multithreading() { use rt::comm::*; use iter::Times; use vec::OwnedVector; use container::Container; do run_in_mt_newsched_task { let mut ports = ~[]; do 10.times { let (port, chan) = oneshot(); let chan_cell = Cell::new(chan); do spawntask_later { chan_cell.take().send(()); } ports.push(port); } while !ports.is_empty() { ports.pop().recv(); } } } #[test] fn thread_ring() { use rt::comm::*; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { let (end_port, end_chan) = oneshot(); let n_tasks = 10; let token = 2000; let (p, ch1) = stream(); let mut p = p; ch1.send((token, end_chan)); let mut i = 2; while i <= n_tasks { let (next_p, ch) = stream(); let imm_i = i; let imm_p = p; do spawntask_random { roundtrip(imm_i, n_tasks, &imm_p, &ch); }; p = next_p; i += 1; } let imm_p = p; let imm_ch = ch1; do spawntask_random { roundtrip(1, n_tasks, &imm_p, &imm_ch); } end_port.recv(); } fn roundtrip(id: int, n_tasks: int, p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) { while (true) { match p.recv() { (1, end_chan) => { debug!("%d\n", id); end_chan.send(()); return; } (token, end_chan) => { debug!("thread: %d got token: %d", id, token); ch.send((token - 1, end_chan)); if token <= n_tasks { return; } } } } } } #[test] fn start_closure_dtor() { use ops::Drop; // Regression test that the `start` task entrypoint can // contain dtors that use task resources do run_in_newsched_task { struct S { field: () } impl Drop for S { fn drop(&self) { let _foo = @0; } } let s = S { field: () }; do spawntask { let _ss = &s; } } } }