1
Fork 0

Converted the rest of the task-comm-* tests over. Also fixed some

channel lifecycle bugs.
This commit is contained in:
Eric Holk 2011-08-06 10:07:39 -07:00
parent 86babab2fe
commit d9b84a546c
20 changed files with 171 additions and 167 deletions

View file

@ -29,8 +29,6 @@ native "rust-intrinsic" mod rusti {
resource chan_ptr(ch: *rustrt::rust_chan) { resource chan_ptr(ch: *rustrt::rust_chan) {
rustrt::drop_chan(ch); rustrt::drop_chan(ch);
rustrt::drop_chan(ch); // FIXME: We shouldn't have to do this
// twice.
} }
resource port_ptr(po: *rustrt::rust_port) { resource port_ptr(po: *rustrt::rust_port) {

View file

@ -901,7 +901,7 @@ new_chan(rust_task *task, rust_port *port) {
extern "C" CDECL extern "C" CDECL
void del_chan(rust_task *task, rust_chan *chan) { void del_chan(rust_task *task, rust_chan *chan) {
LOG(task, comm, "del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); LOG(task, comm, "del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
chan->destroy(); I(task->sched, false);
} }
extern "C" CDECL extern "C" CDECL

View file

@ -22,7 +22,7 @@ rust_chan::~rust_chan() {
KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")", KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")",
(uintptr_t) this); (uintptr_t) this);
this->destroy(); I(this->kernel, !is_associated());
A(kernel, is_associated() == false, A(kernel, is_associated() == false,
"Channel must be disassociated before being freed."); "Channel must be disassociated before being freed.");
@ -32,12 +32,12 @@ rust_chan::~rust_chan() {
* Link this channel with the specified port. * Link this channel with the specified port.
*/ */
void rust_chan::associate(rust_port *port) { void rust_chan::associate(rust_port *port) {
this->ref();
this->port = port; this->port = port;
scoped_lock with(port->lock); scoped_lock with(port->lock);
KLOG(kernel, task, KLOG(kernel, task,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port); this, port);
this->ref();
this->task = port->task; this->task = port->task;
this->task->ref(); this->task->ref();
this->port->chans.push(this); this->port->chans.push(this);
@ -64,7 +64,7 @@ void rust_chan::disassociate() {
port->chans.swap_delete(this); port->chans.swap_delete(this);
// Delete reference to the port. // Delete reference to the port.
port = NULL; port = NULL;
this->deref(); this->deref();
} }
@ -99,30 +99,6 @@ rust_chan *rust_chan::clone(rust_task *target) {
rust_chan(kernel, port, buffer.unit_sz); rust_chan(kernel, port, buffer.unit_sz);
} }
/**
* Cannot Yield: If the task were to unwind, the dropped ref would still
* appear to be live, causing modify-after-free errors.
*/
void rust_chan::destroy() {
if (is_associated()) {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormnat" state by not freeing
// it and letting the receiver task delete it for us instead.
if (buffer.is_empty() == false) {
return;
}
scoped_lock with(port->lock);
disassociate();
}
}
// //
// Local Variables: // Local Variables:
// mode: C++ // mode: C++

View file

@ -23,9 +23,6 @@ public:
void send(void *sptr); void send(void *sptr);
rust_chan *clone(rust_task *target); rust_chan *clone(rust_task *target);
// Called whenever the channel's ref count drops to zero.
void destroy();
}; };
// //

View file

@ -20,4 +20,4 @@ fn test05() {
value = po.recv(); value = po.recv();
value = po.recv(); value = po.recv();
assert (value == 30); assert (value == 30);
} }

View file

@ -6,4 +6,4 @@ fn main() { test00(); }
fn start() { log "Started / Finished task."; } fn start() { log "Started / Finished task."; }
fn test00() { let t: task = spawn start(); join(t); log "Completing."; } fn test00() { let t: task = spawn start(); join(t); log "Completing."; }

View file

@ -1,29 +1,32 @@
use std; use std;
import std::task; import std::task;
import std::comm;
fn start(c: chan[chan[str]]) { fn start(pcc: *u8) {
let p: port[str]; let c = comm::chan_from_unsafe_ptr(pcc);
let p;
let a; let a;
let b; let b;
p = port(); p = comm::mk_port[str]();
c <| chan(p); c.send(p.mk_chan().unsafe_ptr());
p |> a; a = p.recv();
log_err a; log_err a;
p |> b; b = p.recv();
log_err b; log_err b;
} }
fn main() { fn main() {
let p: port[chan[str]]; let p : comm::_port[*u8];
let child; let child;
p = port(); p = comm::mk_port();
child = spawn start(chan(p)); child = spawn start(p.mk_chan().unsafe_ptr());
let c; let pc; let c;
p |> c; pc = p.recv();
c <| "A"; c = comm::chan_from_unsafe_ptr(pc);
c <| "B"; c.send("A");
c.send("B");
task::yield(); task::yield();
} }

View file

@ -1,10 +1,16 @@
// xfail-stage3 // xfail-stage3
use std;
import std::comm;
fn start(c: chan[chan[str]]) { let p: port[str] = port(); c <| chan(p); } fn start(pcc: *u8) {
let c = comm::chan_from_unsafe_ptr(pcc);
let p : comm::_port[int] = comm::mk_port();
c.send(p.mk_chan().unsafe_ptr());
}
fn main() { fn main() {
let p: port[chan[str]] = port(); let p = comm::mk_port();
let child = spawn start(chan(p)); let child = spawn start(p.mk_chan().unsafe_ptr());
let c; let pc = p.recv();
p |> c; let c : comm::_chan[int] = comm::chan_from_unsafe_ptr(pc);
} }

View file

@ -16,4 +16,4 @@ fn test00() {
task::join(t); task::join(t);
log "Joined task."; log "Joined task.";
} }

View file

@ -1,15 +1,17 @@
use std; use std;
import std::task; import std::task;
import std::comm;
fn start(c: chan[int], start: int, number_of_messages: int) { fn start(pc: *u8, start: int, number_of_messages: int) {
let c = comm::chan_from_unsafe_ptr(pc);
let i: int = 0; let i: int = 0;
while i < number_of_messages { c <| start + i; i += 1; } while i < number_of_messages { c.send(start + i); i += 1; }
} }
fn main() { fn main() {
log "Check that we don't deadlock."; log "Check that we don't deadlock.";
let p: port[int] = port(); let p : comm::_port[int] = comm::mk_port();
let a: task = spawn start(chan(p), 0, 10); let a: task = spawn start(p.mk_chan().unsafe_ptr(), 0, 10);
task::join(a); task::join(a);
log "Joined task"; log "Joined task";
} }

View file

@ -3,20 +3,25 @@
// xfail-stage3 // xfail-stage3
// This test fails when run with multiple threads // This test fails when run with multiple threads
fn start(c: chan[int], n: int) { use std;
import std::comm;
fn start(pc: *u8, n: int) {
let c = comm::chan_from_unsafe_ptr();
let i: int = n; let i: int = n;
while i > 0 { c <| 0; i = i - 1; } while i > 0 { c.send(0); i = i - 1; }
} }
fn main() { fn main() {
let p: port[int] = port(); let p = comm::mk_port();
// Spawn a task that sends us back messages. The parent task // Spawn a task that sends us back messages. The parent task
// is likely to terminate before the child completes, so from // is likely to terminate before the child completes, so from
// the child's point of view the receiver may die. We should // the child's point of view the receiver may die. We should
// drop messages on the floor in this case, and not crash! // drop messages on the floor in this case, and not crash!
let child = spawn start(chan(p), 10); let child = spawn start(p.mk_chan().unsafe_ptr(), 10);
let c; let c;
p |> c; let pc = p.recv();
c = chan::chan_from_unsafe_ptr();
} }

View file

@ -1,39 +1,43 @@
// -*- rust -*- // -*- rust -*-
use std;
import std::comm;
import std::comm::mk_port;
// Tests of ports and channels on various types // Tests of ports and channels on various types
fn test_rec() { fn test_rec() {
type r = {val0: int, val1: u8, val2: char}; type r = {val0: int, val1: u8, val2: char};
let po: port[r] = port(); let po = comm::mk_port();
let ch: chan[r] = chan(po); let ch = po.mk_chan();
let r0: r = {val0: 0, val1: 1u8, val2: '2'}; let r0: r = {val0: 0, val1: 1u8, val2: '2'};
ch <| r0; ch.send(r0);
let r1: r; let r1: r;
po |> r1; r1 = po.recv();
assert (r1.val0 == 0); assert (r1.val0 == 0);
assert (r1.val1 == 1u8); assert (r1.val1 == 1u8);
assert (r1.val2 == '2'); assert (r1.val2 == '2');
} }
fn test_vec() { fn test_vec() {
let po: port[int[]] = port(); let po = comm::mk_port();
let ch: chan[int[]] = chan(po); let ch = po.mk_chan();
let v0: int[] = ~[0, 1, 2]; let v0: int[] = ~[0, 1, 2];
ch <| v0; ch.send(v0);
let v1: int[]; let v1: int[];
po |> v1; v1 = po.recv();
assert (v1.(0) == 0); assert (v1.(0) == 0);
assert (v1.(1) == 1); assert (v1.(1) == 1);
assert (v1.(2) == 2); assert (v1.(2) == 2);
} }
fn test_str() { fn test_str() {
let po: port[str] = port(); let po = comm::mk_port();
let ch: chan[str] = chan(po); let ch = po.mk_chan();
let s0: str = "test"; let s0: str = "test";
ch <| s0; ch.send(s0);
let s1: str; let s1: str;
po |> s1; s1 = po.recv();
assert (s1.(0) as u8 == 't' as u8); assert (s1.(0) as u8 == 't' as u8);
assert (s1.(1) as u8 == 'e' as u8); assert (s1.(1) as u8 == 'e' as u8);
assert (s1.(2) as u8 == 's' as u8); assert (s1.(2) as u8 == 's' as u8);
@ -42,33 +46,34 @@ fn test_str() {
fn test_tag() { fn test_tag() {
tag t { tag1; tag2(int); tag3(int, u8, char); } tag t { tag1; tag2(int); tag3(int, u8, char); }
let po: port[t] = port(); let po = comm::mk_port();
let ch: chan[t] = chan(po); let ch = po.mk_chan();
ch <| tag1; ch.send(tag1);
ch <| tag2(10); ch.send(tag2(10));
ch <| tag3(10, 11u8, 'A'); ch.send(tag3(10, 11u8, 'A'));
// FIXME: Do port semantics really guarantee these happen in order?
let t1: t; let t1: t;
po |> t1; t1 = po.recv();
assert (t1 == tag1); assert (t1 == tag1);
po |> t1; t1 = po.recv();
assert (t1 == tag2(10)); assert (t1 == tag2(10));
po |> t1; t1 = po.recv();
assert (t1 == tag3(10, 11u8, 'A')); assert (t1 == tag3(10, 11u8, 'A'));
} }
fn test_chan() { fn test_chan() {
let po: port[chan[int]] = port(); let po = comm::mk_port();
let ch: chan[chan[int]] = chan(po); let ch = po.mk_chan();
let po0: port[int] = port(); let po0 = comm::mk_port();
let ch0: chan[int] = chan(po0); let ch0 = po0.mk_chan();
ch <| ch0; ch.send(ch0);
let ch1: chan[int]; let ch1;
po |> ch1; ch1 = po.recv();
// Does the transmitted channel still work? // Does the transmitted channel still work?
ch1 <| 10; ch1.send(10);
let i: int; let i: int;
po0 |> i; i = po0.recv();
assert (i == 10); assert (i == 10);
} }

View file

@ -1,12 +1,14 @@
use std; use std;
import std::task; import std::task;
import std::comm;
fn main() { log "===== WITHOUT THREADS ====="; test00(); } fn main() { log "===== WITHOUT THREADS ====="; test00(); }
fn test00_start(ch: chan[int], message: int, count: int) { fn test00_start(pch: *u8, message: int, count: int) {
log "Starting test00_start"; log "Starting test00_start";
let ch = comm::chan_from_unsafe_ptr(pch);
let i: int = 0; let i: int = 0;
while i < count { log "Sending Message"; ch <| message; i = i + 1; } while i < count { log "Sending Message"; ch.send(message); i = i + 1; }
log "Ending test00_start"; log "Ending test00_start";
} }
@ -16,15 +18,15 @@ fn test00() {
log "Creating tasks"; log "Creating tasks";
let po: port[int] = port(); let po = comm::mk_port();
let ch: chan[int] = chan(po); let ch = po.mk_chan();
let i: int = 0; let i: int = 0;
// Create and spawn tasks... // Create and spawn tasks...
let tasks: vec[task] = []; let tasks: vec[task] = [];
while i < number_of_tasks { while i < number_of_tasks {
tasks += [spawn test00_start(ch, i, number_of_messages)]; tasks += [spawn test00_start(ch.unsafe_ptr(), i, number_of_messages)];
i = i + 1; i = i + 1;
} }
@ -34,7 +36,7 @@ fn test00() {
i = 0; i = 0;
while i < number_of_messages { while i < number_of_messages {
let value: int; let value: int;
po |> value; value = po.recv();
sum += value; sum += value;
i = i + 1; i = i + 1;
} }
@ -47,4 +49,4 @@ fn test00() {
// assert (sum == (((number_of_tasks * (number_of_tasks - 1)) / 2) * // assert (sum == (((number_of_tasks * (number_of_tasks - 1)) / 2) *
// number_of_messages)); // number_of_messages));
assert (sum == 480); assert (sum == 480);
} }

View file

@ -1,42 +1,43 @@
use std;
import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p: port[int] = port(); let p = comm::mk_port();
let c: chan[int] = chan(p); let c = p.mk_chan();
c <| 1; c.send(1);
c <| 2; c.send(2);
c <| 3; c.send(3);
c <| 4; c.send(4);
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
c <| 5; c.send(5);
c <| 6; c.send(6);
c <| 7; c.send(7);
c <| 8; c.send(8);
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
p |> r; r = p.recv();
sum += r; sum += r;
log r; log r;
assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8); assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8);

View file

@ -1,16 +1,17 @@
use std;
import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p: port[int] = port(); let p = comm::mk_port();
let c: chan[int] = chan(p); let c = p.mk_chan();
let number_of_messages: int = 1000; let number_of_messages: int = 1000;
let i: int = 0; let i: int = 0;
while i < number_of_messages { c <| i; i += 1; } while i < number_of_messages { c.send(i); i += 1; }
i = 0; i = 0;
while i < number_of_messages { p |> r; sum += r; i += 1; } while i < number_of_messages { r = p.recv(); sum += r; i += 1; }
assert (sum == number_of_messages * (number_of_messages - 1) / 2); assert (sum == number_of_messages * (number_of_messages - 1) / 2);
} }

View file

@ -1,33 +1,34 @@
use std;
import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p: port[int] = port(); let p = comm::mk_port();
let c0: chan[int] = chan(p); let c0 = p.mk_chan();
let c1: chan[int] = chan(p); let c1 = p.mk_chan();
let c2: chan[int] = chan(p); let c2 = p.mk_chan();
let c3: chan[int] = chan(p); let c3 = p.mk_chan();
let number_of_messages: int = 1000; let number_of_messages: int = 1000;
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
c0 <| i; c0.send(i);
c1 <| i; c1.send(i);
c2 <| i; c2.send(i);
c3 <| i; c3.send(i);
i += 1; i += 1;
} }
i = 0; i = 0;
while i < number_of_messages { while i < number_of_messages {
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
i += 1; i += 1;
} }

