1
Fork 0

Cleaning up task and comm exports, updating all the test cases.

This commit is contained in:
Eric Holk 2011-08-25 11:20:43 -07:00
parent b31815f8a0
commit 2f7c583bc1
60 changed files with 542 additions and 522 deletions

View file

@ -2,11 +2,10 @@ import task;
import vec; import vec;
import comm; import comm;
import comm::_chan; import comm::chan;
import comm::_port; import comm::port;
import comm::mk_port;
import comm::send; import comm::send;
import comm::recv;
import net; import net;
native "rust" mod rustrt { native "rust" mod rustrt {
@ -15,11 +14,11 @@ native "rust" mod rustrt {
fn aio_init(); fn aio_init();
fn aio_run(); fn aio_run();
fn aio_stop(); fn aio_stop();
fn aio_connect(host: *u8, port: int, connected: &_chan<socket>); fn aio_connect(host: *u8, port: int, connected: &chan<socket>);
fn aio_serve(host: *u8, port: int, acceptChan: &_chan<socket>) -> server; fn aio_serve(host: *u8, port: int, acceptChan: &chan<socket>) -> server;
fn aio_writedata(s: socket, buf: *u8, size: uint, status: &_chan<bool>); fn aio_writedata(s: socket, buf: *u8, size: uint, status: &chan<bool>);
fn aio_read(s: socket, reader: &_chan<[u8]>); fn aio_read(s: socket, reader: &chan<[u8]>);
fn aio_close_server(s: server, status: &_chan<bool>); fn aio_close_server(s: server, status: &chan<bool>);
fn aio_close_socket(s: socket); fn aio_close_socket(s: socket);
fn aio_is_null_client(s: socket) -> bool; fn aio_is_null_client(s: socket) -> bool;
} }
@ -32,42 +31,42 @@ tag pending_connection { remote(net::ip_addr, int); incoming(server); }
tag socket_event { connected(client); closed; received([u8]); } tag socket_event { connected(client); closed; received([u8]); }
tag server_event { pending(_chan<_chan<socket_event>>); } tag server_event { pending(chan<chan<socket_event>>); }
tag request { tag request {
quit; quit;
connect(pending_connection, _chan<socket_event>); connect(pending_connection, chan<socket_event>);
serve(net::ip_addr, int, _chan<server_event>, _chan<server>); serve(net::ip_addr, int, chan<server_event>, chan<server>);
write(client, [u8], _chan<bool>); write(client, [u8], chan<bool>);
close_server(server, _chan<bool>); close_server(server, chan<bool>);
close_client(client); close_client(client);
} }
type ctx = _chan<request>; type ctx = chan<request>;
fn ip_to_sbuf(ip: net::ip_addr) -> *u8 { fn ip_to_sbuf(ip: net::ip_addr) -> *u8 {
vec::to_ptr(str::bytes(net::format_addr(ip))) vec::to_ptr(str::bytes(net::format_addr(ip)))
} }
fn connect_task(ip: net::ip_addr, portnum: int, evt: _chan<socket_event>) { fn connect_task(ip: net::ip_addr, portnum: int, evt: chan<socket_event>) {
let connecter: _port<client> = mk_port(); let connecter = port();
rustrt::aio_connect(ip_to_sbuf(ip), portnum, connecter.mk_chan()); rustrt::aio_connect(ip_to_sbuf(ip), portnum, chan(connecter));
let client = connecter.recv(); let client = recv(connecter);
new_client(client, evt); new_client(client, evt);
} }
fn new_client(client: client, evt: _chan<socket_event>) { fn new_client(client: client, evt: chan<socket_event>) {
// Start the read before notifying about the connect. This avoids a race // Start the read before notifying about the connect. This avoids a race
// condition where the receiver can close the socket before we start // condition where the receiver can close the socket before we start
// reading. // reading.
let reader: _port<[u8]> = mk_port(); let reader: port<[u8]> = port();
rustrt::aio_read(client, reader.mk_chan()); rustrt::aio_read(client, chan(reader));
send(evt, connected(client)); send(evt, connected(client));
while true { while true {
log "waiting for bytes"; log "waiting for bytes";
let data: [u8] = reader.recv(); let data: [u8] = recv(reader);
log "got some bytes"; log "got some bytes";
log vec::len::<u8>(data); log vec::len::<u8>(data);
if vec::len::<u8>(data) == 0u { if vec::len::<u8>(data) == 0u {
@ -83,42 +82,42 @@ fn new_client(client: client, evt: _chan<socket_event>) {
log "close message sent"; log "close message sent";
} }
fn accept_task(client: client, events: _chan<server_event>) { fn accept_task(client: client, events: chan<server_event>) {
log "accept task was spawned"; log "accept task was spawned";
let p: _port<_chan<socket_event>> = mk_port(); let p = port();
send(events, pending(p.mk_chan())); send(events, pending(chan(p)));
let evt = p.recv(); let evt = recv(p);
new_client(client, evt); new_client(client, evt);
log "done accepting"; log "done accepting";
} }
fn server_task(ip: net::ip_addr, portnum: int, events: _chan<server_event>, fn server_task(ip: net::ip_addr, portnum: int, events: chan<server_event>,
server: _chan<server>) { server: chan<server>) {
let accepter: _port<client> = mk_port(); let accepter = port();
send(server, send(server,
rustrt::aio_serve(ip_to_sbuf(ip), portnum, accepter.mk_chan())); rustrt::aio_serve(ip_to_sbuf(ip), portnum, chan(accepter)));
let client: client; let client: client;
while true { while true {
log "preparing to accept a client"; log "preparing to accept a client";
client = accepter.recv(); client = recv(accepter);
if rustrt::aio_is_null_client(client) { if rustrt::aio_is_null_client(client) {
log "client was actually null, returning"; log "client was actually null, returning";
ret; ret;
} else { task::_spawn(bind accept_task(client, events)); } } else { task::spawn(bind accept_task(client, events)); }
} }
} }
fn request_task(c: _chan<ctx>) { fn request_task(c: chan<ctx>) {
// Create a port to accept IO requests on // Create a port to accept IO requests on
let p: _port<request> = mk_port(); let p = port();
// Hand of its channel to our spawner // Hand of its channel to our spawner
send(c, p.mk_chan()); send(c, chan(p));
log "uv run task spawned"; log "uv run task spawned";
// Spin for requests // Spin for requests
let req: request; let req: request;
while true { while true {
req = p.recv(); req = recv(p);
alt req { alt req {
quit. { quit. {
log "got quit message"; log "got quit message";
@ -127,10 +126,10 @@ fn request_task(c: _chan<ctx>) {
ret; ret;
} }
connect(remote(ip, portnum), client) { connect(remote(ip, portnum), client) {
task::_spawn(bind connect_task(ip, portnum, client)); task::spawn(bind connect_task(ip, portnum, client));
} }
serve(ip, portnum, events, server) { serve(ip, portnum, events, server) {
task::_spawn(bind server_task(ip, portnum, events, server)); task::spawn(bind server_task(ip, portnum, events, server));
} }
write(socket, v, status) { write(socket, v, status) {
rustrt::aio_writedata(socket, vec::to_ptr::<u8>(v), rustrt::aio_writedata(socket, vec::to_ptr::<u8>(v),
@ -148,27 +147,27 @@ fn request_task(c: _chan<ctx>) {
} }
} }
fn iotask(c: _chan<ctx>) { fn iotask(c: chan<ctx>) {
log "io task spawned"; log "io task spawned";
// Initialize before accepting requests // Initialize before accepting requests
rustrt::aio_init(); rustrt::aio_init();
log "io task init"; log "io task init";
// Spawn our request task // Spawn our request task
let reqtask = task::_spawn(bind request_task(c)); let reqtask = task::spawn_joinable(bind request_task(c));
log "uv run task init"; log "uv run task init";
// Enter IO loop. This never returns until aio_stop is called. // Enter IO loop. This never returns until aio_stop is called.
rustrt::aio_run(); rustrt::aio_run();
log "waiting for request task to finish"; log "waiting for request task to finish";
task::join_id(reqtask); task::join(reqtask);
} }
fn new() -> ctx { fn new() -> ctx {
let p: _port<ctx> = mk_port(); let p: port<ctx> = port();
task::_spawn(bind iotask(p.mk_chan())); task::spawn(bind iotask(chan(p)));
ret p.recv(); ret recv(p);
} }
// Local Variables: // Local Variables:

View file

@ -2,12 +2,7 @@ import sys;
import ptr; import ptr;
import unsafe; import unsafe;
import task; import task;
import task::task_id;
export _chan;
export _port;
export chan_handle;
export mk_port;
export send; export send;
export recv; export recv;
export chan; export chan;
@ -17,7 +12,8 @@ native "rust" mod rustrt {
type void; type void;
type rust_port; type rust_port;
fn chan_id_send<~T>(target_task: task_id, target_port: port_id, data: -T); fn chan_id_send<~T>(target_task: task::task,
target_port: port_id, data: -T);
fn new_port(unit_sz: uint) -> *rust_port; fn new_port(unit_sz: uint) -> *rust_port;
fn del_port(po: *rust_port); fn del_port(po: *rust_port);
@ -31,10 +27,9 @@ native "rust-intrinsic" mod rusti {
type port_id = int; type port_id = int;
type chan_handle<~T> = {task: task_id, port: port_id}; // It's critical that this only have one variant, so it has a record
// layout, and will work in the rust_task structure in task.rs.
tag chan<~T> { chan_t(chan_handle<T>); } tag chan<~T> { chan_t(task::task, port_id); }
type _chan<~T> = chan<T>;
resource port_ptr(po: *rustrt::rust_port) { resource port_ptr(po: *rustrt::rust_port) {
rustrt::drop_port(po); rustrt::drop_port(po);
@ -43,17 +38,9 @@ resource port_ptr(po: *rustrt::rust_port) {
tag port<~T> { port_t(@port_ptr); } tag port<~T> { port_t(@port_ptr); }
obj port_obj<~T>(raw_port: port<T>) {
fn mk_chan() -> chan<T> { chan(raw_port) }
fn recv() -> T { recv(raw_port) }
}
type _port<~T> = port_obj<T>;
fn mk_port<~T>() -> _port<T> { ret port_obj::<T>(port::<T>()); }
fn send<~T>(ch: &chan<T>, data: -T) { fn send<~T>(ch: &chan<T>, data: -T) {
rustrt::chan_id_send(ch.task, ch.port, data); let chan_t(t, p) = ch;
rustrt::chan_id_send(t, p, data);
} }
fn port<~T>() -> port<T> { fn port<~T>() -> port<T> {
@ -63,5 +50,5 @@ fn port<~T>() -> port<T> {
fn recv<~T>(p: &port<T>) -> T { ret rusti::recv(***p) } fn recv<~T>(p: &port<T>) -> T { ret rusti::recv(***p) }
fn chan<~T>(p: &port<T>) -> chan<T> { fn chan<~T>(p: &port<T>) -> chan<T> {
chan_t({task: task::get_task_id(), port: rustrt::get_port_id(***p)}) chan_t(task::get_task_id(), rustrt::get_port_id(***p))
} }

View file

@ -1,21 +1,21 @@
import comm::_port; import comm::port;
import comm::_chan; import comm::chan;
import comm::mk_port;
import comm::send; import comm::send;
import comm::recv;
import str; import str;
import net; import net;
type ctx = aio::ctx; type ctx = aio::ctx;
type client = {ctx: ctx, client: aio::client, evt: _port<aio::socket_event>}; type client = {ctx: ctx, client: aio::client, evt: port<aio::socket_event>};
type server = {ctx: ctx, server: aio::server, evt: _port<aio::server_event>}; type server = {ctx: ctx, server: aio::server, evt: port<aio::server_event>};
fn new() -> ctx { ret aio::new(); } fn new() -> ctx { ret aio::new(); }
fn destroy(ctx: ctx) { send(ctx, aio::quit); } fn destroy(ctx: ctx) { send(ctx, aio::quit); }
fn make_socket(ctx: ctx, p: _port<aio::socket_event>) -> client { fn make_socket(ctx: ctx, p: port<aio::socket_event>) -> client {
let evt: aio::socket_event = p.recv(); let evt: aio::socket_event = recv(p);
alt evt { alt evt {
aio::connected(client) { ret {ctx: ctx, client: client, evt: p}; } aio::connected(client) { ret {ctx: ctx, client: client, evt: p}; }
_ { fail "Could not connect to client"; } _ { fail "Could not connect to client"; }
@ -23,56 +23,56 @@ fn make_socket(ctx: ctx, p: _port<aio::socket_event>) -> client {
} }
fn connect_to(ctx: ctx, ip: net::ip_addr, portnum: int) -> client { fn connect_to(ctx: ctx, ip: net::ip_addr, portnum: int) -> client {
let p: _port<aio::socket_event> = mk_port(); let p: port<aio::socket_event> = port();
send(ctx, aio::connect(aio::remote(ip, portnum), p.mk_chan())); send(ctx, aio::connect(aio::remote(ip, portnum), chan(p)));
ret make_socket(ctx, p); ret make_socket(ctx, p);
} }
fn read(c: client) -> [u8] { fn read(c: client) -> [u8] {
alt c.evt.recv() { alt recv(c.evt) {
aio::closed. { ret []; } aio::closed. { ret []; }
aio::received(buf) { ret buf; } aio::received(buf) { ret buf; }
} }
} }
fn create_server(ctx: ctx, ip: net::ip_addr, portnum: int) -> server { fn create_server(ctx: ctx, ip: net::ip_addr, portnum: int) -> server {
let evt: _port<aio::server_event> = mk_port(); let evt: port<aio::server_event> = port();
let p: _port<aio::server> = mk_port(); let p: port<aio::server> = port();
send(ctx, aio::serve(ip, portnum, evt.mk_chan(), p.mk_chan())); send(ctx, aio::serve(ip, portnum, chan(evt), chan(p)));
let srv: aio::server = p.recv(); let srv: aio::server = recv(p);
ret {ctx: ctx, server: srv, evt: evt}; ret {ctx: ctx, server: srv, evt: evt};
} }
fn accept_from(server: server) -> client { fn accept_from(server: server) -> client {
let evt: aio::server_event = server.evt.recv(); let evt: aio::server_event = recv(server.evt);
alt evt { alt evt {
aio::pending(callback) { aio::pending(callback) {
let p: _port<aio::socket_event> = mk_port(); let p = port();
send(callback, p.mk_chan()); send(callback, chan(p));
ret make_socket(server.ctx, p); ret make_socket(server.ctx, p);
} }
} }
} }
fn write_data(c: client, data: [u8]) -> bool { fn write_data(c: client, data: [u8]) -> bool {
let p: _port<bool> = mk_port(); let p = port();
send(c.ctx, aio::write(c.client, data, p.mk_chan())); send(c.ctx, aio::write(c.client, data, chan(p)));
ret p.recv(); ret recv(p);
} }
fn close_server(server: server) { fn close_server(server: server) {
// TODO: make this unit once we learn to send those from native code // TODO: make this unit once we learn to send those from native code
let p: _port<bool> = mk_port(); let p = port();
send(server.ctx, aio::close_server(server.server, p.mk_chan())); send(server.ctx, aio::close_server(server.server, chan(p)));
log "Waiting for close"; log "Waiting for close";
p.recv(); recv(p);
log "Got close"; log "Got close";
} }
fn close_client(client: client) { fn close_client(client: client) {
send(client.ctx, aio::close_client(client.client)); send(client.ctx, aio::close_client(client.client));
let evt: aio::socket_event; let evt: aio::socket_event;
do { evt = client.evt.recv(); alt evt { aio::closed. { ret; } _ { } } } do { evt = recv(client.evt); alt evt { aio::closed. { ret; } _ { } } }
while true while true
} }

View file

@ -5,6 +5,24 @@ import option::none;
import option = option::t; import option = option::t;
import ptr; import ptr;
export task;
export joinable_task;
export sleep;
export yield;
export task_notification;
export join;
export unsupervise;
export pin;
export unpin;
export set_min_stack;
export spawn;
export spawn_notify;
export spawn_joinable;
export task_result;
export tr_success;
export tr_failure;
export get_task_id;
native "rust" mod rustrt { native "rust" mod rustrt {
fn task_sleep(time_in_us: uint); fn task_sleep(time_in_us: uint);
fn task_yield(); fn task_yield();
@ -29,8 +47,8 @@ native "rust" mod rustrt {
type rust_task = type rust_task =
{id: task, {id: task,
mutable notify_enabled: u8, mutable notify_enabled: u32,
mutable notify_chan: comm::chan_handle<task_notification>, mutable notify_chan: comm::chan<task_notification>,
ctx: task_context, ctx: task_context,
stack_ptr: *u8}; stack_ptr: *u8};
@ -40,6 +58,7 @@ resource rust_task_ptr(task: *rust_task) { rustrt::drop_task(task); }
type task = int; type task = int;
type task_id = task; type task_id = task;
type joinable_task = (task_id, comm::port<task_notification>);
fn get_task_id() -> task_id { rustrt::get_task_id() } fn get_task_id() -> task_id { rustrt::get_task_id() }
@ -79,15 +98,13 @@ fn unpin() { rustrt::unpin_task(); }
fn set_min_stack(stack_size: uint) { rustrt::set_min_stack(stack_size); } fn set_min_stack(stack_size: uint) { rustrt::set_min_stack(stack_size); }
fn _spawn(thunk: -fn()) -> task { spawn(thunk) }
fn spawn(thunk: -fn()) -> task { spawn_inner(thunk, none) } fn spawn(thunk: -fn()) -> task { spawn_inner(thunk, none) }
fn spawn_notify(thunk: -fn(), notify: comm::chan<task_notification>) -> task { fn spawn_notify(thunk: -fn(), notify: comm::chan<task_notification>) -> task {
spawn_inner(thunk, some(notify)) spawn_inner(thunk, some(notify))
} }
fn spawn_joinable(thunk: -fn()) -> (task_id, comm::port<task_notification>) { fn spawn_joinable(thunk: -fn()) -> joinable_task {
let p = comm::port::<task_notification>(); let p = comm::port::<task_notification>();
let id = spawn_notify(thunk, comm::chan::<task_notification>(p)); let id = spawn_notify(thunk, comm::chan::<task_notification>(p));
ret (id, p); ret (id, p);
@ -105,7 +122,7 @@ fn spawn_inner(thunk: -fn(), notify: option<comm::chan<task_notification>>) ->
// set up the task pointer // set up the task pointer
let task_ptr = rust_task_ptr(rustrt::get_task_pointer(id)); let task_ptr = rust_task_ptr(rustrt::get_task_pointer(id));
let regs = ptr::addr_of((**task_ptr).ctx.regs); let regs = ptr::addr_of((**task_ptr).ctx.regs);
(*regs).edx = cast(*task_ptr);; (*regs).edx = cast(*task_ptr);
(*regs).esp = cast((**task_ptr).stack_ptr); (*regs).esp = cast((**task_ptr).stack_ptr);
assert (ptr::null() != (**task_ptr).stack_ptr); assert (ptr::null() != (**task_ptr).stack_ptr);
@ -116,8 +133,8 @@ fn spawn_inner(thunk: -fn(), notify: option<comm::chan<task_notification>>) ->
// set up notifications if they are enabled. // set up notifications if they are enabled.
alt notify { alt notify {
some(c) { some(c) {
(**task_ptr).notify_enabled = 1u8;; (**task_ptr).notify_enabled = 1u32;;
(**task_ptr).notify_chan = *c; (**task_ptr).notify_chan = c;
} }
none { } none { }
}; };

View file

@ -4,7 +4,7 @@
// while providing a base that other test frameworks may build off of. // while providing a base that other test frameworks may build off of.
import generic_os::getenv; import generic_os::getenv;
import task::task_id; import task::task;
export test_name; export test_name;
export test_fn; export test_fn;
@ -88,7 +88,7 @@ fn parse_opts(args: &[str]) : vec::is_not_empty(args) -> opt_res {
tag test_result { tr_ok; tr_failed; tr_ignored; } tag test_result { tr_ok; tr_failed; tr_ignored; }
type joinable = (task_id, comm::port<task::task_notification>); type joinable = (task, comm::port<task::task_notification>);
// To get isolation and concurrency tests have to be run in their own tasks. // To get isolation and concurrency tests have to be run in their own tasks.
// In cases where test functions and closures it is not ok to just dump them // In cases where test functions and closures it is not ok to just dump them

View file

@ -64,7 +64,7 @@ void rust_chan::disassociate() {
port->chans.swap_delete(this); port->chans.swap_delete(this);
// Delete reference to the port. // Delete reference to the port.
port = NULL; port = NULL;
this->deref(); this->deref();
} }

View file

@ -26,7 +26,8 @@ struct frame_glue_fns {
// library. This struct must agree with the std::task::rust_task record. // library. This struct must agree with the std::task::rust_task record.
struct rust_task_user { struct rust_task_user {
rust_task_id id; rust_task_id id;
uint8_t notify_enabled; uint32_t notify_enabled; // this is way more bits than necessary, but it
// simplifies the alignment.
chan_handle notify_chan; chan_handle notify_chan;
context ctx; context ctx;
uintptr_t rust_sp; // Saved sp when not running. uintptr_t rust_sp; // Saved sp when not running.

View file

@ -21,30 +21,30 @@ import std::getopts;
import std::task; import std::task;
import std::u64; import std::u64;
import std::comm; import std::comm;
import std::comm::_port; import std::comm::port;
import std::comm::mk_port; import std::comm::chan;
import std::comm::_chan;
import std::comm::send; import std::comm::send;
import std::comm::recv;
fn fib(n: int) -> int { fn fib(n: int) -> int {
fn pfib(c: _chan<int>, n: int) { fn pfib(c: chan<int>, n: int) {
if n == 0 { if n == 0 {
send(c, 0); send(c, 0);
} else if n <= 2 { } else if n <= 2 {
send(c, 1); send(c, 1);
} else { } else {
let p = mk_port::<int>(); let p = port();
let t1 = task::_spawn(bind pfib(p.mk_chan(), n - 1)); let t1 = task::spawn(bind pfib(chan(p), n - 1));
let t2 = task::_spawn(bind pfib(p.mk_chan(), n - 2)); let t2 = task::spawn(bind pfib(chan(p), n - 2));
send(c, p.recv() + p.recv()); send(c, recv(p) + recv(p));
} }
} }
let p = mk_port(); let p = port();
let t = task::_spawn(bind pfib(p.mk_chan(), n)); let t = task::spawn(bind pfib(chan(p), n));
ret p.recv(); ret recv(p);
} }
type config = {stress: bool}; type config = {stress: bool};
@ -74,9 +74,9 @@ fn stress_task(id: int) {
fn stress(num_tasks: int) { fn stress(num_tasks: int) {
let tasks = []; let tasks = [];
for each i: int in range(0, num_tasks) { for each i: int in range(0, num_tasks) {
tasks += [task::_spawn(bind stress_task(i))]; tasks += [task::spawn_joinable(bind stress_task(i))];
} }
for t in tasks { task::join_id(t); } for t in tasks { task::join(t); }
} }
fn main(argv: [str]) { fn main(argv: [str]) {
@ -97,7 +97,6 @@ fn main(argv: [str]) {
let out = io::stdout(); let out = io::stdout();
for each n: int in range(1, max + 1) { for each n: int in range(1, max + 1) {
for each i: int in range(0, num_trials) { for each i: int in range(0, num_trials) {
let start = time::precise_time_ns(); let start = time::precise_time_ns();

View file

@ -22,11 +22,11 @@ import std::time;
import std::u64; import std::u64;
import std::task; import std::task;
import std::task::task_id; import std::task::joinable_task;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::_port; import std::comm::port;
import std::comm::mk_port; import std::comm::recv;
import std::comm::send; import std::comm::send;
fn map(filename: str, emit: map_reduce::putter) { fn map(filename: str, emit: map_reduce::putter) {
@ -61,26 +61,27 @@ mod map_reduce {
type reducer = fn(str, getter); type reducer = fn(str, getter);
tag ctrl_proto { tag ctrl_proto {
find_reducer([u8], _chan<_chan<reduce_proto>>); find_reducer([u8], chan<chan<reduce_proto>>);
mapper_done; mapper_done;
} }
tag reduce_proto { emit_val(int); done; ref; release; } tag reduce_proto { emit_val(int); done; ref; release; }
fn start_mappers(ctrl: _chan<ctrl_proto>, inputs: &[str]) -> [task_id] { fn start_mappers(ctrl: chan<ctrl_proto>, inputs: &[str])
-> [joinable_task] {
let tasks = []; let tasks = [];
for i: str in inputs { for i: str in inputs {
tasks += [task::spawn(bind map_task(ctrl, i))]; tasks += [task::spawn_joinable(bind map_task(ctrl, i))];
} }
ret tasks; ret tasks;
} }
fn map_task(ctrl: _chan<ctrl_proto>, input: str) { fn map_task(ctrl: chan<ctrl_proto>, input: str) {
// log_err "map_task " + input; // log_err "map_task " + input;
let intermediates = map::new_str_hash(); let intermediates = map::new_str_hash();
fn emit(im: &map::hashmap<str, _chan<reduce_proto>>, fn emit(im: &map::hashmap<str, chan<reduce_proto>>,
ctrl: _chan<ctrl_proto>, key: str, val: int) { ctrl: chan<ctrl_proto>, key: str, val: int) {
let c; let c;
alt im.find(key) { alt im.find(key) {
some(_c) { some(_c) {
@ -88,10 +89,10 @@ mod map_reduce {
c = _c c = _c
} }
none. { none. {
let p = mk_port::<_chan<reduce_proto>>(); let p = port();
let keyi = str::bytes(key); let keyi = str::bytes(key);
send(ctrl, find_reducer(keyi, p.mk_chan())); send(ctrl, find_reducer(keyi, chan(p)));
c = p.recv(); c = recv(p);
im.insert(key, c); im.insert(key, c);
send(c, ref); send(c, ref);
} }
@ -101,7 +102,7 @@ mod map_reduce {
map(input, bind emit(intermediates, ctrl, _, _)); map(input, bind emit(intermediates, ctrl, _, _));
for each kv: @{key: str, val: _chan<reduce_proto>} in for each kv: @{key: str, val: chan<reduce_proto>} in
intermediates.items() { intermediates.items() {
send(kv.val, release); send(kv.val, release);
} }
@ -109,18 +110,18 @@ mod map_reduce {
send(ctrl, mapper_done); send(ctrl, mapper_done);
} }
fn reduce_task(key: str, out: _chan<_chan<reduce_proto>>) { fn reduce_task(key: str, out: chan<chan<reduce_proto>>) {
let p = mk_port(); let p = port();
send(out, p.mk_chan()); send(out, chan(p));
let ref_count = 0; let ref_count = 0;
let is_done = false; let is_done = false;
fn get(p: &_port<reduce_proto>, ref_count: &mutable int, fn get(p: &port<reduce_proto>, ref_count: &mutable int,
is_done: &mutable bool) -> option<int> { is_done: &mutable bool) -> option<int> {
while !is_done || ref_count > 0 { while !is_done || ref_count > 0 {
alt p.recv() { alt recv(p) {
emit_val(v) { emit_val(v) {
// log_err #fmt("received %d", v); // log_err #fmt("received %d", v);
ret some(v); ret some(v);
@ -140,21 +141,21 @@ mod map_reduce {
} }
fn map_reduce(inputs: &[str]) { fn map_reduce(inputs: &[str]) {
let ctrl = mk_port::<ctrl_proto>(); let ctrl = port::<ctrl_proto>();
// This task becomes the master control task. It task::_spawns // This task becomes the master control task. It task::_spawns
// to do the rest. // to do the rest.
let reducers: map::hashmap<str, _chan<reduce_proto>>; let reducers: map::hashmap<str, chan<reduce_proto>>;
reducers = map::new_str_hash(); reducers = map::new_str_hash();
let tasks = start_mappers(ctrl.mk_chan(), inputs); let tasks = start_mappers(chan(ctrl), inputs);
let num_mappers = vec::len(inputs) as int; let num_mappers = vec::len(inputs) as int;
while num_mappers > 0 { while num_mappers > 0 {
alt ctrl.recv() { alt recv(ctrl) {
mapper_done. { mapper_done. {
// log_err "received mapper terminated."; // log_err "received mapper terminated.";
num_mappers -= 1; num_mappers -= 1;
@ -170,9 +171,10 @@ mod map_reduce {
} }
none. { none. {
// log_err "creating new reducer for " + k; // log_err "creating new reducer for " + k;
let p = mk_port(); let p = port();
tasks += [task::spawn(bind reduce_task(k, p.mk_chan()))]; tasks +=
c = p.recv(); [task::spawn_joinable(bind reduce_task(k, chan(p)))];
c = recv(p);
reducers.insert(k, c); reducers.insert(k, c);
} }
} }
@ -181,12 +183,12 @@ mod map_reduce {
} }
} }
for each kv: @{key: str, val: _chan<reduce_proto>} in reducers.items() for each kv: @{key: str, val: chan<reduce_proto>} in reducers.items()
{ {
send(kv.val, done); send(kv.val, done);
} }
for t in tasks { task::join_id(t); } for t in tasks { task::join(t); }
} }
} }

View file

@ -5,4 +5,4 @@ import std::task;
fn f(x: int) -> int { ret x; } fn f(x: int) -> int { ret x; }
fn main() { task::_spawn(bind f(10)); } fn main() { task::spawn(bind f(10)); }

View file

@ -5,13 +5,12 @@ import std::fs;
import std::str; import std::str;
import std::vec; import std::vec;
import std::task; import std::task;
import std::task::task_id;
import std::comm; import std::comm;
import std::comm::_port; import std::comm::port;
import std::comm::_chan; import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::mk_port; import std::comm::recv;
import common::cx; import common::cx;
import common::config; import common::config;
@ -125,7 +124,7 @@ type tests_and_conv_fn =
fn make_tests(cx: &cx) -> tests_and_conv_fn { fn make_tests(cx: &cx) -> tests_and_conv_fn {
log #fmt["making tests from %s", cx.config.src_base]; log #fmt["making tests from %s", cx.config.src_base];
let configport = mk_port::<[u8]>(); let configport = port::<[u8]>();
let tests = []; let tests = [];
for file: str in fs::list_dir(cx.config.src_base) { for file: str in fs::list_dir(cx.config.src_base) {
log #fmt["inspecting file %s", file]; log #fmt["inspecting file %s", file];
@ -156,10 +155,10 @@ fn is_test(config: &config, testfile: &str) -> bool {
ret valid; ret valid;
} }
fn make_test(cx: &cx, testfile: &str, configport: &_port<[u8]>) -> fn make_test(cx: &cx, testfile: &str, configport: &port<[u8]>) ->
test::test_desc { test::test_desc {
{name: make_test_name(cx.config, testfile), {name: make_test_name(cx.config, testfile),
fn: make_test_closure(testfile, configport.mk_chan()), fn: make_test_closure(testfile, chan(configport)),
ignore: header::is_test_ignored(cx.config, testfile)} ignore: header::is_test_ignored(cx.config, testfile)}
} }
@ -186,12 +185,12 @@ up. Then we'll spawn that data into another task and return the task.
Really convoluted. Need to think up of a better definition for tests. Really convoluted. Need to think up of a better definition for tests.
*/ */
fn make_test_closure(testfile: &str, configchan: _chan<[u8]>) -> fn make_test_closure(testfile: &str, configchan: chan<[u8]>) ->
test::test_fn { test::test_fn {
bind send_config(testfile, configchan) bind send_config(testfile, configchan)
} }
fn send_config(testfile: str, configchan: _chan<[u8]>) { fn send_config(testfile: str, configchan: chan<[u8]>) {
send(configchan, str::bytes(testfile)); send(configchan, str::bytes(testfile));
} }
@ -205,10 +204,10 @@ break up the config record and pass everything individually to the spawned
function. function.
*/ */
fn closure_to_task(cx: cx, configport: _port<[u8]>, testfn: &fn()) -> fn closure_to_task(cx: cx, configport: port<[u8]>, testfn: &fn()) ->
test::joinable { test::joinable {
testfn(); testfn();
let testfile = configport.recv(); let testfile = recv(configport);
let testthunk = let testthunk =
bind run_test_task(cx.config.compile_lib_path, cx.config.run_lib_path, bind run_test_task(cx.config.compile_lib_path, cx.config.run_lib_path,
cx.config.rustc_path, cx.config.src_base, cx.config.rustc_path, cx.config.src_base,

View file

@ -6,7 +6,6 @@
import std::option; import std::option;
import std::task; import std::task;
import std::task::task_id;
import std::generic_os::setenv; import std::generic_os::setenv;
import std::generic_os::getenv; import std::generic_os::getenv;
import std::vec; import std::vec;
@ -28,7 +27,8 @@ export reqchan;
type reqchan = chan<request>; type reqchan = chan<request>;
type handle = {task: option::t<task_id>, chan: reqchan}; type handle = {task: option::t<(task::task, port<task::task_notification>)>,
chan: reqchan};
tag request { exec([u8], [u8], [[u8]], chan<response>); stop; } tag request { exec([u8], [u8], [[u8]], chan<response>); stop; }
@ -37,12 +37,12 @@ type response = {pid: int, infd: int, outfd: int, errfd: int};
fn mk() -> handle { fn mk() -> handle {
let setupport = port(); let setupport = port();
let task = let task =
task::spawn(bind fn (setupchan: chan<chan<request>>) { task::spawn_joinable(bind fn (setupchan: chan<chan<request>>) {
let reqport = port(); let reqport = port();
let reqchan = chan(reqport); let reqchan = chan(reqport);
send(setupchan, reqchan); send(setupchan, reqchan);
worker(reqport); worker(reqport);
}(chan(setupport))); }(chan(setupport)));
ret {task: option::some(task), chan: recv(setupport)}; ret {task: option::some(task), chan: recv(setupport)};
} }
@ -50,7 +50,7 @@ fn from_chan(ch: &reqchan) -> handle { {task: option::none, chan: ch} }
fn close(handle: &handle) { fn close(handle: &handle) {
send(handle.chan, stop); send(handle.chan, stop);
task::join_id(option::get(handle.task)); task::join(option::get(handle.task));
} }
fn run(handle: &handle, lib_path: &str, prog: &str, args: &[str], fn run(handle: &handle, lib_path: &str, prog: &str, args: &[str],

View file

@ -5,13 +5,14 @@
use std; use std;
import std::task; import std::task;
import std::comm::mk_port; import std::comm::port;
import std::comm::recv;
fn child() { assert (1 == 2); } fn child() { assert (1 == 2); }
fn main() { fn main() {
let p = mk_port::<int>(); let p = port::<int>();
let f = child; let f = child;
task::_spawn(f); task::spawn(f);
let x = p.recv(); let x = recv(p);
} }

View file

@ -1,16 +1,17 @@
// error-pattern:meep // error-pattern:meep
use std; use std;
import std::comm::_chan; import std::comm::chan;
import std::comm::mk_port; import std::comm::port;
import std::comm::send; import std::comm::send;
import std::comm::recv;
fn echo<~T>(c: &_chan<T>, oc: &_chan<_chan<T>>) { fn echo<~T>(c: &chan<T>, oc: &chan<chan<T>>) {
// Tests that the type argument in port gets // Tests that the type argument in port gets
// visited // visited
let p = mk_port::<T>(); let p = port::<T>();
send(oc, p.mk_chan()); send(oc, chan(p));
let x = p.recv(); let x = recv(p);
send(c, x); send(c, x);
} }

View file

@ -1,24 +1,25 @@
// -*- rust -*- // -*- rust -*-
use std; use std;
import std::comm::_chan; import std::comm::chan;
import std::comm::mk_port; import std::comm::port;
import std::comm::send; import std::comm::send;
import std::comm::recv;
import std::task; import std::task;
fn a(c: _chan<int>) { send(c, 10); } fn a(c: chan<int>) { send(c, 10); }
fn main() { fn main() {
let p = mk_port(); let p = port();
task::_spawn(bind a(p.mk_chan())); task::spawn(bind a(chan(p)));
task::_spawn(bind b(p.mk_chan())); task::spawn(bind b(chan(p)));
let n: int = 0; let n: int = 0;
n = p.recv(); n = recv(p);
n = p.recv(); n = recv(p);
// log "Finished."; // log "Finished.";
} }
fn b(c: _chan<int>) { fn b(c: chan<int>) {
// log "task b0"; // log "task b0";
// log "task b1"; // log "task b1";
// log "task b2"; // log "task b2";

View file

@ -3,22 +3,23 @@
use std; use std;
import std::comm; import std::comm;
import std::comm::send; import std::comm::send;
import std::comm::_chan; import std::comm::chan;
import std::comm::recv;
import std::task; import std::task;
fn a(c: _chan<int>) { log "task a0"; log "task a1"; send(c, 10); } fn a(c: chan<int>) { log "task a0"; log "task a1"; send(c, 10); }
fn main() { fn main() {
let p = comm::mk_port(); let p = comm::port();
task::_spawn(bind a(p.mk_chan())); task::spawn(bind a(chan(p)));
task::_spawn(bind b(p.mk_chan())); task::spawn(bind b(chan(p)));
let n: int = 0; let n: int = 0;
n = p.recv(); n = recv(p);
n = p.recv(); n = recv(p);
log "Finished."; log "Finished.";
} }
fn b(c: _chan<int>) { fn b(c: chan<int>) {
log "task b0"; log "task b0";
log "task b1"; log "task b1";
log "task b2"; log "task b2";

View file

@ -3,10 +3,11 @@
use std; use std;
import std::comm; import std::comm;
import std::comm::send; import std::comm::send;
import std::comm::_chan; import std::comm::chan;
import std::comm::recv;
import std::task; import std::task;
fn a(c: _chan<int>) { fn a(c: chan<int>) {
if true { if true {
log "task a"; log "task a";
log "task a"; log "task a";
@ -24,19 +25,19 @@ fn g(x: int, y: str) -> int { log x; log y; let z: int = k(1); ret z; }
fn main() { fn main() {
let n: int = 2 + 3 * 7; let n: int = 2 + 3 * 7;
let s: str = "hello there"; let s: str = "hello there";
let p = comm::mk_port(); let p = comm::port();
task::_spawn(bind a(p.mk_chan())); task::spawn(bind a(chan(p)));
task::_spawn(bind b(p.mk_chan())); task::spawn(bind b(chan(p)));
let x: int = 10; let x: int = 10;
x = g(n, s); x = g(n, s);
log x; log x;
n = p.recv(); n = recv(p);
n = p.recv(); n = recv(p);
// FIXME: use signal-channel for this. // FIXME: use signal-channel for this.
log "children finished, root finishing"; log "children finished, root finishing";
} }
fn b(c: _chan<int>) { fn b(c: chan<int>) {
if true { if true {
log "task b"; log "task b";
log "task b"; log "task b";

View file

@ -61,17 +61,20 @@ fn test_box() {
} }
fn test_port() { fn test_port() {
let p1 = comm::mk_port::<int>(); // FIXME: Re-enable this once we can compare resources.
let p2 = comm::mk_port::<int>(); /*
let p1 = comm::port::<int>();
let p2 = comm::port::<int>();
assert (p1 == p1); assert (p1 == p1);
assert (p1 != p2); assert (p1 != p2);
*/
} }
fn test_chan() { fn test_chan() {
let p: comm::_port<int> = comm::mk_port(); let p: comm::port<int> = comm::port();
let ch1 = p.mk_chan(); let ch1 = comm::chan(p);
let ch2 = p.mk_chan(); let ch2 = comm::chan(p);
assert (ch1 == ch1); assert (ch1 == ch1);
// Chans are equal because they are just task:port addresses. // Chans are equal because they are just task:port addresses.

View file

@ -2,36 +2,37 @@
use std; use std;
import std::task; import std::task;
import std::comm::_chan; import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm; import std::comm;
import std::comm::mk_port; import std::comm::port;
import std::comm::recv;
tag request { quit; close(_chan<bool>); } tag request { quit; close(chan<bool>); }
type ctx = _chan<request>; type ctx = chan<request>;
fn request_task(c: _chan<ctx>) { fn request_task(c: chan<ctx>) {
let p = mk_port(); let p = port();
send(c, p.mk_chan()); send(c, chan(p));
let req: request; let req: request;
req = p.recv(); req = recv(p);
// Need to drop req before receiving it again // Need to drop req before receiving it again
req = p.recv(); req = recv(p);
} }
fn new() -> ctx { fn new() -> ctx {
let p = mk_port(); let p = port();
let t = task::_spawn(bind request_task(p.mk_chan())); let t = task::spawn(bind request_task(chan(p)));
let cx: ctx; let cx: ctx;
cx = p.recv(); cx = recv(p);
ret cx; ret cx;
} }
fn main() { fn main() {
let cx = new(); let cx = new();
let p = mk_port::<bool>(); let p = port::<bool>();
send(cx, close(p.mk_chan())); send(cx, close(chan(p)));
send(cx, quit); send(cx, quit);
} }

View file

@ -5,4 +5,4 @@ import std::task;
fn child2(s: str) { } fn child2(s: str) { }
fn main() { let x = task::_spawn(bind child2("hi")); } fn main() { let x = task::spawn(bind child2("hi")); }

View file

@ -2,20 +2,21 @@
use std; use std;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::recv;
import std::task; import std::task;
fn main() { fn main() {
let p = comm::mk_port(); let p = comm::port();
let t = task::_spawn(bind child(p.mk_chan())); let t = task::spawn(bind child(chan(p)));
let y = p.recv(); let y = recv(p);
log_err "received"; log_err "received";
log_err y; log_err y;
assert (y == 10); assert (y == 10);
} }
fn child(c: _chan<int>) { fn child(c: chan<int>) {
log_err "sending"; log_err "sending";
send(c, 10); send(c, 10);
log_err "value sent" log_err "value sent"

View file

@ -1,16 +1,18 @@
// -*- rust -*- // -*- rust -*-
use std; use std;
import std::comm::mk_port; import std::comm::port;
import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::recv;
fn main() { fn main() {
let po = mk_port(); let po = port();
let ch = po.mk_chan(); let ch = chan(po);
send(ch, 10); send(ch, 10);
let i = po.recv(); let i = recv(po);
assert (i == 10); assert (i == 10);
send(ch, 11); send(ch, 11);
let j = po.recv(); let j = recv(po);
assert (j == 11); assert (j == 11);
} }

View file

@ -13,10 +13,10 @@ import std::str;
import std::vec; import std::vec;
import std::map; import std::map;
import std::task; import std::task;
import std::comm::_chan; import std::comm::chan;
import std::comm::_port; import std::comm::port;
import std::comm::send; import std::comm::send;
import std::comm::mk_port; import std::comm::recv;
import std::comm; import std::comm;
fn map(filename: str, emit: map_reduce::putter) { emit(filename, "1"); } fn map(filename: str, emit: map_reduce::putter) { emit(filename, "1"); }
@ -30,27 +30,27 @@ mod map_reduce {
type mapper = fn(str, putter); type mapper = fn(str, putter);
tag ctrl_proto { find_reducer([u8], _chan<int>); mapper_done; } tag ctrl_proto { find_reducer([u8], chan<int>); mapper_done; }
fn start_mappers(ctrl: _chan<ctrl_proto>, inputs: &[str]) { fn start_mappers(ctrl: chan<ctrl_proto>, inputs: &[str]) {
for i: str in inputs { task::_spawn(bind map_task(ctrl, i)); } for i: str in inputs { task::spawn(bind map_task(ctrl, i)); }
} }
fn map_task(ctrl: _chan<ctrl_proto>, input: str) { fn map_task(ctrl: chan<ctrl_proto>, input: str) {
let intermediates = map::new_str_hash(); let intermediates = map::new_str_hash();
fn emit(im: &map::hashmap<str, int>, ctrl: _chan<ctrl_proto>, fn emit(im: &map::hashmap<str, int>, ctrl: chan<ctrl_proto>,
key: str, val: str) { key: str, val: str) {
let c; let c;
alt im.find(key) { alt im.find(key) {
some(_c) { c = _c } some(_c) { c = _c }
none. { none. {
let p = mk_port(); let p = port();
log_err "sending find_reducer"; log_err "sending find_reducer";
send(ctrl, find_reducer(str::bytes(key), p.mk_chan())); send(ctrl, find_reducer(str::bytes(key), chan(p)));
log_err "receiving"; log_err "receiving";
c = p.recv(); c = recv(p);
log_err c; log_err c;
im.insert(key, c); im.insert(key, c);
} }
@ -62,7 +62,7 @@ mod map_reduce {
} }
fn map_reduce(inputs: &[str]) { fn map_reduce(inputs: &[str]) {
let ctrl = mk_port::<ctrl_proto>(); let ctrl = port();
// This task becomes the master control task. It spawns others // This task becomes the master control task. It spawns others
// to do the rest. // to do the rest.
@ -71,12 +71,12 @@ mod map_reduce {
reducers = map::new_str_hash(); reducers = map::new_str_hash();
start_mappers(ctrl.mk_chan(), inputs); start_mappers(chan(ctrl), inputs);
let num_mappers = vec::len(inputs) as int; let num_mappers = vec::len(inputs) as int;
while num_mappers > 0 { while num_mappers > 0 {
alt ctrl.recv() { alt recv(ctrl) {
mapper_done. { num_mappers -= 1; } mapper_done. { num_mappers -= 1; }
find_reducer(k, cc) { find_reducer(k, cc) {
let c; let c;

View file

@ -11,4 +11,4 @@ native "rust" mod rustrt {
fn yield_wrap() { rustrt::task_yield(); } fn yield_wrap() { rustrt::task_yield(); }
fn main() { let f = yield_wrap; task::_spawn(f); } fn main() { let f = yield_wrap; task::spawn(f); }

View file

@ -8,28 +8,30 @@
use std; use std;
import std::task; import std::task;
import std::task::join_id; import std::task::join;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::port;
import std::comm::recv;
fn grandchild(c: _chan<int>) { send(c, 42); } fn grandchild(c: chan<int>) { send(c, 42); }
fn child(c: _chan<int>) { fn child(c: chan<int>) {
let _grandchild = task::_spawn(bind grandchild(c)); let _grandchild = task::spawn_joinable(bind grandchild(c));
join_id(_grandchild); join(_grandchild);
} }
fn main() { fn main() {
let p = comm::mk_port(); let p = comm::port();
let _child = task::_spawn(bind child(p.mk_chan())); let _child = task::spawn_joinable(bind child(chan(p)));
let x: int = p.recv(); let x: int = recv(p);
log x; log x;
assert (x == 42); assert (x == 42);
join_id(_child); join(_child);
} }

View file

@ -2,25 +2,25 @@ use std;
import std::vec; import std::vec;
import std::task; import std::task;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::_port; import std::comm::port;
import std::comm::mk_port; import std::comm::recv;
import std::comm::send; import std::comm::send;
tag msg { closed; received([u8]); } tag msg { closed; received([u8]); }
fn producer(c: _chan<[u8]>) { fn producer(c: chan<[u8]>) {
send(c, [1u8, 2u8, 3u8, 4u8]); send(c, [1u8, 2u8, 3u8, 4u8]);
let empty: [u8] = []; let empty: [u8] = [];
send(c, empty); send(c, empty);
} }
fn packager(cb: _chan<_chan<[u8]>>, msg: _chan<msg>) { fn packager(cb: chan<chan<[u8]>>, msg: chan<msg>) {
let p: _port<[u8]> = mk_port(); let p: port<[u8]> = port();
send(cb, p.mk_chan()); send(cb, chan(p));
while true { while true {
log "waiting for bytes"; log "waiting for bytes";
let data = p.recv(); let data = recv(p);
log "got bytes"; log "got bytes";
if vec::len(data) == 0u { log "got empty bytes, quitting"; break; } if vec::len(data) == 0u { log "got empty bytes, quitting"; break; }
log "sending non-empty buffer of length"; log "sending non-empty buffer of length";
@ -34,16 +34,16 @@ fn packager(cb: _chan<_chan<[u8]>>, msg: _chan<msg>) {
} }
fn main() { fn main() {
let p: _port<msg> = mk_port(); let p: port<msg> = port();
let recv_reader: _port<_chan<[u8]>> = mk_port(); let recv_reader: port<chan<[u8]>> = port();
let pack = let pack =
task::_spawn(bind packager(recv_reader.mk_chan(), p.mk_chan())); task::spawn(bind packager(chan(recv_reader), chan(p)));
let source_chan: _chan<[u8]> = recv_reader.recv(); let source_chan: chan<[u8]> = recv(recv_reader);
let prod = task::_spawn(bind producer(source_chan)); let prod = task::spawn(bind producer(source_chan));
while true { while true {
let msg = p.recv(); let msg = recv(p);
alt msg { alt msg {
closed. { log "Got close message"; break; } closed. { log "Got close message"; break; }
received(data) { received(data) {

View file

@ -2,20 +2,20 @@ use std;
import std::task; import std::task;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::_port; import std::comm::port;
import std::comm::mk_port;
import std::comm::send; import std::comm::send;
import std::comm::recv;
fn producer(c: _chan<[u8]>) { fn producer(c: chan<[u8]>) {
send(c, send(c,
[1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8, [1u8, 2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8, 10u8, 11u8, 12u8,
13u8]); 13u8]);
} }
fn main() { fn main() {
let p: _port<[u8]> = mk_port(); let p: port<[u8]> = port();
let prod = task::_spawn(bind producer(p.mk_chan())); let prod = task::spawn(bind producer(chan(p)));
let data: [u8] = p.recv(); let data: [u8] = recv(p);
} }

View file

@ -6,10 +6,10 @@ import std::task::*;
fn main() { fn main() {
let f = child; let f = child;
let other = _spawn(f); let other = spawn_joinable(f);
log_err "1"; log_err "1";
yield(); yield();
join_id(other); join(other);
log_err "3"; log_err "3";
} }

View file

@ -5,20 +5,20 @@ import std::task;
import std::comm::*; import std::comm::*;
fn main() { fn main() {
let p = mk_port(); let p = port();
let y: int; let y: int;
task::_spawn(bind child(p.mk_chan())); task::spawn(bind child(chan(p)));
y = p.recv(); y = recv(p);
log "received 1"; log "received 1";
log y; log y;
assert (y == 10); assert (y == 10);
task::_spawn(bind child(p.mk_chan())); task::spawn(bind child(chan(p)));
y = p.recv(); y = recv(p);
log "received 2"; log "received 2";
log y; log y;
assert (y == 10); assert (y == 10);
} }
fn child(c: _chan<int>) { send(c, 10); } fn child(c: chan<int>) { send(c, 10); }

View file

@ -4,21 +4,21 @@ use std;
import std::task; import std::task;
import std::comm; import std::comm;
fn sub(parent: comm::_chan<int>, id: int) { fn sub(parent: comm::chan<int>, id: int) {
if id == 0 { if id == 0 {
comm::send(parent, 0); comm::send(parent, 0);
} else { } else {
let p = comm::mk_port(); let p = comm::port();
let child = task::_spawn(bind sub(p.mk_chan(), id - 1)); let child = task::spawn(bind sub(comm::chan(p), id - 1));
let y = p.recv(); let y = comm::recv(p);
comm::send(parent, y + 1); comm::send(parent, y + 1);
} }
} }
fn main() { fn main() {
let p = comm::mk_port(); let p = comm::port();
let child = task::_spawn(bind sub(p.mk_chan(), 200)); let child = task::spawn(bind sub(comm::chan(p), 200));
let y = p.recv(); let y = comm::recv(p);
log "transmission complete"; log "transmission complete";
log y; log y;
assert (y == 200); assert (y == 200);

View file

@ -1,9 +1,9 @@
// Sanity-check the code examples that appear in the object system // Sanity-check the code examples that appear in the object system
// documentation. // documentation.
use std; use std;
import std::comm::_chan; import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::mk_port; import std::comm::port;
fn main() { fn main() {
@ -40,16 +40,16 @@ fn main() {
fn take(y: int) { *x += y; } fn take(y: int) { *x += y; }
} }
obj sender(c: _chan<int>) { obj sender(c: chan<int>) {
fn take(z: int) { send(c, z); } fn take(z: int) { send(c, z); }
} }
fn give_ints(t: taker) { t.take(1); t.take(2); t.take(3); } fn give_ints(t: taker) { t.take(1); t.take(2); t.take(3); }
let p = mk_port::<int>(); let p = port();
let t1: taker = adder(@mutable 0); let t1: taker = adder(@mutable 0);
let t2: taker = sender(p.mk_chan()); let t2: taker = sender(chan(p));
give_ints(t1); give_ints(t1);
give_ints(t2); give_ints(t2);

View file

@ -6,8 +6,10 @@ use std;
import std::option; import std::option;
import std::uint; import std::uint;
import std::comm; import std::comm;
import std::comm::mk_port; import std::comm::port;
import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::recv;
// A 12-byte unit to send over the channel // A 12-byte unit to send over the channel
type record = {val1: u32, val2: u32, val3: u32}; type record = {val1: u32, val2: u32, val3: u32};
@ -18,8 +20,8 @@ type record = {val1: u32, val2: u32, val3: u32};
// power of two so needs to be rounded up. Don't trigger any // power of two so needs to be rounded up. Don't trigger any
// assertions. // assertions.
fn test_init() { fn test_init() {
let myport = mk_port::<record>(); let myport = port();
let mychan = myport.mk_chan(); let mychan = chan(myport);
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32}; let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
send(mychan, val); send(mychan, val);
} }
@ -28,8 +30,8 @@ fn test_init() {
// Dump lots of items into the channel so it has to grow. // Dump lots of items into the channel so it has to grow.
// Don't trigger any assertions. // Don't trigger any assertions.
fn test_grow() { fn test_grow() {
let myport: comm::_port<record> = comm::mk_port(); let myport = port();
let mychan = myport.mk_chan(); let mychan = chan(myport);
for each i: uint in uint::range(0u, 100u) { for each i: uint in uint::range(0u, 100u) {
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32}; let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
comm::send(mychan, val); comm::send(mychan, val);
@ -39,31 +41,31 @@ fn test_grow() {
// Don't allow the buffer to shrink below it's original size // Don't allow the buffer to shrink below it's original size
fn test_shrink1() { fn test_shrink1() {
let myport = comm::mk_port::<i8>(); let myport = port();
let mychan = myport.mk_chan(); let mychan = chan(myport);
send(mychan, 0i8); send(mychan, 0i8);
let x = myport.recv(); let x = recv(myport);
} }
fn test_shrink2() { fn test_shrink2() {
let myport = mk_port::<record>(); let myport = port();
let mychan = myport.mk_chan(); let mychan = chan(myport);
for each i: uint in uint::range(0u, 100u) { for each i: uint in uint::range(0u, 100u) {
let val: record = {val1: 0u32, val2: 0u32, val3: 0u32}; let val: record = {val1: 0u32, val2: 0u32, val3: 0u32};
send(mychan, val); send(mychan, val);
} }
for each i: uint in uint::range(0u, 100u) { let x = myport.recv(); } for each i: uint in uint::range(0u, 100u) { let x = recv(myport); }
} }
// Test rotating the buffer when the unit size is not a power of two // Test rotating the buffer when the unit size is not a power of two
fn test_rotate() { fn test_rotate() {
let myport = mk_port::<record>(); let myport = port();
let mychan = myport.mk_chan(); let mychan = chan(myport);
for each i: uint in uint::range(0u, 100u) { for each i: uint in uint::range(0u, 100u) {
let val = {val1: i as u32, val2: i as u32, val3: i as u32}; let val = {val1: i as u32, val2: i as u32, val3: i as u32};
send(mychan, val); send(mychan, val);
let x = myport.recv(); let x = recv(myport);
assert (x.val1 == i as u32); assert (x.val1 == i as u32);
assert (x.val2 == i as u32); assert (x.val2 == i as u32);
assert (x.val3 == i as u32); assert (x.val3 == i as u32);
@ -74,8 +76,8 @@ fn test_rotate() {
// Test rotating and growing the buffer when // Test rotating and growing the buffer when
// the unit size is not a power of two // the unit size is not a power of two
fn test_rotate_grow() { fn test_rotate_grow() {
let myport = mk_port::<record>(); let myport = port::<record>();
let mychan = myport.mk_chan(); let mychan = chan(myport);
for each j: uint in uint::range(0u, 10u) { for each j: uint in uint::range(0u, 10u) {
for each i: uint in uint::range(0u, 10u) { for each i: uint in uint::range(0u, 10u) {
let val: record = let val: record =
@ -83,7 +85,7 @@ fn test_rotate_grow() {
send(mychan, val); send(mychan, val);
} }
for each i: uint in uint::range(0u, 10u) { for each i: uint in uint::range(0u, 10u) {
let x = myport.recv(); let x = recv(myport);
assert (x.val1 == i as u32); assert (x.val1 == i as u32);
assert (x.val2 == i as u32); assert (x.val2 == i as u32);
assert (x.val3 == i as u32); assert (x.val3 == i as u32);

View file

@ -7,9 +7,9 @@ import std::task;
fn x(s: str, n: int) { log s; log n; } fn x(s: str, n: int) { log s; log n; }
fn main() { fn main() {
task::_spawn(bind x("hello from first spawned fn", 65)); task::spawn(bind x("hello from first spawned fn", 65));
task::_spawn(bind x("hello from second spawned fn", 66)); task::spawn(bind x("hello from second spawned fn", 66));
task::_spawn(bind x("hello from third spawned fn", 67)); task::spawn(bind x("hello from third spawned fn", 67));
let i: int = 30; let i: int = 30;
while i > 0 { i = i - 1; log "parent sleeping"; yield(); } while i > 0 { i = i - 1; log "parent sleeping"; yield(); }
} }

View file

@ -1,8 +1,12 @@
use std; use std;
import std::task::join_id; import std::task::join;
import std::task::_spawn; import std::task::spawn_joinable;
fn main() {
let x = spawn_joinable(bind m::child(10));
join(x);
}
fn main() { let x = _spawn(bind m::child(10)); join_id(x); }
mod m { mod m {
fn child(i: int) { log i; } fn child(i: int) { log i; }
} }

View file

@ -10,11 +10,11 @@ import std::str;
import std::comm; import std::comm;
import std::task; import std::task;
type ctx = comm::_chan<int>; type ctx = comm::chan<int>;
fn iotask(cx: ctx, ip: str) { assert (str::eq(ip, "localhost")); } fn iotask(cx: ctx, ip: str) { assert (str::eq(ip, "localhost")); }
fn main() { fn main() {
let p = comm::mk_port::<int>(); let p = comm::port::<int>();
task::_spawn(bind iotask(p.mk_chan(), "localhost")); task::spawn(bind iotask(comm::chan(p), "localhost"));
} }

View file

@ -4,7 +4,10 @@ use std;
import std::task; import std::task;
fn main() { let t = task::_spawn(bind child(10)); task::join_id(t); } fn main() {
let t = task::spawn_joinable(bind child(10));
task::join(t);
}
fn child(i: int) { log_err i; assert (i == 10); } fn child(i: int) { log_err i; assert (i == 10); }

View file

@ -1,9 +1,9 @@
// -*- rust -*- // -*- rust -*-
use std; use std;
import std::task::_spawn; import std::task::spawn;
fn main() { _spawn(bind child(10, 20, 30, 40, 50, 60, 70, 80, 90)); } fn main() { spawn(bind child(10, 20, 30, 40, 50, 60, 70, 80, 90)); }
fn child(i1: int, i2: int, i3: int, i4: int, i5: int, i6: int, i7: int, fn child(i1: int, i2: int, i3: int, i4: int, i5: int, i6: int, i7: int,
i8: int, i9: int) { i8: int, i9: int) {

View file

@ -1,7 +1,7 @@
use std; use std;
import std::task::spawn; import std::task::spawn_joinable;
import std::task::join_id; import std::task::join;
fn main() { test00(); } fn main() { test00(); }
@ -9,7 +9,7 @@ fn start() { log "Started / Finished task."; }
fn test00() { fn test00() {
let f = start; let f = start;
let t = spawn(f); let t = spawn_joinable(f);
join_id(t); join(t);
log "Completing."; log "Completing.";
} }

View file

@ -3,13 +3,13 @@ use std;
import std::comm; import std::comm;
import std::task; import std::task;
fn start(c: comm::_chan<comm::_chan<int>>) { fn start(c: comm::chan<comm::chan<int>>) {
let p: comm::_port<int> = comm::mk_port(); let p: comm::port<int> = comm::port();
comm::send(c, p.mk_chan()); comm::send(c, comm::chan(p));
} }
fn main() { fn main() {
let p = comm::mk_port::<comm::_chan<int>>(); let p = comm::port();
let child = task::_spawn(bind start(p.mk_chan())); let child = task::spawn(bind start(comm::chan(p)));
let c = p.recv(); let c = comm::recv(p);
} }

View file

@ -7,13 +7,13 @@ fn start(task_number: int) { log "Started / Finished task."; }
fn test00() { fn test00() {
let i: int = 0; let i: int = 0;
let t = task::_spawn(bind start(i)); let t = task::spawn_joinable(bind start(i));
// Sleep long enough for the task to finish. // Sleep long enough for the task to finish.
task::sleep(10000u); task::sleep(10000u);
// Try joining tasks that have already finished. // Try joining tasks that have already finished.
task::join_id(t); task::join(t);
log "Joined task."; log "Joined task.";
} }

View file

@ -3,15 +3,15 @@ import std::task;
import std::comm; import std::comm;
import std::comm::send; import std::comm::send;
fn start(c: comm::_chan<int>, start: int, number_of_messages: int) { fn start(c: comm::chan<int>, start: int, number_of_messages: int) {
let i: int = 0; let i: int = 0;
while i < number_of_messages { send(c, start + i); i += 1; } while i < number_of_messages { send(c, start + i); i += 1; }
} }
fn main() { fn main() {
log "Check that we don't deadlock."; log "Check that we don't deadlock.";
let p: comm::_port<int> = comm::mk_port(); let p = comm::port();
let a = task::_spawn(bind start(p.mk_chan(), 0, 10)); let a = task::spawn_joinable(bind start(comm::chan(p), 0, 10));
task::join_id(a); task::join(a);
log "Joined task"; log "Joined task";
} }

View file

@ -9,11 +9,11 @@ fn start(c: comm::chan<int>, n: int) {
} }
fn main() { fn main() {
let p = comm::mk_port(); let p = comm::port();
// Spawn a task that sends us back messages. The parent task // Spawn a task that sends us back messages. The parent task
// is likely to terminate before the child completes, so from // is likely to terminate before the child completes, so from
// the child's point of view the receiver may die. We should // the child's point of view the receiver may die. We should
// drop messages on the floor in this case, and not crash! // drop messages on the floor in this case, and not crash!
let child = task::spawn(bind start(p.mk_chan(), 10)); let child = task::spawn(bind start(comm::chan(p), 10));
let c = p.recv(); let c = comm::recv(p);
} }

View file

@ -3,30 +3,31 @@
use std; use std;
import std::comm; import std::comm;
import std::comm::send; import std::comm::send;
import std::comm::mk_port; import std::comm::port;
import std::comm::recv;
import std::comm::chan;
// Tests of ports and channels on various types // Tests of ports and channels on various types
fn test_rec() { fn test_rec() {
type r = {val0: int, val1: u8, val2: char}; type r = {val0: int, val1: u8, val2: char};
let po = comm::mk_port(); let po = comm::port();
let ch = po.mk_chan(); let ch = chan(po);
let r0: r = {val0: 0, val1: 1u8, val2: '2'}; let r0: r = {val0: 0, val1: 1u8, val2: '2'};
send(ch, r0); send(ch, r0);
let r1: r; let r1: r;
r1 = po.recv(); r1 = recv(po);
assert (r1.val0 == 0); assert (r1.val0 == 0);
assert (r1.val1 == 1u8); assert (r1.val1 == 1u8);
assert (r1.val2 == '2'); assert (r1.val2 == '2');
} }
fn test_vec() { fn test_vec() {
let po = comm::mk_port(); let po = port();
let ch = po.mk_chan(); let ch = chan(po);
let v0: [int] = [0, 1, 2]; let v0: [int] = [0, 1, 2];
send(ch, v0); send(ch, v0);
let v1: [int]; let v1 = recv(po);
v1 = po.recv();
assert (v1[0] == 0); assert (v1[0] == 0);
assert (v1[1] == 1); assert (v1[1] == 1);
assert (v1[2] == 2); assert (v1[2] == 2);
@ -50,33 +51,33 @@ fn test_str() {
fn test_tag() { fn test_tag() {
tag t { tag1; tag2(int); tag3(int, u8, char); } tag t { tag1; tag2(int); tag3(int, u8, char); }
let po = comm::mk_port(); let po = port();
let ch = po.mk_chan(); let ch = chan(po);
send(ch, tag1); send(ch, tag1);
send(ch, tag2(10)); send(ch, tag2(10));
send(ch, tag3(10, 11u8, 'A')); send(ch, tag3(10, 11u8, 'A'));
// FIXME: Do port semantics really guarantee these happen in order? // FIXME: Do port semantics really guarantee these happen in order?
let t1: t; let t1: t;
t1 = po.recv(); t1 = recv(po);
assert (t1 == tag1); assert (t1 == tag1);
t1 = po.recv(); t1 = recv(po);
assert (t1 == tag2(10)); assert (t1 == tag2(10));
t1 = po.recv(); t1 = recv(po);
assert (t1 == tag3(10, 11u8, 'A')); assert (t1 == tag3(10, 11u8, 'A'));
} }
fn test_chan() { fn test_chan() {
let po = comm::mk_port(); let po = port();
let ch = po.mk_chan(); let ch = chan(po);
let po0 = comm::mk_port(); let po0 = port();
let ch0 = po0.mk_chan(); let ch0 = chan(po0);
send(ch, ch0); send(ch, ch0);
let ch1 = po.recv(); let ch1 = recv(po);
// Does the transmitted channel still work? // Does the transmitted channel still work?
send(ch1, 10); send(ch1, 10);
let i: int; let i: int;
i = po0.recv(); i = recv(po0);
assert (i == 10); assert (i == 10);
} }

View file

@ -2,14 +2,14 @@
use std; use std;
import std::task; import std::task;
import std::task::task_id;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::send; import std::comm::send;
import std::comm::recv;
fn main() { log "===== WITHOUT THREADS ====="; test00(); } fn main() { log "===== WITHOUT THREADS ====="; test00(); }
fn test00_start(ch: _chan<int>, message: int, count: int) { fn test00_start(ch: chan<int>, message: int, count: int) {
log "Starting test00_start"; log "Starting test00_start";
let i: int = 0; let i: int = 0;
while i < count { while i < count {
@ -26,31 +26,32 @@ fn test00() {
log "Creating tasks"; log "Creating tasks";
let po = comm::mk_port(); let po = comm::port();
let ch = po.mk_chan(); let ch = chan(po);
let i: int = 0; let i: int = 0;
// Create and spawn tasks... // Create and spawn tasks...
let tasks = []; let tasks = [];
while i < number_of_tasks { while i < number_of_tasks {
tasks += [task::_spawn(bind test00_start(ch, i, number_of_messages))]; let thunk = bind test00_start(ch, i, number_of_messages);
tasks += [task::spawn_joinable(thunk)];
i = i + 1; i = i + 1;
} }
// Read from spawned tasks... // Read from spawned tasks...
let sum = 0; let sum = 0;
for t: task_id in tasks { for t in tasks {
i = 0; i = 0;
while i < number_of_messages { while i < number_of_messages {
let value = po.recv(); let value = recv(po);
sum += value; sum += value;
i = i + 1; i = i + 1;
} }
} }
// Join spawned tasks... // Join spawned tasks...
for t: task_id in tasks { task::join_id(t); } for t in tasks { task::join(t); }
log "Completed: Final number is: "; log "Completed: Final number is: ";
log_err sum; log_err sum;

View file

@ -7,38 +7,38 @@ fn main() { test00(); }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = comm::mk_port(); let p = comm::port();
let c = p.mk_chan(); let c = comm::chan(p);
send(c, 1); send(c, 1);
send(c, 2); send(c, 2);
send(c, 3); send(c, 3);
send(c, 4); send(c, 4);
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
send(c, 5); send(c, 5);
send(c, 6); send(c, 6);
send(c, 7); send(c, 7);
send(c, 8); send(c, 8);
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
log r; log r;
assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8); assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8);

View file

@ -6,12 +6,12 @@ fn main() { test00(); }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = comm::mk_port(); let p = comm::port();
let c = p.mk_chan(); let c = comm::chan(p);
let number_of_messages: int = 1000; let number_of_messages: int = 1000;
let i: int = 0; let i: int = 0;
while i < number_of_messages { comm::send(c, i + 0); i += 1; } while i < number_of_messages { comm::send(c, i + 0); i += 1; }
i = 0; i = 0;
while i < number_of_messages { r = p.recv(); sum += r; i += 1; } while i < number_of_messages { sum += comm::recv(p); i += 1; }
assert (sum == number_of_messages * (number_of_messages - 1) / 2); assert (sum == number_of_messages * (number_of_messages - 1) / 2);
} }

View file

@ -1,17 +1,19 @@
use std; use std;
import std::comm; import std::comm;
import std::comm::send; import std::comm::send;
import comm::chan;
import comm::recv;
fn main() { test00(); } fn main() { test00(); }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = comm::mk_port(); let p = comm::port();
let c0 = p.mk_chan(); let c0 = chan(p);
let c1 = p.mk_chan(); let c1 = chan(p);
let c2 = p.mk_chan(); let c2 = chan(p);
let c3 = p.mk_chan(); let c3 = chan(p);
let number_of_messages: int = 1000; let number_of_messages: int = 1000;
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
@ -23,13 +25,13 @@ fn test00() {
} }
i = 0; i = 0;
while i < number_of_messages { while i < number_of_messages {
r = p.recv(); r = recv(p);
sum += r; sum += r;
r = p.recv(); r = recv(p);
sum += r; sum += r;
r = p.recv(); r = recv(p);
sum += r; sum += r;
r = p.recv(); r = recv(p);
sum += r; sum += r;
i += 1; i += 1;
} }

View file

@ -1,10 +1,13 @@
use std; use std;
import std::task; import std::task;
import std::comm; import std::comm;
import comm::chan;
import comm::recv;
import comm::port;
fn main() { test00(); } fn main() { test00(); }
fn test00_start(c: comm::_chan<int>, start: int, number_of_messages: int) { fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
let i: int = 0; let i: int = 0;
while i < number_of_messages { comm::send(c, start + i); i += 1; } while i < number_of_messages { comm::send(c, start + i); i += 1; }
} }
@ -12,39 +15,43 @@ fn test00_start(c: comm::_chan<int>, start: int, number_of_messages: int) {
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = comm::mk_port(); let p = port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let t0 = let t0 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 0, task::spawn_joinable(bind test00_start(chan(p),
number_of_messages)); number_of_messages * 0,
number_of_messages));
let t1 = let t1 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 1, task::spawn_joinable(bind test00_start(chan(p),
number_of_messages)); number_of_messages * 1,
number_of_messages));
let t2 = let t2 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 2, task::spawn_joinable(bind test00_start(chan(p),
number_of_messages)); number_of_messages * 2,
number_of_messages));
let t3 = let t3 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 3, task::spawn_joinable(bind test00_start(chan(p),
number_of_messages)); number_of_messages * 3,
number_of_messages));
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
r = p.recv(); r = recv(p);
sum += r; sum += r;
r = p.recv(); r = recv(p);
sum += r; sum += r;
r = p.recv(); r = recv(p);
sum += r; sum += r;
r = p.recv(); r = recv(p);
sum += r; sum += r;
i += 1; i += 1;
} }
task::join_id(t0); task::join(t0);
task::join_id(t1); task::join(t1);
task::join_id(t2); task::join(t2);
task::join_id(t3); task::join(t3);
assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2); assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2);
} }

View file

@ -4,7 +4,7 @@ import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00_start(c: comm::_chan<int>, start: int, number_of_messages: int) { fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
let i: int = 0; let i: int = 0;
while i < number_of_messages { comm::send(c, start + i); i += 1; } while i < number_of_messages { comm::send(c, start + i); i += 1; }
} }
@ -12,39 +12,43 @@ fn test00_start(c: comm::_chan<int>, start: int, number_of_messages: int) {
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = comm::mk_port(); let p = comm::port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let t0 = let t0 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 0, task::spawn_joinable(bind test00_start(comm::chan(p),
number_of_messages)); number_of_messages * 0,
number_of_messages));
let t1 = let t1 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 1, task::spawn_joinable(bind test00_start(comm::chan(p),
number_of_messages)); number_of_messages * 1,
number_of_messages));
let t2 = let t2 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 2, task::spawn_joinable(bind test00_start(comm::chan(p),
number_of_messages)); number_of_messages * 2,
number_of_messages));
let t3 = let t3 =
task::_spawn(bind test00_start(p.mk_chan(), number_of_messages * 3, task::spawn_joinable(bind test00_start(comm::chan(p),
number_of_messages)); number_of_messages * 3,
number_of_messages));
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
r = p.recv(); r = comm::recv(p);
sum += r; sum += r;
i += 1; i += 1;
} }
task::join_id(t0); task::join(t0);
task::join_id(t1); task::join(t1);
task::join_id(t2); task::join(t2);
task::join_id(t3); task::join(t3);
assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2); assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2);
} }

View file

@ -4,7 +4,7 @@ import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00_start(c: comm::_chan<int>, number_of_messages: int) { fn test00_start(c: comm::chan<int>, number_of_messages: int) {
let i: int = 0; let i: int = 0;
while i < number_of_messages { comm::send(c, i + 0); i += 1; } while i < number_of_messages { comm::send(c, i + 0); i += 1; }
} }
@ -12,15 +12,16 @@ fn test00_start(c: comm::_chan<int>, number_of_messages: int) {
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p = comm::mk_port(); let p = comm::port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let t0 = task::_spawn(bind test00_start(p.mk_chan(), number_of_messages)); let thunk = bind test00_start(comm::chan(p), number_of_messages);
let t0 = task::spawn_joinable(thunk);
let i: int = 0; let i: int = 0;
while i < number_of_messages { r = p.recv(); sum += r; log r; i += 1; } while i < number_of_messages { sum += comm::recv(p); log r; i += 1; }
task::join_id(t0); task::join(t0);
assert (sum == number_of_messages * (number_of_messages - 1) / 2); assert (sum == number_of_messages * (number_of_messages - 1) / 2);
} }

View file

@ -7,9 +7,9 @@ import std::comm;
// any size, but rustc currently can because they do have size. Whether // any size, but rustc currently can because they do have size. Whether
// or not this is desirable I don't know, but here's a regression test. // or not this is desirable I don't know, but here's a regression test.
fn main() { fn main() {
let po: comm::_port<()> = comm::mk_port(); let po = comm::port();
let ch: comm::_chan<()> = po.mk_chan(); let ch = comm::chan(po);
comm::send(ch, ()); comm::send(ch, ());
let n: () = po.recv(); let n: () = comm::recv(po);
assert (n == ()); assert (n == ());
} }

View file

@ -1,11 +1,12 @@
use std; use std;
import std::task; import std::task;
import std::task::task_id; import std::task::task;
import std::comm; import std::comm;
import std::comm::_chan; import std::comm::chan;
import std::comm::_port; import std::comm::port;
import std::comm::send; import std::comm::send;
import std::comm::recv;
fn main() { fn main() {
test00(); test00();
@ -17,7 +18,7 @@ fn main() {
test06(); test06();
} }
fn test00_start(ch: _chan<int>, message: int, count: int) { fn test00_start(ch: chan<int>, message: int, count: int) {
log "Starting test00_start"; log "Starting test00_start";
let i: int = 0; let i: int = 0;
while i < count { while i < count {
@ -33,23 +34,24 @@ fn test00() {
let number_of_messages: int = 4; let number_of_messages: int = 4;
log "Creating tasks"; log "Creating tasks";
let po = comm::mk_port(); let po = port();
let ch = po.mk_chan(); let ch = chan(po);
let i: int = 0; let i: int = 0;
let tasks = []; let tasks = [];
while i < number_of_tasks { while i < number_of_tasks {
i = i + 1; i = i + 1;
tasks += [task::spawn(bind test00_start(ch, i, number_of_messages))]; let thunk = bind test00_start(ch, i, number_of_messages);
tasks += [task::spawn_joinable(thunk)];
} }
let sum: int = 0; let sum: int = 0;
for t: task_id in tasks { for t in tasks {
i = 0; i = 0;
while i < number_of_messages { sum += po.recv(); i = i + 1; } while i < number_of_messages { sum += recv(po); i = i + 1; }
} }
for t: task_id in tasks { task::join_id(t); } for t in tasks { task::join(t); }
log "Completed: Final number is: "; log "Completed: Final number is: ";
assert (sum == assert (sum ==
@ -59,19 +61,19 @@ fn test00() {
} }
fn test01() { fn test01() {
let p = comm::mk_port(); let p = port();
log "Reading from a port that is never written to."; log "Reading from a port that is never written to.";
let value: int = p.recv(); let value: int = recv(p);
log value; log value;
} }
fn test02() { fn test02() {
let p = comm::mk_port(); let p = port();
let c = p.mk_chan(); let c = chan(p);
log "Writing to a local task channel."; log "Writing to a local task channel.";
send(c, 42); send(c, 42);
log "Reading from a local task port."; log "Reading from a local task port.";
let value: int = p.recv(); let value: int = recv(p);
log value; log value;
} }
@ -101,7 +103,7 @@ fn test04() {
log "Finishing up."; log "Finishing up.";
} }
fn test05_start(ch: _chan<int>) { fn test05_start(ch: chan<int>) {
send(ch, 10); send(ch, 10);
send(ch, 20); send(ch, 20);
send(ch, 30); send(ch, 30);
@ -110,13 +112,13 @@ fn test05_start(ch: _chan<int>) {
} }
fn test05() { fn test05() {
let po = comm::mk_port(); let po = comm::port();
let ch = po.mk_chan(); let ch = chan(po);
task::spawn(bind test05_start(ch)); task::spawn(bind test05_start(ch));
let value: int; let value: int;
value = po.recv(); value = recv(po);
value = po.recv(); value = recv(po);
value = po.recv(); value = recv(po);
log value; log value;
} }
@ -136,11 +138,11 @@ fn test06() {
let tasks = []; let tasks = [];
while i < number_of_tasks { while i < number_of_tasks {
i = i + 1; i = i + 1;
tasks += [task::spawn(bind test06_start(i))]; tasks += [task::spawn_joinable(bind test06_start(i))];
} }
for t: task_id in tasks { task::join_id(t); } for t in tasks { task::join(t); }
} }

View file

@ -2,9 +2,15 @@
A test case for issue #577, which also exposes #588 A test case for issue #577, which also exposes #588
*/ */
// FIXME: This won't work until we can compare resources
// xfail-stage0
// xfail-stage1
// xfail-stage2
// xfail-stage3
use std; use std;
import std::task; import std::task;
import std::task::join_id; import std::task::join;
import std::comm; import std::comm;
fn child() { } fn child() { }
@ -15,8 +21,8 @@ fn main() {
let t2; let t2;
let c1 = child, c2 = child; let c1 = child, c2 = child;
t1 = task::_spawn(c1); t1 = task::spawn_joinable(c1);
t2 = task::_spawn(c2); t2 = task::spawn_joinable(c2);
assert (t1 == t1); assert (t1 == t1);
assert (t1 != t2); assert (t1 != t2);
@ -25,22 +31,19 @@ fn main() {
let p1; let p1;
let p2; let p2;
p1 = comm::mk_port::<int>(); p1 = comm::port::<int>();
p2 = comm::mk_port::<int>(); p2 = comm::port::<int>();
assert (p1 == p1); assert (p1 == p1);
assert (p1 != p2); assert (p1 != p2);
// channels // channels
let c1; let c1 = comm::chan(p1);
let c2; let c2 = comm::chan(p2);
c1 = p1.mk_chan();
c2 = p2.mk_chan();
assert (c1 == c1); assert (c1 == c1);
assert (c1 != c2); assert (c1 != c2);
join_id(t1); join(t1);
join_id(t2); join(t2);
} }

View file

@ -1,6 +1,6 @@
use std; use std;
import std::task; import std::task;
fn main() { task::_spawn(bind child("Hello")); } fn main() { task::spawn(bind child("Hello")); }
fn child(s: str) { fn child(s: str) {

View file

@ -5,7 +5,7 @@ import std::task;
fn main() { fn main() {
let i = 10; let i = 10;
while i > 0 { task::_spawn(bind child(i)); i = i - 1; } while i > 0 { task::spawn(bind child(i)); i = i - 1; }
log "main thread exiting"; log "main thread exiting";
} }

View file

@ -1,15 +1,14 @@
use std; use std;
import std::comm::mk_port; import std::comm::*;
import std::comm::send;
/* /*
This is about the simplest program that can successfully send a This is about the simplest program that can successfully send a
message. message.
*/ */
fn main() { fn main() {
let po = mk_port::<int>(); let po = port();
let ch = po.mk_chan(); let ch = chan(po);
send(ch, 42); send(ch, 42);
let r = po.recv(); let r = recv(po);
log_err r; log_err r;
} }

View file

@ -5,13 +5,13 @@ import std::task::*;
fn main() { fn main() {
let f = child; let f = child;
let other = task::spawn(f); let other = task::spawn_joinable(f);
log_err "1"; log_err "1";
yield(); yield();
log_err "2"; log_err "2";
yield(); yield();
log_err "3"; log_err "3";
join_id(other); join(other);
} }
fn child() { log_err "4"; yield(); log_err "5"; yield(); log_err "6"; } fn child() { log_err "4"; yield(); log_err "5"; yield(); log_err "6"; }

View file

@ -5,10 +5,10 @@ import std::task::*;
fn main() { fn main() {
let c = child; let c = child;
let other = task::spawn(c); let other = task::spawn_joinable(c);
log_err "1"; log_err "1";
yield(); yield();
join_id(other); join(other);
} }
fn child() { log_err "2"; } fn child() { log_err "2"; }

View file

@ -3,19 +3,8 @@ import std::comm;
#[test] #[test]
fn create_port_and_chan() { fn create_port_and_chan() {
let p = comm::mk_port::<int>(); let p = comm::port::<int>();
p.mk_chan(); comm::chan(p);
}
#[test]
fn send_recv() {
let p = comm::mk_port::<int>();
let c = p.mk_chan();
comm::send(c, 42);
let v = p.recv();
log_err v;
assert (42 == v);
} }
#[test] #[test]

View file

@ -9,46 +9,30 @@ fn test_sleep() { task::sleep(1000000u); }
fn test_unsupervise() { fn test_unsupervise() {
fn f() { task::unsupervise(); fail; } fn f() { task::unsupervise(); fail; }
let foo = f; let foo = f;
task::_spawn(foo); task::spawn(foo);
}
#[test]
#[ignore]
fn test_join() {
fn winner() { }
let wintask = task::_spawn(bind winner());
assert (task::join_id(wintask) == task::tr_success);
fn failer() { task::unsupervise(); fail; }
let failtask = task::_spawn(bind failer());
assert (task::join_id(failtask) == task::tr_failure);
} }
#[test] #[test]
fn test_lib_spawn() { fn test_lib_spawn() {
fn foo() { log_err "Hello, World!"; } fn foo() { log_err "Hello, World!"; }
let f = foo; let f = foo;
task::_spawn(f); task::spawn(f);
} }
#[test] #[test]
fn test_lib_spawn2() { fn test_lib_spawn2() {
fn foo(x: int) { assert (x == 42); } fn foo(x: int) { assert (x == 42); }
task::_spawn(bind foo(42)); task::spawn(bind foo(42));
} }
#[test] #[test]
fn test_join_chan() { fn test_join_chan() {
fn winner() { } fn winner() { }
let p = comm::mk_port::<task::task_notification>(); let p = comm::port();
let f = winner; let f = winner;
task::spawn_notify(f, p.mk_chan()); task::spawn_notify(f, comm::chan(p));
let s = p.recv(); let s = comm::recv(p);
log_err "received task status message"; log_err "received task status message";
log_err s; log_err s;
alt s { alt s {
@ -61,10 +45,10 @@ fn test_join_chan() {
fn test_join_chan_fail() { fn test_join_chan_fail() {
fn failer() { task::unsupervise(); fail } fn failer() { task::unsupervise(); fail }
let p = comm::mk_port::<task::task_notification>(); let p = comm::port();
let f = failer; let f = failer;
task::spawn_notify(f, p.mk_chan()); task::spawn_notify(f, comm::chan(p));
let s = p.recv(); let s = comm::recv(p);
log_err "received task status message"; log_err "received task status message";
log_err s; log_err s;
alt s { alt s {