Handle notification channels in task.rs
This commit is contained in:
parent
1e241b5abd
commit
d9e8efca19
1 changed files with 34 additions and 17 deletions
|
@ -612,18 +612,23 @@ class taskgroup {
|
||||||
// Lists of tasks who will kill us if they fail, but whom we won't kill.
|
// Lists of tasks who will kill us if they fail, but whom we won't kill.
|
||||||
let parents: option<(taskgroup_arc,uint)>;
|
let parents: option<(taskgroup_arc,uint)>;
|
||||||
let is_main: bool;
|
let is_main: bool;
|
||||||
|
let notifier: option<auto_notify>;
|
||||||
new(me: *rust_task, -tasks: taskgroup_arc, my_pos: uint,
|
new(me: *rust_task, -tasks: taskgroup_arc, my_pos: uint,
|
||||||
-parents: option<(taskgroup_arc,uint)>, is_main: bool) {
|
-parents: option<(taskgroup_arc,uint)>, is_main: bool,
|
||||||
self.me = me;
|
-notifier: option<auto_notify>) {
|
||||||
self.tasks = tasks;
|
self.me = me;
|
||||||
self.my_pos = my_pos;
|
self.tasks = tasks;
|
||||||
self.parents = parents;
|
self.my_pos = my_pos;
|
||||||
self.is_main = is_main;
|
self.parents = parents;
|
||||||
|
self.is_main = is_main;
|
||||||
|
self.notifier = notifier;
|
||||||
|
self.notifier.iter(|x| { x.failed = false; });
|
||||||
}
|
}
|
||||||
// Runs on task exit.
|
// Runs on task exit.
|
||||||
drop {
|
drop {
|
||||||
// If we are failing, the whole taskgroup needs to die.
|
// If we are failing, the whole taskgroup needs to die.
|
||||||
if rustrt::rust_task_is_unwinding(self.me) {
|
if rustrt::rust_task_is_unwinding(self.me) {
|
||||||
|
self.notifier.iter(|x| { x.failed = true; });
|
||||||
// Take everybody down with us.
|
// Take everybody down with us.
|
||||||
kill_taskgroup(self.tasks, self.me, self.my_pos, self.is_main);
|
kill_taskgroup(self.tasks, self.me, self.my_pos, self.is_main);
|
||||||
} else {
|
} else {
|
||||||
|
@ -641,6 +646,19 @@ class taskgroup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class auto_notify {
|
||||||
|
let notify_chan: comm::chan<notification>;
|
||||||
|
let mut failed: bool;
|
||||||
|
new(chan: comm::chan<notification>) {
|
||||||
|
self.notify_chan = chan;
|
||||||
|
self.failed = true; // Un-set above when taskgroup successfully made.
|
||||||
|
}
|
||||||
|
drop {
|
||||||
|
let result = if self.failed { failure } else { success };
|
||||||
|
comm::send(self.notify_chan, exit(get_task(), result));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn enlist_in_taskgroup(group_arc: taskgroup_arc,
|
fn enlist_in_taskgroup(group_arc: taskgroup_arc,
|
||||||
me: *rust_task) -> option<uint> {
|
me: *rust_task) -> option<uint> {
|
||||||
do group_arc.with |_c, state| {
|
do group_arc.with |_c, state| {
|
||||||
|
@ -750,7 +768,7 @@ fn share_spawner_taskgroup(linked: bool)
|
||||||
let tasks = arc::exclusive(some((dvec::from_elem(some(me)),
|
let tasks = arc::exclusive(some((dvec::from_elem(some(me)),
|
||||||
dvec::dvec())));
|
dvec::dvec())));
|
||||||
// Main group has no parent group.
|
// Main group has no parent group.
|
||||||
let group = @taskgroup(me, tasks.clone(), 0, none, true);
|
let group = @taskgroup(me, tasks.clone(), 0, none, true, none);
|
||||||
unsafe { local_set(me, taskgroup_key(), group); }
|
unsafe { local_set(me, taskgroup_key(), group); }
|
||||||
// Tell child task it's also in the main group.
|
// Tell child task it's also in the main group.
|
||||||
// Whether or not it wanted our parent group, we haven't got one.
|
// Whether or not it wanted our parent group, we haven't got one.
|
||||||
|
@ -796,15 +814,11 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
|
||||||
// Getting killed after here would leak the task.
|
// Getting killed after here would leak the task.
|
||||||
|
|
||||||
let child_wrapper =
|
let child_wrapper =
|
||||||
make_child_wrapper(new_task, child_tg, parent_tg, is_main, f);
|
make_child_wrapper(new_task, child_tg, parent_tg, is_main,
|
||||||
|
opts.notify_chan, f);
|
||||||
let fptr = ptr::addr_of(child_wrapper);
|
let fptr = ptr::addr_of(child_wrapper);
|
||||||
let closure: *rust_closure = unsafe::reinterpret_cast(fptr);
|
let closure: *rust_closure = unsafe::reinterpret_cast(fptr);
|
||||||
|
|
||||||
do option::iter(opts.notify_chan) |c| {
|
|
||||||
// FIXME (#1087): Would like to do notification in Rust
|
|
||||||
rustrt::rust_task_config_notify(new_task, c);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Getting killed between these two calls would free the child's
|
// Getting killed between these two calls would free the child's
|
||||||
// closure. (Reordering them wouldn't help - then getting killed
|
// closure. (Reordering them wouldn't help - then getting killed
|
||||||
// between them would leak.)
|
// between them would leak.)
|
||||||
|
@ -829,6 +843,7 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
|
||||||
// }
|
// }
|
||||||
fn make_child_wrapper(child: *rust_task, -child_tg: taskgroup_arc,
|
fn make_child_wrapper(child: *rust_task, -child_tg: taskgroup_arc,
|
||||||
-parent_tg: option<taskgroup_arc>, is_main: bool,
|
-parent_tg: option<taskgroup_arc>, is_main: bool,
|
||||||
|
notify_chan: option<comm::chan<notification>>,
|
||||||
-f: fn~()) -> fn~() {
|
-f: fn~()) -> fn~() {
|
||||||
let child_tg_ptr = ~mut some((child_tg, parent_tg));
|
let child_tg_ptr = ~mut some((child_tg, parent_tg));
|
||||||
fn~() {
|
fn~() {
|
||||||
|
@ -837,6 +852,11 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
|
||||||
*child_tg_ptr <-> tg_data_opt;
|
*child_tg_ptr <-> tg_data_opt;
|
||||||
let (child_tg, parent_tg) = option::unwrap(tg_data_opt);
|
let (child_tg, parent_tg) = option::unwrap(tg_data_opt);
|
||||||
// Child task runs this code.
|
// Child task runs this code.
|
||||||
|
|
||||||
|
// Even if the below code fails to kick the child off, we must
|
||||||
|
// send something on the notify channel.
|
||||||
|
let notifier = notify_chan.map(|c| auto_notify(c));
|
||||||
|
|
||||||
// Set up membership in taskgroup. If this returns none, some
|
// Set up membership in taskgroup. If this returns none, some
|
||||||
// task was already failing, so don't bother doing anything.
|
// task was already failing, so don't bother doing anything.
|
||||||
alt enlist_in_taskgroup(child_tg, child) {
|
alt enlist_in_taskgroup(child_tg, child) {
|
||||||
|
@ -862,7 +882,7 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
|
||||||
};
|
};
|
||||||
if enlist_ok {
|
if enlist_ok {
|
||||||
let group = @taskgroup(child, child_tg, my_pos,
|
let group = @taskgroup(child, child_tg, my_pos,
|
||||||
pg, is_main);
|
pg, is_main, notifier);
|
||||||
unsafe { local_set(child, taskgroup_key(), group); }
|
unsafe { local_set(child, taskgroup_key(), group); }
|
||||||
// Run the child's body.
|
// Run the child's body.
|
||||||
f();
|
f();
|
||||||
|
@ -1129,9 +1149,6 @@ extern mod rustrt {
|
||||||
fn new_task() -> *rust_task;
|
fn new_task() -> *rust_task;
|
||||||
fn rust_new_task_in_sched(id: sched_id) -> *rust_task;
|
fn rust_new_task_in_sched(id: sched_id) -> *rust_task;
|
||||||
|
|
||||||
fn rust_task_config_notify(
|
|
||||||
task: *rust_task, &&chan: comm::chan<notification>);
|
|
||||||
|
|
||||||
fn start_task(task: *rust_task, closure: *rust_closure);
|
fn start_task(task: *rust_task, closure: *rust_closure);
|
||||||
|
|
||||||
fn rust_task_is_unwinding(task: *rust_task) -> bool;
|
fn rust_task_is_unwinding(task: *rust_task) -> bool;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue