1
Fork 0

Converted over benchmarks.

This commit is contained in:
Eric Holk 2011-08-13 16:03:28 -07:00
parent be7325073a
commit aa0a51a7f5
3 changed files with 59 additions and 82 deletions

View file

@ -20,28 +20,31 @@ import std::io;
import std::getopts; import std::getopts;
import std::task; import std::task;
import std::u64; import std::u64;
import std::comm;
fn recv[T](p: &port[T]) -> T { let x: T; p |> x; ret x; } import std::comm::_port;
import std::comm::mk_port;
import std::comm::_chan;
import std::comm::send;
fn fib(n: int) -> int { fn fib(n: int) -> int {
fn pfib(c: chan[int], n: int) { fn pfib(c: _chan[int], n: int) {
if n == 0 { if n == 0 {
c <| 0; send(c, 0);
} else if (n <= 2) { } else if (n <= 2) {
c <| 1; send(c, 1);
} else { } else {
let p = port(); let p = mk_port[int]();
let t1 = spawn pfib(chan(p), n - 1); let t1 = task::_spawn(bind pfib(p.mk_chan(), n - 1));
let t2 = spawn pfib(chan(p), n - 2); let t2 = task::_spawn(bind pfib(p.mk_chan(), n - 2));
c <| recv(p) + recv(p); send(c, p.recv() + p.recv());
} }
} }
let p = port(); let p = mk_port();
let t = spawn pfib(chan(p), n); let t = task::_spawn(bind pfib(p.mk_chan(), n));
ret recv(p); ret p.recv();
} }
type config = {stress: bool}; type config = {stress: bool};
@ -69,11 +72,11 @@ fn stress_task(id: int) {
} }
fn stress(num_tasks: int) { fn stress(num_tasks: int) {
let tasks = ~[]; let tasks = [];
for each i: int in range(0, num_tasks) { for each i: int in range(0, num_tasks) {
tasks += ~[spawn stress_task(i)]; tasks += [task::_spawn(bind stress_task(i))];
} }
for each i: int in range(0, num_tasks) { task::join(tasks.(i)); } for t in tasks { task::join_id(t); }
} }
fn main(argv: vec[str]) { fn main(argv: vec[str]) {

View file

@ -7,7 +7,7 @@ import std::str;
fn f(n: uint) { fn f(n: uint) {
let i = 0u; let i = 0u;
while i < n { while i < n {
task::join(spawn g()); task::join_id(task::_spawn(bind g()));
i += 1u; i += 1u;
} }
} }
@ -24,7 +24,7 @@ fn main(args: vec[str]) {
}; };
let i = 0u; let i = 0u;
while i < n { while i < n {
spawn f(n); task::_spawn(bind f(n));
i += 1u; i += 1u;
} }
} }

View file

