1
Fork 0

Spawn new tasks onto the primary scheduler by default. #3760

This commit is contained in:
Brian Anderson 2013-01-08 19:46:12 -08:00
parent 989667e545
commit 090b247056
6 changed files with 147 additions and 91 deletions

View file

@ -52,7 +52,7 @@ use prelude::*;
use ptr; use ptr;
use result; use result;
use task::local_data_priv::{local_get, local_set}; use task::local_data_priv::{local_get, local_set};
use task::rt::{task_id, rust_task}; use task::rt::{task_id, sched_id, rust_task};
use task; use task;
use util; use util;
use util::replace; use util::replace;
@ -62,6 +62,12 @@ pub mod local_data;
pub mod rt; pub mod rt;
pub mod spawn; pub mod spawn;
/// A handle to a scheduler
#[deriving_eq]
pub enum Scheduler {
SchedulerHandle(sched_id)
}
/// A handle to a task /// A handle to a task
#[deriving_eq] #[deriving_eq]
pub enum Task { pub enum Task {
@ -95,7 +101,21 @@ impl TaskResult : Eq {
} }
/// Scheduler modes /// Scheduler modes
#[deriving_eq]
pub enum SchedMode { pub enum SchedMode {
/// Run task on the default scheduler
DefaultScheduler,
/// Run task on the current scheduler
CurrentScheduler,
/// Run task on a specific scheduler
ExistingScheduler(Scheduler),
/**
* Tasks are scheduled on the main OS thread
*
* The main OS thread is the thread used to launch the runtime which,
* in most cases, is the process's initial thread as created by the OS.
*/
PlatformThread,
/// All tasks run in the same OS thread /// All tasks run in the same OS thread
SingleThreaded, SingleThreaded,
/// Tasks are distributed among available CPUs /// Tasks are distributed among available CPUs
@ -104,53 +124,6 @@ pub enum SchedMode {
ThreadPerTask, ThreadPerTask,
/// Tasks are distributed among a fixed number of OS threads /// Tasks are distributed among a fixed number of OS threads
ManualThreads(uint), ManualThreads(uint),
/**
* Tasks are scheduled on the main OS thread
*
* The main OS thread is the thread used to launch the runtime which,
* in most cases, is the process's initial thread as created by the OS.
*/
PlatformThread
}
impl SchedMode : cmp::Eq {
pure fn eq(&self, other: &SchedMode) -> bool {
match (*self) {
SingleThreaded => {
match (*other) {
SingleThreaded => true,
_ => false
}
}
ThreadPerCore => {
match (*other) {
ThreadPerCore => true,
_ => false
}
}
ThreadPerTask => {
match (*other) {
ThreadPerTask => true,
_ => false
}
}
ManualThreads(e0a) => {
match (*other) {
ManualThreads(e0b) => e0a == e0b,
_ => false
}
}
PlatformThread => {
match (*other) {
PlatformThread => true,
_ => false
}
}
}
}
pure fn ne(&self, other: &SchedMode) -> bool {
!(*self).eq(other)
}
} }
/** /**
@ -204,7 +177,7 @@ pub type TaskOpts = {
linked: bool, linked: bool,
supervised: bool, supervised: bool,
mut notify_chan: Option<Chan<TaskResult>>, mut notify_chan: Option<Chan<TaskResult>>,
sched: Option<SchedOpts>, sched: SchedOpts,
}; };
/** /**
@ -370,7 +343,7 @@ impl TaskBuilder {
linked: self.opts.linked, linked: self.opts.linked,
supervised: self.opts.supervised, supervised: self.opts.supervised,
mut notify_chan: move notify_chan, mut notify_chan: move notify_chan,
sched: Some({ mode: mode, foreign_stack_size: None}) sched: { mode: mode, foreign_stack_size: None}
}, },
can_not_copy: None, can_not_copy: None,
.. self.consume() .. self.consume()
@ -486,7 +459,10 @@ pub fn default_task_opts() -> TaskOpts {
linked: true, linked: true,
supervised: false, supervised: false,
mut notify_chan: None, mut notify_chan: None,
sched: None sched: {
mode: DefaultScheduler,
foreign_stack_size: None
}
} }
} }
@ -539,10 +515,9 @@ pub fn spawn_with<A:Owned>(arg: A, f: fn~(v: A)) {
pub fn spawn_sched(mode: SchedMode, f: fn~()) { pub fn spawn_sched(mode: SchedMode, f: fn~()) {
/*! /*!
* Creates a new scheduler and executes a task on it * Creates a new task on a new or existing scheduler
*
* Tasks subsequently spawned by that task will also execute on * When there are no more tasks to execute the
* the new scheduler. When there are no more tasks to execute the
* scheduler terminates. * scheduler terminates.
* *
* # Failure * # Failure
@ -590,6 +565,10 @@ pub fn get_task() -> Task {
TaskHandle(rt::get_task_id()) TaskHandle(rt::get_task_id())
} }
pub fn get_scheduler() -> Scheduler {
SchedulerHandle(rt::rust_get_sched_id())
}
/** /**
* Temporarily make the task unkillable * Temporarily make the task unkillable
* *
@ -927,16 +906,19 @@ fn test_spawn_sched() {
} }
#[test] #[test]
fn test_spawn_sched_childs_on_same_sched() { fn test_spawn_sched_childs_on_default_sched() {
let po = oldcomm::Port(); let po = oldcomm::Port();
let ch = oldcomm::Chan(&po); let ch = oldcomm::Chan(&po);
// Assuming tests run on the default scheduler
let default_id = rt::rust_get_sched_id();
do spawn_sched(SingleThreaded) { do spawn_sched(SingleThreaded) {
let parent_sched_id = rt::rust_get_sched_id(); let parent_sched_id = rt::rust_get_sched_id();
do spawn { do spawn {
let child_sched_id = rt::rust_get_sched_id(); let child_sched_id = rt::rust_get_sched_id();
// This should be on the same scheduler assert parent_sched_id != child_sched_id;
assert parent_sched_id == child_sched_id; assert child_sched_id == default_id;
oldcomm::send(ch, ()); oldcomm::send(ch, ());
}; };
}; };
@ -1206,7 +1188,7 @@ fn test_spawn_thread_on_demand() {
let (port2, chan2) = pipes::stream(); let (port2, chan2) = pipes::stream();
do spawn() |move chan2| { do spawn_sched(CurrentScheduler) |move chan2| {
chan2.send(()); chan2.send(());
} }

View file

@ -88,6 +88,7 @@ use task::rt::rust_closure;
use task::rt; use task::rt;
use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded}; use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded};
use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask}; use task::{Success, TaskOpts, TaskResult, ThreadPerCore, ThreadPerTask};
use task::{ExistingScheduler, SchedulerHandle};
use task::{default_task_opts, unkillable}; use task::{default_task_opts, unkillable};
use uint; use uint;
use util; use util;
@ -525,9 +526,9 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
// Agh. Get move-mode items into the closure. FIXME (#2829) // Agh. Get move-mode items into the closure. FIXME (#2829)
let (child_tg, ancestors, f) = option::swap_unwrap(child_data); let (child_tg, ancestors, f) = option::swap_unwrap(child_data);
// Create child task. // Create child task.
let new_task = match opts.sched { let new_task = match opts.sched.mode {
None => rt::new_task(), DefaultScheduler => rt::new_task(),
Some(sched_opts) => new_task_in_new_sched(sched_opts) _ => new_task_in_sched(opts.sched)
}; };
assert !new_task.is_null(); assert !new_task.is_null();
// Getting killed after here would leak the task. // Getting killed after here would leak the task.
@ -631,12 +632,16 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
} }
} }
fn new_task_in_new_sched(opts: SchedOpts) -> *rust_task { fn new_task_in_sched(opts: SchedOpts) -> *rust_task {
if opts.foreign_stack_size != None { if opts.foreign_stack_size != None {
fail ~"foreign_stack_size scheduler option unimplemented"; fail ~"foreign_stack_size scheduler option unimplemented";
} }
let num_threads = match opts.mode { let num_threads = match opts.mode {
DefaultScheduler
| CurrentScheduler
| ExistingScheduler(*)
| PlatformThread => 0u, /* Won't be used */
SingleThreaded => 1u, SingleThreaded => 1u,
ThreadPerCore => rt::rust_num_threads(), ThreadPerCore => rt::rust_num_threads(),
ThreadPerTask => { ThreadPerTask => {
@ -648,13 +653,13 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
} }
threads threads
} }
PlatformThread => 0u /* Won't be used */
}; };
let sched_id = if opts.mode != PlatformThread { let sched_id = match opts.mode {
rt::rust_new_sched(num_threads) CurrentScheduler => rt::rust_get_sched_id(),
} else { ExistingScheduler(SchedulerHandle(id)) => id,
rt::rust_osmain_sched_id() PlatformThread => rt::rust_osmain_sched_id(),
_ => rt::rust_new_sched(num_threads)
}; };
rt::rust_new_task_in_sched(sched_id) rt::rust_new_task_in_sched(sched_id)
} }

