diff --git a/src/lib/comm.rs b/src/lib/comm.rs index 8d416b7d60f..798daf1db12 100644 --- a/src/lib/comm.rs +++ b/src/lib/comm.rs @@ -29,8 +29,6 @@ native "rust-intrinsic" mod rusti { resource chan_ptr(ch: *rustrt::rust_chan) { rustrt::drop_chan(ch); - rustrt::drop_chan(ch); // FIXME: We shouldn't have to do this - // twice. } resource port_ptr(po: *rustrt::rust_port) { diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 1b6ad431cab..862373e07dd 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -901,7 +901,7 @@ new_chan(rust_task *task, rust_port *port) { extern "C" CDECL void del_chan(rust_task *task, rust_chan *chan) { LOG(task, comm, "del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - chan->destroy(); + I(task->sched, false); } extern "C" CDECL diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 4b9320688a1..045694eee61 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -22,7 +22,7 @@ rust_chan::~rust_chan() { KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); - this->destroy(); + I(this->kernel, !is_associated()); A(kernel, is_associated() == false, "Channel must be disassociated before being freed."); @@ -32,12 +32,12 @@ rust_chan::~rust_chan() { * Link this channel with the specified port. */ void rust_chan::associate(rust_port *port) { + this->ref(); this->port = port; scoped_lock with(port->lock); KLOG(kernel, task, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, this, port); - this->ref(); this->task = port->task; this->task->ref(); this->port->chans.push(this); @@ -64,7 +64,7 @@ void rust_chan::disassociate() { port->chans.swap_delete(this); // Delete reference to the port. - port = NULL; + port = NULL; this->deref(); } @@ -99,30 +99,6 @@ rust_chan *rust_chan::clone(rust_task *target) { rust_chan(kernel, port, buffer.unit_sz); } -/** - * Cannot Yield: If the task were to unwind, the dropped ref would still - * appear to be live, causing modify-after-free errors. - */ -void rust_chan::destroy() { - if (is_associated()) { - // We're trying to delete a channel that another task may be - // reading from. We have two options: - // - // 1. We can flush the channel by blocking in upcall_flush_chan() - // and resuming only when the channel is flushed. The problem - // here is that we can get ourselves in a deadlock if the - // parent task tries to join us. - // - // 2. We can leave the channel in a "dormnat" state by not freeing - // it and letting the receiver task delete it for us instead. - if (buffer.is_empty() == false) { - return; - } - scoped_lock with(port->lock); - disassociate(); - } -} - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index eff4958f683..9dbd9337a18 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -23,9 +23,6 @@ public: void send(void *sptr); rust_chan *clone(rust_task *target); - - // Called whenever the channel's ref count drops to zero. - void destroy(); }; // diff --git a/src/test/run-pass/task-comm-0.rs b/src/test/run-pass/task-comm-0.rs index 205f24c2699..6f258d9f6e7 100644 --- a/src/test/run-pass/task-comm-0.rs +++ b/src/test/run-pass/task-comm-0.rs @@ -20,4 +20,4 @@ fn test05() { value = po.recv(); value = po.recv(); assert (value == 30); -} \ No newline at end of file +} diff --git a/src/test/run-pass/task-comm-1.rs b/src/test/run-pass/task-comm-1.rs index 360f4d7feae..f634af9cee9 100644 --- a/src/test/run-pass/task-comm-1.rs +++ b/src/test/run-pass/task-comm-1.rs @@ -6,4 +6,4 @@ fn main() { test00(); } fn start() { log "Started / Finished task."; } -fn test00() { let t: task = spawn start(); join(t); log "Completing."; } \ No newline at end of file +fn test00() { let t: task = spawn start(); join(t); log "Completing."; } diff --git a/src/test/run-pass/task-comm-10.rs b/src/test/run-pass/task-comm-10.rs index 65a1bc2b91d..cdd18478dbc 100644 --- a/src/test/run-pass/task-comm-10.rs +++ b/src/test/run-pass/task-comm-10.rs @@ -1,29 +1,32 @@ use std; import std::task; +import std::comm; -fn start(c: chan[chan[str]]) { - let p: port[str]; +fn start(pcc: *u8) { + let c = comm::chan_from_unsafe_ptr(pcc); + let p; let a; let b; - p = port(); - c <| chan(p); - p |> a; + p = comm::mk_port[str](); + c.send(p.mk_chan().unsafe_ptr()); + a = p.recv(); log_err a; - p |> b; + b = p.recv(); log_err b; } fn main() { - let p: port[chan[str]]; + let p : comm::_port[*u8]; let child; - p = port(); - child = spawn start(chan(p)); - let c; + p = comm::mk_port(); + child = spawn start(p.mk_chan().unsafe_ptr()); + let pc; let c; - p |> c; - c <| "A"; - c <| "B"; + pc = p.recv(); + c = comm::chan_from_unsafe_ptr(pc); + c.send("A"); + c.send("B"); task::yield(); -} \ No newline at end of file +} diff --git a/src/test/run-pass/task-comm-11.rs b/src/test/run-pass/task-comm-11.rs index 682b6d2a430..1e3dc70e18e 100644 --- a/src/test/run-pass/task-comm-11.rs +++ b/src/test/run-pass/task-comm-11.rs @@ -1,10 +1,16 @@ // xfail-stage3 +use std; +import std::comm; -fn start(c: chan[chan[str]]) { let p: port[str] = port(); c <| chan(p); } +fn start(pcc: *u8) { + let c = comm::chan_from_unsafe_ptr(pcc); + let p : comm::_port[int] = comm::mk_port(); + c.send(p.mk_chan().unsafe_ptr()); +} fn main() { - let p: port[chan[str]] = port(); - let child = spawn start(chan(p)); - let c; - p |> c; -} \ No newline at end of file + let p = comm::mk_port(); + let child = spawn start(p.mk_chan().unsafe_ptr()); + let pc = p.recv(); + let c : comm::_chan[int] = comm::chan_from_unsafe_ptr(pc); +} diff --git a/src/test/run-pass/task-comm-12.rs b/src/test/run-pass/task-comm-12.rs index 516225567a6..bb05a7c24ba 100644 --- a/src/test/run-pass/task-comm-12.rs +++ b/src/test/run-pass/task-comm-12.rs @@ -16,4 +16,4 @@ fn test00() { task::join(t); log "Joined task."; -} \ No newline at end of file +} diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs index 9cc5e7504c3..163ccbefb22 100644 --- a/src/test/run-pass/task-comm-13.rs +++ b/src/test/run-pass/task-comm-13.rs @@ -1,15 +1,17 @@ use std; import std::task; +import std::comm; -fn start(c: chan[int], start: int, number_of_messages: int) { +fn start(pc: *u8, start: int, number_of_messages: int) { + let c = comm::chan_from_unsafe_ptr(pc); let i: int = 0; - while i < number_of_messages { c <| start + i; i += 1; } + while i < number_of_messages { c.send(start + i); i += 1; } } fn main() { log "Check that we don't deadlock."; - let p: port[int] = port(); - let a: task = spawn start(chan(p), 0, 10); + let p : comm::_port[int] = comm::mk_port(); + let a: task = spawn start(p.mk_chan().unsafe_ptr(), 0, 10); task::join(a); log "Joined task"; } \ No newline at end of file diff --git a/src/test/run-pass/task-comm-15.rs b/src/test/run-pass/task-comm-15.rs index f211ade79fd..9b666dd32ec 100644 --- a/src/test/run-pass/task-comm-15.rs +++ b/src/test/run-pass/task-comm-15.rs @@ -3,20 +3,25 @@ // xfail-stage3 // This test fails when run with multiple threads -fn start(c: chan[int], n: int) { +use std; +import std::comm; + +fn start(pc: *u8, n: int) { + let c = comm::chan_from_unsafe_ptr(); let i: int = n; - while i > 0 { c <| 0; i = i - 1; } + while i > 0 { c.send(0); i = i - 1; } } fn main() { - let p: port[int] = port(); + let p = comm::mk_port(); // Spawn a task that sends us back messages. The parent task // is likely to terminate before the child completes, so from // the child's point of view the receiver may die. We should // drop messages on the floor in this case, and not crash! - let child = spawn start(chan(p), 10); + let child = spawn start(p.mk_chan().unsafe_ptr(), 10); let c; - p |> c; + let pc = p.recv(); + c = chan::chan_from_unsafe_ptr(); } \ No newline at end of file diff --git a/src/test/run-pass/task-comm-16.rs b/src/test/run-pass/task-comm-16.rs index 6ca323faddf..e901fef3962 100644 --- a/src/test/run-pass/task-comm-16.rs +++ b/src/test/run-pass/task-comm-16.rs @@ -1,39 +1,43 @@ // -*- rust -*- +use std; +import std::comm; +import std::comm::mk_port; + // Tests of ports and channels on various types fn test_rec() { type r = {val0: int, val1: u8, val2: char}; - let po: port[r] = port(); - let ch: chan[r] = chan(po); + let po = comm::mk_port(); + let ch = po.mk_chan(); let r0: r = {val0: 0, val1: 1u8, val2: '2'}; - ch <| r0; + ch.send(r0); let r1: r; - po |> r1; + r1 = po.recv(); assert (r1.val0 == 0); assert (r1.val1 == 1u8); assert (r1.val2 == '2'); } fn test_vec() { - let po: port[int[]] = port(); - let ch: chan[int[]] = chan(po); + let po = comm::mk_port(); + let ch = po.mk_chan(); let v0: int[] = ~[0, 1, 2]; - ch <| v0; + ch.send(v0); let v1: int[]; - po |> v1; + v1 = po.recv(); assert (v1.(0) == 0); assert (v1.(1) == 1); assert (v1.(2) == 2); } fn test_str() { - let po: port[str] = port(); - let ch: chan[str] = chan(po); + let po = comm::mk_port(); + let ch = po.mk_chan(); let s0: str = "test"; - ch <| s0; + ch.send(s0); let s1: str; - po |> s1; + s1 = po.recv(); assert (s1.(0) as u8 == 't' as u8); assert (s1.(1) as u8 == 'e' as u8); assert (s1.(2) as u8 == 's' as u8); @@ -42,33 +46,34 @@ fn test_str() { fn test_tag() { tag t { tag1; tag2(int); tag3(int, u8, char); } - let po: port[t] = port(); - let ch: chan[t] = chan(po); - ch <| tag1; - ch <| tag2(10); - ch <| tag3(10, 11u8, 'A'); + let po = comm::mk_port(); + let ch = po.mk_chan(); + ch.send(tag1); + ch.send(tag2(10)); + ch.send(tag3(10, 11u8, 'A')); + // FIXME: Do port semantics really guarantee these happen in order? let t1: t; - po |> t1; + t1 = po.recv(); assert (t1 == tag1); - po |> t1; + t1 = po.recv(); assert (t1 == tag2(10)); - po |> t1; + t1 = po.recv(); assert (t1 == tag3(10, 11u8, 'A')); } fn test_chan() { - let po: port[chan[int]] = port(); - let ch: chan[chan[int]] = chan(po); - let po0: port[int] = port(); - let ch0: chan[int] = chan(po0); - ch <| ch0; - let ch1: chan[int]; - po |> ch1; + let po = comm::mk_port(); + let ch = po.mk_chan(); + let po0 = comm::mk_port(); + let ch0 = po0.mk_chan(); + ch.send(ch0); + let ch1; + ch1 = po.recv(); // Does the transmitted channel still work? - ch1 <| 10; + ch1.send(10); let i: int; - po0 |> i; + i = po0.recv(); assert (i == 10); } diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 9494bcc5d5d..a5d0af1aef0 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -1,12 +1,14 @@ use std; import std::task; +import std::comm; fn main() { log "===== WITHOUT THREADS ====="; test00(); } -fn test00_start(ch: chan[int], message: int, count: int) { +fn test00_start(pch: *u8, message: int, count: int) { log "Starting test00_start"; + let ch = comm::chan_from_unsafe_ptr(pch); let i: int = 0; - while i < count { log "Sending Message"; ch <| message; i = i + 1; } + while i < count { log "Sending Message"; ch.send(message); i = i + 1; } log "Ending test00_start"; } @@ -16,15 +18,15 @@ fn test00() { log "Creating tasks"; - let po: port[int] = port(); - let ch: chan[int] = chan(po); + let po = comm::mk_port(); + let ch = po.mk_chan(); let i: int = 0; // Create and spawn tasks... let tasks: vec[task] = []; while i < number_of_tasks { - tasks += [spawn test00_start(ch, i, number_of_messages)]; + tasks += [spawn test00_start(ch.unsafe_ptr(), i, number_of_messages)]; i = i + 1; } @@ -34,7 +36,7 @@ fn test00() { i = 0; while i < number_of_messages { let value: int; - po |> value; + value = po.recv(); sum += value; i = i + 1; } @@ -47,4 +49,4 @@ fn test00() { // assert (sum == (((number_of_tasks * (number_of_tasks - 1)) / 2) * // number_of_messages)); assert (sum == 480); -} \ No newline at end of file +} diff --git a/src/test/run-pass/task-comm-4.rs b/src/test/run-pass/task-comm-4.rs index 1b002a6b246..3f1e05fce9e 100644 --- a/src/test/run-pass/task-comm-4.rs +++ b/src/test/run-pass/task-comm-4.rs @@ -1,42 +1,43 @@ - +use std; +import std::comm; fn main() { test00(); } fn test00() { let r: int = 0; let sum: int = 0; - let p: port[int] = port(); - let c: chan[int] = chan(p); - c <| 1; - c <| 2; - c <| 3; - c <| 4; - p |> r; + let p = comm::mk_port(); + let c = p.mk_chan(); + c.send(1); + c.send(2); + c.send(3); + c.send(4); + r = p.recv(); sum += r; log r; - p |> r; + r = p.recv(); sum += r; log r; - p |> r; + r = p.recv(); sum += r; log r; - p |> r; + r = p.recv(); sum += r; log r; - c <| 5; - c <| 6; - c <| 7; - c <| 8; - p |> r; + c.send(5); + c.send(6); + c.send(7); + c.send(8); + r = p.recv(); sum += r; log r; - p |> r; + r = p.recv(); sum += r; log r; - p |> r; + r = p.recv(); sum += r; log r; - p |> r; + r = p.recv(); sum += r; log r; assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8); diff --git a/src/test/run-pass/task-comm-5.rs b/src/test/run-pass/task-comm-5.rs index d1562038d6a..3126d98d99b 100644 --- a/src/test/run-pass/task-comm-5.rs +++ b/src/test/run-pass/task-comm-5.rs @@ -1,16 +1,17 @@ - +use std; +import std::comm; fn main() { test00(); } fn test00() { let r: int = 0; let sum: int = 0; - let p: port[int] = port(); - let c: chan[int] = chan(p); + let p = comm::mk_port(); + let c = p.mk_chan(); let number_of_messages: int = 1000; let i: int = 0; - while i < number_of_messages { c <| i; i += 1; } + while i < number_of_messages { c.send(i); i += 1; } i = 0; - while i < number_of_messages { p |> r; sum += r; i += 1; } + while i < number_of_messages { r = p.recv(); sum += r; i += 1; } assert (sum == number_of_messages * (number_of_messages - 1) / 2); } \ No newline at end of file diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index c273d87479c..74555ded400 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -1,33 +1,34 @@ - +use std; +import std::comm; fn main() { test00(); } fn test00() { let r: int = 0; let sum: int = 0; - let p: port[int] = port(); - let c0: chan[int] = chan(p); - let c1: chan[int] = chan(p); - let c2: chan[int] = chan(p); - let c3: chan[int] = chan(p); + let p = comm::mk_port(); + let c0 = p.mk_chan(); + let c1 = p.mk_chan(); + let c2 = p.mk_chan(); + let c3 = p.mk_chan(); let number_of_messages: int = 1000; let i: int = 0; while i < number_of_messages { - c0 <| i; - c1 <| i; - c2 <| i; - c3 <| i; + c0.send(i); + c1.send(i); + c2.send(i); + c3.send(i); i += 1; } i = 0; while i < number_of_messages { - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; i += 1; } diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index d6db6724226..6040487a986 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -1,41 +1,43 @@ use std; import std::task; +import std::comm; fn main() { test00(); } -fn test00_start(c: chan[int], start: int, number_of_messages: int) { +fn test00_start(pc: *u8, start: int, number_of_messages: int) { + let c = comm::chan_from_unsafe_ptr(pc); let i: int = 0; - while i < number_of_messages { c <| start + i; i += 1; } + while i < number_of_messages { c.send(start + i); i += 1; } } fn test00() { let r: int = 0; let sum: int = 0; - let p: port[int] = port(); + let p = comm::mk_port(); let number_of_messages: int = 10; let t0: task = - spawn test00_start(chan(p), number_of_messages * 0, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 0, number_of_messages); let t1: task = - spawn test00_start(chan(p), number_of_messages * 1, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 1, number_of_messages); let t2: task = - spawn test00_start(chan(p), number_of_messages * 2, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 2, number_of_messages); let t3: task = - spawn test00_start(chan(p), number_of_messages * 3, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 3, number_of_messages); let i: int = 0; while i < number_of_messages { - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; i += 1; } diff --git a/src/test/run-pass/task-comm-8.rs b/src/test/run-pass/task-comm-8.rs index d6db6724226..6040487a986 100644 --- a/src/test/run-pass/task-comm-8.rs +++ b/src/test/run-pass/task-comm-8.rs @@ -1,41 +1,43 @@ use std; import std::task; +import std::comm; fn main() { test00(); } -fn test00_start(c: chan[int], start: int, number_of_messages: int) { +fn test00_start(pc: *u8, start: int, number_of_messages: int) { + let c = comm::chan_from_unsafe_ptr(pc); let i: int = 0; - while i < number_of_messages { c <| start + i; i += 1; } + while i < number_of_messages { c.send(start + i); i += 1; } } fn test00() { let r: int = 0; let sum: int = 0; - let p: port[int] = port(); + let p = comm::mk_port(); let number_of_messages: int = 10; let t0: task = - spawn test00_start(chan(p), number_of_messages * 0, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 0, number_of_messages); let t1: task = - spawn test00_start(chan(p), number_of_messages * 1, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 1, number_of_messages); let t2: task = - spawn test00_start(chan(p), number_of_messages * 2, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 2, number_of_messages); let t3: task = - spawn test00_start(chan(p), number_of_messages * 3, + spawn test00_start(p.mk_chan().unsafe_ptr(), number_of_messages * 3, number_of_messages); let i: int = 0; while i < number_of_messages { - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; - p |> r; + r = p.recv(); sum += r; i += 1; } diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index c8a0626e82b..9a04e113ac9 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -1,23 +1,26 @@ use std; import std::task; +import std::comm; fn main() { test00(); } -fn test00_start(c: chan[int], number_of_messages: int) { +fn test00_start(pc: *u8, number_of_messages: int) { + let c = comm::chan_from_unsafe_ptr(pc); let i: int = 0; - while i < number_of_messages { c <| i; i += 1; } + while i < number_of_messages { c.send(i); i += 1; } } fn test00() { let r: int = 0; let sum: int = 0; - let p: port[int] = port(); + let p = comm::mk_port(); let number_of_messages: int = 10; - let t0: task = spawn test00_start(chan(p), number_of_messages); + let t0: task = spawn test00_start(p.mk_chan().unsafe_ptr(), + number_of_messages); let i: int = 0; - while i < number_of_messages { p |> r; sum += r; log r; i += 1; } + while i < number_of_messages { r = p.recv(); sum += r; log r; i += 1; } task::join(t0); diff --git a/src/test/stdtest/comm.rs b/src/test/stdtest/comm.rs index 6bc212a42f7..b65939e2854 100644 --- a/src/test/stdtest/comm.rs +++ b/src/test/stdtest/comm.rs @@ -16,4 +16,4 @@ fn send_recv() { let v = p.recv(); log_err v; assert(42 == v); -} \ No newline at end of file +}