diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index c6b0491786d..86d38a18c50 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -52,7 +52,7 @@ use prelude::*; use ptr; use result; use task::local_data_priv::{local_get, local_set}; -use task::rt::{task_id, rust_task}; +use task::rt::{task_id, sched_id, rust_task}; use task; use util; use util::replace; @@ -62,6 +62,12 @@ pub mod local_data; pub mod rt; pub mod spawn; +/// A handle to a scheduler +#[deriving_eq] +pub enum Scheduler { + SchedulerHandle(sched_id) +} + /// A handle to a task #[deriving_eq] pub enum Task { @@ -95,7 +101,21 @@ impl TaskResult : Eq { } /// Scheduler modes +#[deriving_eq] pub enum SchedMode { + /// Run task on the default scheduler + DefaultScheduler, + /// Run task on the current scheduler + CurrentScheduler, + /// Run task on a specific scheduler + ExistingScheduler(Scheduler), + /** + * Tasks are scheduled on the main OS thread + * + * The main OS thread is the thread used to launch the runtime which, + * in most cases, is the process's initial thread as created by the OS. + */ + PlatformThread, /// All tasks run in the same OS thread SingleThreaded, /// Tasks are distributed among available CPUs @@ -104,53 +124,6 @@ pub enum SchedMode { ThreadPerTask, /// Tasks are distributed among a fixed number of OS threads ManualThreads(uint), - /** - * Tasks are scheduled on the main OS thread - * - * The main OS thread is the thread used to launch the runtime which, - * in most cases, is the process's initial thread as created by the OS. - */ - PlatformThread -} - -impl SchedMode : cmp::Eq { - pure fn eq(&self, other: &SchedMode) -> bool { - match (*self) { - SingleThreaded => { - match (*other) { - SingleThreaded => true, - _ => false - } - } - ThreadPerCore => { - match (*other) { - ThreadPerCore => true, - _ => false - } - } - ThreadPerTask => { - match (*other) { - ThreadPerTask => true, - _ => false - } - } - ManualThreads(e0a) => { - match (*other) { - ManualThreads(e0b) => e0a == e0b, - _ => false - } - } - PlatformThread => { - match (*other) { - PlatformThread => true, - _ => false - } - } - } - } - pure fn ne(&self, other: &SchedMode) -> bool { - !(*self).eq(other) - } } /** @@ -204,7 +177,7 @@ pub type TaskOpts = { linked: bool, supervised: bool, mut notify_chan: Option>, - sched: Option, + sched: SchedOpts, }; /** @@ -370,7 +343,7 @@ impl TaskBuilder { linked: self.opts.linked, supervised: self.opts.supervised, mut notify_chan: move notify_chan, - sched: Some({ mode: mode, foreign_stack_size: None}) + sched: { mode: mode, foreign_stack_size: None} }, can_not_copy: None, .. self.consume() @@ -486,7 +459,10 @@ pub fn default_task_opts() -> TaskOpts { linked: true, supervised: false, mut notify_chan: None, - sched: None + sched: { + mode: DefaultScheduler, + foreign_stack_size: None + } } } @@ -539,10 +515,9 @@ pub fn spawn_with(arg: A, f: fn~(v: A)) { pub fn spawn_sched(mode: SchedMode, f: fn~()) { /*! - * Creates a new scheduler and executes a task on it - * - * Tasks subsequently spawned by that task will also execute on - * the new scheduler. When there are no more tasks to execute the + * Creates a new task on a new or existing scheduler + + * When there are no more tasks to execute the * scheduler terminates. * * # Failure @@ -590,6 +565,10 @@ pub fn get_task() -> Task { TaskHandle(rt::get_task_id()) } +pub fn get_scheduler() -> Scheduler { + SchedulerHandle(rt::rust_get_sched_id()) +} + /** * Temporarily make the task unkillable * @@ -927,16 +906,19 @@ fn test_spawn_sched() { } #[test] -fn test_spawn_sched_childs_on_same_sched() { +fn test_spawn_sched_childs_on_default_sched() { let po = oldcomm::Port(); let ch = oldcomm::Chan(&po); + // Assuming tests run on the default scheduler + let default_id = rt::rust_get_sched_id(); + do spawn_sched(SingleThreaded) { let parent_sched_id = rt::rust_get_sched_id(); do spawn { let child_sched_id = rt::rust_get_sched_id(); - // This should be on the same scheduler - assert parent_sched_id == child_sched_id; + assert parent_sched_id != child_sched_id; + assert child_sched_id == default_id; oldcomm::send(ch, ()); }; }; @@ -1206,7 +1188,7 @@ fn test_spawn_thread_on_demand() { let (port2, chan2) = pipes::stream(); - do spawn() |move chan2| { + do spawn_sched(CurrentScheduler) |move chan2| { chan2.send(()); } diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index 1c5531303e1..e3afa7c4535 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -88,6 +88,7 @@ use task::rt::rust_closure; use task::rt; use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded}; use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask}; +use task::{ExistingScheduler, SchedulerHandle}; use task::{default_task_opts, unkillable}; use uint; use util; @@ -525,9 +526,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { // Agh. Get move-mode items into the closure. FIXME (#2829) let (child_tg, ancestors, f) = option::swap_unwrap(child_data); // Create child task. - let new_task = match opts.sched { - None => rt::new_task(), - Some(sched_opts) => new_task_in_new_sched(sched_opts) + let new_task = match opts.sched.mode { + DefaultScheduler => rt::new_task(), + _ => new_task_in_sched(opts.sched) }; assert !new_task.is_null(); // Getting killed after here would leak the task. @@ -631,12 +632,16 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { } } - fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task { + fn new_task_in_sched(opts: SchedOpts) -> *rust_task { if opts.foreign_stack_size != None { fail ~"foreign_stack_size scheduler option unimplemented"; } let num_threads = match opts.mode { + DefaultScheduler + | CurrentScheduler + | ExistingScheduler(*) + | PlatformThread => 0u, /* Won't be used */ SingleThreaded => 1u, ThreadPerCore => rt::rust_num_threads(), ThreadPerTask => { @@ -648,13 +653,13 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) { } threads } - PlatformThread => 0u /* Won't be used */ }; - let sched_id = if opts.mode != PlatformThread { - rt::rust_new_sched(num_threads) - } else { - rt::rust_osmain_sched_id() + let sched_id = match opts.mode { + CurrentScheduler => rt::rust_get_sched_id(), + ExistingScheduler(SchedulerHandle(id)) => id, + PlatformThread => rt::rust_osmain_sched_id(), + _ => rt::rust_new_sched(num_threads) }; rt::rust_new_task_in_sched(sched_id) } diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index f21a7441640..803da32cbc8 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { rust_kernel *kernel = new rust_kernel(env); - // Create the main scheduler and the main task - rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads); + // Create the main task + rust_sched_id sched_id = kernel->main_sched_id(); rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id); assert(sched != NULL); rust_task *root_task = sched->create_task(NULL, "main"); diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index de69272aca1..cbc58e85db6 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) { extern "C" CDECL rust_task* new_task() { rust_task *task = rust_get_current_task(); - return new_task_common(task->sched, task); + rust_sched_id sched_id = task->kernel->main_sched_id(); + rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id); + assert(sched != NULL && "should always have a main scheduler"); + return new_task_common(sched, task); } extern "C" CDECL rust_task* diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 8871d133ea1..cc98b474ee3 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -30,6 +30,7 @@ rust_kernel::rust_kernel(rust_env *env) : rval(0), max_sched_id(1), killed(false), + already_exiting(false), sched_reaper(this), osmain_driver(NULL), non_weak_tasks(0), @@ -38,13 +39,20 @@ rust_kernel::rust_kernel(rust_env *env) : env(env) { - // Create the single threaded scheduler that will run on the platform's // main thread - rust_manual_sched_launcher_factory *launchfac = + rust_manual_sched_launcher_factory *osmain_launchfac = new rust_manual_sched_launcher_factory(); - osmain_scheduler = create_scheduler(launchfac, 1, false); - osmain_driver = launchfac->get_driver(); + osmain_scheduler = create_scheduler(osmain_launchfac, 1, false); + osmain_driver = osmain_launchfac->get_driver(); + + // Create the primary scheduler + rust_thread_sched_launcher_factory *main_launchfac = + new rust_thread_sched_launcher_factory(); + main_scheduler = create_scheduler(main_launchfac, + env->num_sched_threads, + false); + sched_reaper.start(); } @@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac, { scoped_lock with(sched_lock); - if (sched_table.size() == 1) { - // The OS main scheduler may not exit while there are other - // schedulers - KLOG_("Disallowing osmain scheduler to exit"); - rust_scheduler *sched = - get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->disallow_exit(); + /*if (sched_table.size() == 2) { + // The main and OS main schedulers may not exit while there are + // other schedulers + KLOG_("Disallowing main scheduler to exit"); + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->disallow_exit(); } + if (sched_table.size() == 1) { + KLOG_("Disallowing osmain scheduler to exit"); + rust_scheduler *osmain_sched = + get_scheduler_by_id_nolock(osmain_scheduler); + assert(osmain_sched != NULL); + osmain_sched->disallow_exit(); + }*/ id = max_sched_id++; assert(id != INTPTR_MAX && "Hit the maximum scheduler id"); @@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers() sched_table.erase(iter); sched->join_task_threads(); sched->deref(); + /*if (sched_table.size() == 2) { + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + } if (sched_table.size() == 1) { KLOG_("Allowing osmain scheduler to exit"); - // It's only the osmain scheduler left. Tell it to exit - rust_scheduler *sched = + rust_scheduler *osmain_sched = get_scheduler_by_id_nolock(osmain_scheduler); - assert(sched != NULL); - sched->allow_exit(); - } + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); + }*/ } if (!sched_table.empty()) { sched_lock.wait(); @@ -318,13 +340,31 @@ rust_kernel::register_task() { KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); } +void +rust_kernel::allow_scheduler_exit() { + scoped_lock with(sched_lock); + + KLOG_("Allowing main scheduler to exit"); + // It's only the main schedulers left. Tell them to exit + rust_scheduler *main_sched = + get_scheduler_by_id_nolock(main_scheduler); + assert(main_sched != NULL); + main_sched->allow_exit(); + + KLOG_("Allowing osmain scheduler to exit"); + rust_scheduler *osmain_sched = + get_scheduler_by_id_nolock(osmain_scheduler); + assert(osmain_sched != NULL); + osmain_sched->allow_exit(); +} + void rust_kernel::unregister_task() { KLOG_("Unregistering task"); uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } @@ -338,7 +378,7 @@ rust_kernel::weaken_task(rust_port_id chan) { uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); if (new_non_weak_tasks == 0) { - end_weak_tasks(); + begin_shutdown(); } } @@ -374,6 +414,23 @@ rust_kernel::end_weak_tasks() { } } +void +rust_kernel::begin_shutdown() { + { + scoped_lock with(sched_lock); + // FIXME #4410: This shouldn't be necessary, but because of + // unweaken_task this may end up getting called multiple times. + if (already_exiting) { + return; + } else { + already_exiting = true; + } + } + + allow_scheduler_exit(); + end_weak_tasks(); +} + bool rust_kernel::send_to_port(rust_port_id chan, void *sptr) { KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan); diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index cd52bfae8d3..13fd8934172 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -82,7 +82,8 @@ class rust_kernel { lock_and_signal rval_lock; int rval; - // Protects max_sched_id and sched_table, join_list, killed + // Protects max_sched_id and sched_table, join_list, killed, + // already_exiting lock_and_signal sched_lock; // The next scheduler id rust_sched_id max_sched_id; @@ -95,8 +96,13 @@ class rust_kernel { // task group fails). This propagates to all new schedulers and tasks // created after it is set. bool killed; + bool already_exiting; + rust_sched_reaper sched_reaper; + + // The primary scheduler + rust_sched_id main_scheduler; // The single-threaded scheduler that uses the main thread rust_sched_id osmain_scheduler; // Runs the single-threaded scheduler that executes tasks @@ -111,7 +117,9 @@ class rust_kernel { std::vector weak_task_chans; rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); + void allow_scheduler_exit(); void end_weak_tasks(); + void begin_shutdown(); // Used to communicate with the process-side, global libuv loop uintptr_t global_loop_chan; @@ -155,6 +163,7 @@ public: void set_exit_status(int code); + rust_sched_id main_sched_id() { return main_scheduler; } rust_sched_id osmain_sched_id() { return osmain_scheduler; } void register_task();