@ -22,38 +22,34 @@ import std::time;
import std::u64; import std::u64;
import std::task; import std::task;
import clone = std::task::clone_chan; import std::task::task_id;
import std::comm;
import std::comm::_chan;
import std::comm::_port;
import std::comm::mk_port;
import std::comm::send;
fn map(filename: str, emit: map_reduce::putter) { fn map(filename: str, emit: map_reduce::putter) {
// log_err "mapping " + filename;
let f = io::file_reader(filename); let f = io::file_reader(filename);
while true { while true {
alt read_word(f) { some(w) { emit(w, 1); } none. { break; } } alt read_word(f) { some(w) { emit(w, 1); } none. { break; } }
} }
// log_err "done mapping " + filename;
} }
fn reduce(word: str, get: map_reduce::getter) { fn reduce(word: str, get: map_reduce::getter) {
// log_err "reducing " + word;
let count = 0; let count = 0;
while true { while true {
alt get() { alt get() {
some(_) { some(_) {
// log_err "received word " + word;
count += 1; count += 1;
} }
none. { break } none. { break }
} }
} }
// auto out = io::stdout();
// out.write_line(#fmt("%s: %d", word, count));
// log_err "reduce " + word + " done.";
} }
mod map_reduce { mod map_reduce {
@ -72,80 +68,66 @@ mod map_reduce {
type reducer = fn(str, getter) ; type reducer = fn(str, getter) ;
tag ctrl_proto { tag ctrl_proto {
find_reducer([u8], chan[chan[reduce_proto]]); find_reducer([u8], _chan[_chan[reduce_proto]]);
mapper_done; mapper_done;
} }
tag reduce_proto { emit_val(int); done; ref; release; } tag reduce_proto { emit_val(int); done; ref; release; }
fn start_mappers(ctrl: chan[ctrl_proto], inputs: &[str]) -> [task] { fn start_mappers(ctrl: _chan[ctrl_proto], inputs: &[str]) -> [task_id] {
let tasks = ~[]; let tasks = [];
// log_err "starting mappers";
for i: str in inputs { for i: str in inputs {
// log_err "starting mapper for " + i; tasks += ~[task::_spawn(bind map_task(ctrl, i))];
tasks += ~[spawn map_task(ctrl, i)];
} }
// log_err "done starting mappers";
ret tasks; ret tasks;
} }
fn map_task(ctrl: chan[ctrl_proto], input: str) { fn map_task(ctrl: _chan[ctrl_proto], input: str) {
// log_err "map_task " + input; // log_err "map_task " + input;
let intermediates = map::new_str_hash(); let intermediates = map::new_str_hash();
fn emit(im: &map::hashmap[str, chan[reduce_proto]], fn emit(im: &map::hashmap[str, _chan[reduce_proto]],
ctrl: chan[ctrl_proto], key: str, val: int) { ctrl: _chan[ctrl_proto], key: str, val: int) {
// log_err "emitting " + key;
let c; let c;
alt im.find(key) { alt im.find(key) {
some(_c) { some(_c) {
// log_err "reusing saved channel for " + key;
c = _c c = _c
} }
none. { none. {
// log_err "fetching new channel for " + key; let p = mk_port[_chan[reduce_proto]]();
let p = port[chan[reduce_proto]]();
let keyi = str::bytes(key); let keyi = str::bytes(key);
ctrl <| find_reducer(keyi, chan(p)); send(ctrl, find_reducer(keyi, p.mk_chan()));
p |> c; c = p.recv();
im.insert(key, clone(c)); im.insert(key, c);
c <| ref; send(c, ref);
} }
} }
c <| emit_val(val); send(c, emit_val(val));
} }
map(input, bind emit(intermediates, ctrl, _, _)); map(input, bind emit(intermediates, ctrl, _, _));
for each kv: @{key: str, val: chan[reduce_proto]} in for each kv: @{key: str, val: _chan[reduce_proto]} in
intermediates.items() { intermediates.items() {
// log_err "sending done to reducer for " + kv._0; send(kv.val, release);
kv.val <| release;
} }
ctrl <| mapper_done; send(ctrl, mapper_done);
// log_err "~map_task " + input;
} }
fn reduce_task(key: str, out: chan[chan[reduce_proto]]) { fn reduce_task(key: str, out: _chan[_chan[reduce_proto]]) {
// log_err "reduce_task " + key; let p = mk_port();
let p = port();
out <| chan(p); send(out, p.mk_chan());
let ref_count = 0; let ref_count = 0;
let is_done = false; let is_done = false;
fn get(p: &port[reduce_proto], ref_count: &mutable int, fn get(p: &_port[reduce_proto], ref_count: &mutable int,
is_done: &mutable bool) -> option[int] { is_done: &mutable bool) -> option[int] {
while !is_done || ref_count > 0 { while !is_done || ref_count > 0 {
let m; alt p.recv() {
p |> m;
alt m {
emit_val(v) { emit_val(v) {
// log_err #fmt("received %d", v); // log_err #fmt("received %d", v);
ret some(v); ret some(v);
@ -162,28 +144,24 @@ mod map_reduce {
} }
reduce(key, bind get(p, ref_count, is_done)); reduce(key, bind get(p, ref_count, is_done));
// log_err "~reduce_task " + key;
} }
fn map_reduce(inputs: &[str]) { fn map_reduce(inputs: &[str]) {
let ctrl = port[ctrl_proto](); let ctrl = mk_port[ctrl_proto]();
// This task becomes the master control task. It spawns others // This task becomes the master control task. It task::_spawns
// to do the rest. // to do the rest.
let reducers: map::hashmap[str, chan[reduce_proto]]; let reducers: map::hashmap[str, _chan[reduce_proto]];
reducers = map::new_str_hash(); reducers = map::new_str_hash();
let tasks = start_mappers(chan(ctrl), inputs); let tasks = start_mappers(ctrl.mk_chan(), inputs);
let num_mappers = ivec::len(inputs) as int; let num_mappers = ivec::len(inputs) as int;
while num_mappers > 0 { while num_mappers > 0 {
let m; alt ctrl.recv() {
ctrl |> m;
alt m {
mapper_done. { mapper_done. {
// log_err "received mapper terminated."; // log_err "received mapper terminated.";
num_mappers -= 1; num_mappers -= 1;
@ -199,27 +177,23 @@ mod map_reduce {
} }
none. { none. {
// log_err "creating new reducer for " + k; // log_err "creating new reducer for " + k;
let p = port(); let p = mk_port();
tasks += ~[spawn reduce_task(k, chan(p))]; tasks += [task::_spawn(bind reduce_task(k, p.mk_chan()))];
p |> c; c = p.recv();
reducers.insert(k, c); reducers.insert(k, c);
} }
} }
cc <| clone(c); send(cc, c);
} }
} }
} }
for each kv: @{key: str, val: chan[reduce_proto]} in reducers.items() for each kv: @{key: str, val: _chan[reduce_proto]} in reducers.items()
{ {
// log_err "sending done to reducer for " + kv._0; send(kv.val, done);
kv.val <| done;
} }
for t in tasks { task::join_id(t); }
// log_err #fmt("joining %u tasks", ivec::len(tasks));
for t: task in tasks { task::join(t); }
// log_err "control task done.";
} }
} }