View file

@ -43,8 +43,8 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
rust_kernel *kernel = new rust_kernel(env); rust_kernel *kernel = new rust_kernel(env);
// Create the main scheduler and the main task // Create the main task
rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads); rust_sched_id sched_id = kernel->main_sched_id();
rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id); rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
assert(sched != NULL); assert(sched != NULL);
rust_task *root_task = sched->create_task(NULL, "main"); rust_task *root_task = sched->create_task(NULL, "main");

View file

@ -652,7 +652,10 @@ new_task_common(rust_scheduler *sched, rust_task *parent) {
extern "C" CDECL rust_task* extern "C" CDECL rust_task*
new_task() { new_task() {
rust_task *task = rust_get_current_task(); rust_task *task = rust_get_current_task();
return new_task_common(task->sched, task); rust_sched_id sched_id = task->kernel->main_sched_id();
rust_scheduler *sched = task->kernel->get_scheduler_by_id(sched_id);
assert(sched != NULL && "should always have a main scheduler");
return new_task_common(sched, task);
} }
extern "C" CDECL rust_task* extern "C" CDECL rust_task*

View file

@ -30,6 +30,7 @@ rust_kernel::rust_kernel(rust_env *env) :
rval(0), rval(0),
max_sched_id(1), max_sched_id(1),
killed(false), killed(false),
already_exiting(false),
sched_reaper(this), sched_reaper(this),
osmain_driver(NULL), osmain_driver(NULL),
non_weak_tasks(0), non_weak_tasks(0),
@ -38,13 +39,20 @@ rust_kernel::rust_kernel(rust_env *env) :
env(env) env(env)
{ {
// Create the single threaded scheduler that will run on the platform's // Create the single threaded scheduler that will run on the platform's
// main thread // main thread
rust_manual_sched_launcher_factory *launchfac = rust_manual_sched_launcher_factory *osmain_launchfac =
new rust_manual_sched_launcher_factory(); new rust_manual_sched_launcher_factory();
osmain_scheduler = create_scheduler(launchfac, 1, false); osmain_scheduler = create_scheduler(osmain_launchfac, 1, false);
osmain_driver = launchfac->get_driver(); osmain_driver = osmain_launchfac->get_driver();
// Create the primary scheduler
rust_thread_sched_launcher_factory *main_launchfac =
new rust_thread_sched_launcher_factory();
main_scheduler = create_scheduler(main_launchfac,
env->num_sched_threads,
false);
sched_reaper.start(); sched_reaper.start();
} }
@ -103,15 +111,22 @@ rust_kernel::create_scheduler(rust_sched_launcher_factory *launchfac,
{ {
scoped_lock with(sched_lock); scoped_lock with(sched_lock);
if (sched_table.size() == 1) { /*if (sched_table.size() == 2) {
// The OS main scheduler may not exit while there are other // The main and OS main schedulers may not exit while there are
// schedulers // other schedulers
KLOG_("Disallowing osmain scheduler to exit"); KLOG_("Disallowing main scheduler to exit");
rust_scheduler *sched = rust_scheduler *main_sched =
get_scheduler_by_id_nolock(osmain_scheduler); get_scheduler_by_id_nolock(main_scheduler);
assert(sched != NULL); assert(main_sched != NULL);
sched->disallow_exit(); main_sched->disallow_exit();
} }
if (sched_table.size() == 1) {
KLOG_("Disallowing osmain scheduler to exit");
rust_scheduler *osmain_sched =
get_scheduler_by_id_nolock(osmain_scheduler);
assert(osmain_sched != NULL);
osmain_sched->disallow_exit();
}*/
id = max_sched_id++; id = max_sched_id++;
assert(id != INTPTR_MAX && "Hit the maximum scheduler id"); assert(id != INTPTR_MAX && "Hit the maximum scheduler id");
@ -175,14 +190,21 @@ rust_kernel::wait_for_schedulers()
sched_table.erase(iter); sched_table.erase(iter);
sched->join_task_threads(); sched->join_task_threads();
sched->deref(); sched->deref();
/*if (sched_table.size() == 2) {
KLOG_("Allowing main scheduler to exit");
// It's only the main schedulers left. Tell them to exit
rust_scheduler *main_sched =
get_scheduler_by_id_nolock(main_scheduler);
assert(main_sched != NULL);
main_sched->allow_exit();
}
if (sched_table.size() == 1) { if (sched_table.size() == 1) {
KLOG_("Allowing osmain scheduler to exit"); KLOG_("Allowing osmain scheduler to exit");
// It's only the osmain scheduler left. Tell it to exit rust_scheduler *osmain_sched =
rust_scheduler *sched =
get_scheduler_by_id_nolock(osmain_scheduler); get_scheduler_by_id_nolock(osmain_scheduler);
assert(sched != NULL); assert(osmain_sched != NULL);
sched->allow_exit(); osmain_sched->allow_exit();
} }*/
} }
if (!sched_table.empty()) { if (!sched_table.empty()) {
sched_lock.wait(); sched_lock.wait();
@ -318,13 +340,31 @@ rust_kernel::register_task() {
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
} }
void
rust_kernel::allow_scheduler_exit() {
scoped_lock with(sched_lock);
KLOG_("Allowing main scheduler to exit");
// It's only the main schedulers left. Tell them to exit
rust_scheduler *main_sched =
get_scheduler_by_id_nolock(main_scheduler);
assert(main_sched != NULL);
main_sched->allow_exit();
KLOG_("Allowing osmain scheduler to exit");
rust_scheduler *osmain_sched =
get_scheduler_by_id_nolock(osmain_scheduler);
assert(osmain_sched != NULL);
osmain_sched->allow_exit();
}
void void
rust_kernel::unregister_task() { rust_kernel::unregister_task() {
KLOG_("Unregistering task"); KLOG_("Unregistering task");
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
if (new_non_weak_tasks == 0) { if (new_non_weak_tasks == 0) {
end_weak_tasks(); begin_shutdown();
} }
} }
@ -338,7 +378,7 @@ rust_kernel::weaken_task(rust_port_id chan) {
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks); uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks); KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
if (new_non_weak_tasks == 0) { if (new_non_weak_tasks == 0) {
end_weak_tasks(); begin_shutdown();
} }
} }
@ -374,6 +414,23 @@ rust_kernel::end_weak_tasks() {
} }
} }
void
rust_kernel::begin_shutdown() {
{
scoped_lock with(sched_lock);
// FIXME #4410: This shouldn't be necessary, but because of
// unweaken_task this may end up getting called multiple times.
if (already_exiting) {
return;
} else {
already_exiting = true;
}
}
allow_scheduler_exit();
end_weak_tasks();
}
bool bool
rust_kernel::send_to_port(rust_port_id chan, void *sptr) { rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan); KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan);

View file

@ -82,7 +82,8 @@ class rust_kernel {
lock_and_signal rval_lock; lock_and_signal rval_lock;
int rval; int rval;
// Protects max_sched_id and sched_table, join_list, killed // Protects max_sched_id and sched_table, join_list, killed,
// already_exiting
lock_and_signal sched_lock; lock_and_signal sched_lock;
// The next scheduler id // The next scheduler id
rust_sched_id max_sched_id; rust_sched_id max_sched_id;
@ -95,8 +96,13 @@ class rust_kernel {
// task group fails). This propagates to all new schedulers and tasks // task group fails). This propagates to all new schedulers and tasks
// created after it is set. // created after it is set.
bool killed; bool killed;
bool already_exiting;
rust_sched_reaper sched_reaper; rust_sched_reaper sched_reaper;
// The primary scheduler
rust_sched_id main_scheduler;
// The single-threaded scheduler that uses the main thread // The single-threaded scheduler that uses the main thread
rust_sched_id osmain_scheduler; rust_sched_id osmain_scheduler;
// Runs the single-threaded scheduler that executes tasks // Runs the single-threaded scheduler that executes tasks
@ -111,7 +117,9 @@ class rust_kernel {
std::vector<rust_port_id> weak_task_chans; std::vector<rust_port_id> weak_task_chans;
rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id);
void allow_scheduler_exit();
void end_weak_tasks(); void end_weak_tasks();
void begin_shutdown();
// Used to communicate with the process-side, global libuv loop // Used to communicate with the process-side, global libuv loop
uintptr_t global_loop_chan; uintptr_t global_loop_chan;
@ -155,6 +163,7 @@ public:
void set_exit_status(int code); void set_exit_status(int code);
rust_sched_id main_sched_id() { return main_scheduler; }
rust_sched_id osmain_sched_id() { return osmain_scheduler; } rust_sched_id osmain_sched_id() { return osmain_scheduler; }
void register_task(); void register_task();