View file

@ -1,41 +1,43 @@
use std; use std;
import std::task; import std::task;
import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00_start(c: chan[int], start: int, number_of_messages: int) { fn test00_start(pc: *u8, start: int, number_of_messages: int) {
let c = comm::chan_from_unsafe_ptr(pc);
let i: int = 0; let i: int = 0;
while i < number_of_messages { c <| start + i; i += 1; } while i < number_of_messages { c.send(start + i); i += 1; }
} }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p: port[int] = port(); let p = comm::mk_port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let t0: task = let t0: task =
spawn test00_start(chan(p), number_of_messages * 0, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 0,
number_of_messages); number_of_messages);
let t1: task = let t1: task =
spawn test00_start(chan(p), number_of_messages * 1, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 1,
number_of_messages); number_of_messages);
let t2: task = let t2: task =
spawn test00_start(chan(p), number_of_messages * 2, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 2,
number_of_messages); number_of_messages);
let t3: task = let t3: task =
spawn test00_start(chan(p), number_of_messages * 3, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 3,
number_of_messages); number_of_messages);
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
i += 1; i += 1;
} }

View file

@ -1,41 +1,43 @@
use std; use std;
import std::task; import std::task;
import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00_start(c: chan[int], start: int, number_of_messages: int) { fn test00_start(pc: *u8, start: int, number_of_messages: int) {
let c = comm::chan_from_unsafe_ptr(pc);
let i: int = 0; let i: int = 0;
while i < number_of_messages { c <| start + i; i += 1; } while i < number_of_messages { c.send(start + i); i += 1; }
} }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p: port[int] = port(); let p = comm::mk_port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let t0: task = let t0: task =
spawn test00_start(chan(p), number_of_messages * 0, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 0,
number_of_messages); number_of_messages);
let t1: task = let t1: task =
spawn test00_start(chan(p), number_of_messages * 1, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 1,
number_of_messages); number_of_messages);
let t2: task = let t2: task =
spawn test00_start(chan(p), number_of_messages * 2, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 2,
number_of_messages); number_of_messages);
let t3: task = let t3: task =
spawn test00_start(chan(p), number_of_messages * 3, spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 3,
number_of_messages); number_of_messages);
let i: int = 0; let i: int = 0;
while i < number_of_messages { while i < number_of_messages {
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
p |> r; r = p.recv();
sum += r; sum += r;
i += 1; i += 1;
} }

View file

@ -1,23 +1,26 @@
use std; use std;
import std::task; import std::task;
import std::comm;
fn main() { test00(); } fn main() { test00(); }
fn test00_start(c: chan[int], number_of_messages: int) { fn test00_start(pc: *u8, number_of_messages: int) {
let c = comm::chan_from_unsafe_ptr(pc);
let i: int = 0; let i: int = 0;
while i < number_of_messages { c <| i; i += 1; } while i < number_of_messages { c.send(i); i += 1; }
} }
fn test00() { fn test00() {
let r: int = 0; let r: int = 0;
let sum: int = 0; let sum: int = 0;
let p: port[int] = port(); let p = comm::mk_port();
let number_of_messages: int = 10; let number_of_messages: int = 10;
let t0: task = spawn test00_start(chan(p), number_of_messages); let t0: task = spawn test00_start(p.mk_chan().unsafe_ptr(),
number_of_messages);
let i: int = 0; let i: int = 0;
while i < number_of_messages { p |> r; sum += r; log r; i += 1; } while i < number_of_messages { r = p.recv(); sum += r; log r; i += 1; }
task::join(t0); task::join(t0);

View file

@ -16,4 +16,4 @@ fn send_recv() {
let v = p.recv(); let v = p.recv();
log_err v; log_err v;
assert(42 == v); assert(42 == v);
} }