1
Fork 0

Started working on a library-based comm system. Creating and deleting ports work.

This commit is contained in:
Eric Holk 2011-07-29 17:53:02 -07:00
parent 7a05f1db7c
commit 5a673cc2c9
8 changed files with 176 additions and 19 deletions

48
src/lib/comm.rs Normal file
View file

@ -0,0 +1,48 @@
import sys;
export _chan;
export _port;
export mk_port;
export mk_chan;
native "rust" mod rustrt {
type rust_chan;
type rust_port;
fn new_chan(po : *rust_port) -> *rust_chan;
fn del_chan(ch : *rust_chan);
fn drop_chan(ch : *rust_chan);
fn new_port(unit_sz : uint) -> *rust_port;
fn del_port(po : *rust_port);
fn drop_port(po : *rust_port);
}
resource chan_ptr(ch: *rustrt::rust_chan) {
rustrt::drop_chan(ch);
rustrt::drop_chan(ch); // FIXME: We shouldn't have to do this
// twice.
rustrt::del_chan(ch);
}
tag _chan[T] { _chan(@chan_dtor); }
resource port_ptr(po: *rustrt::rust_port) {
rustrt::drop_port(po);
rustrt::del_port(po);
}
tag _port[T] { _port(@port_dtor); }
fn mk_port[T]() -> _port[T] {
_port(@port_dtor(rustrt::new_port(sys::size_of[T]())))
}
fn mk_chan[T](po : &_port[T]) -> _chan[T] {
alt po {
_port(_po) {
_chan(@chan_dtor(rustrt::new_chan(**_po)))
}
}
}

View file

@ -23,6 +23,7 @@ mod io;
mod ioivec;
mod sys;
mod task;
mod comm;
// Utility modules.

View file

@ -868,6 +868,53 @@ sched_threads(rust_task *task) {
return task->kernel->num_threads;
}
extern "C" CDECL rust_port*
new_port(rust_task *task, size_t unit_sz) {
LOG(task, comm, "new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
(uintptr_t) task, task->name, unit_sz);
// take a reference on behalf of the port
task->ref();
return new (task->kernel, "rust_port") rust_port(task, unit_sz);
}
extern "C" CDECL void
del_port(rust_task *task, rust_port *port) {
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
I(task->sched, !port->ref_count);
delete port;
// FIXME: this should happen in the port.
task->deref();
}
extern "C" CDECL rust_chan*
new_chan(rust_task *task, rust_port *port) {
rust_scheduler *sched = task->sched;
LOG(task, comm, "new_chan("
"task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
(uintptr_t) task, task->name, port);
I(sched, port);
return new (task->kernel, "rust_chan")
rust_chan(task->kernel, port, port->unit_sz);
}
extern "C" CDECL
void del_chan(rust_task *task, rust_chan *chan) {
LOG(task, comm, "del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
chan->destroy();
}
extern "C" CDECL
void drop_chan(rust_task *task, rust_chan *chan) {
chan->ref_count--;
}
extern "C" CDECL
void drop_port(rust_task *, rust_port *port) {
port->ref_count--;
>>>>>>> Started working on a library-based comm system. Creating and deleting ports work.
}
//
// Local Variables:
// mode: C++

View file

@ -95,40 +95,33 @@ upcall_trace_str(rust_task *task, char const *c) {
task->sched->log(task, 2, "trace: %s", c);
}
extern "C" CDECL rust_port*
new_port(rust_task *task, size_t unit_sz);
extern "C" CDECL rust_port*
upcall_new_port(rust_task *task, size_t unit_sz) {
LOG_UPCALL_ENTRY(task);
LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
(uintptr_t) task, task->name, unit_sz);
// take a reference on behalf of the port
task->ref();
return new (task->kernel, "rust_port") rust_port(task, unit_sz);
return new_port(task, unit_sz);
}
extern "C" CDECL void
del_port(rust_task *task, rust_port *port);
extern "C" CDECL void
upcall_del_port(rust_task *task, rust_port *port) {
LOG_UPCALL_ENTRY(task);
LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
I(task->sched, !port->ref_count);
delete port;
// FIXME: this should happen in the port.
task->deref();
return del_port(task, port);
}
/**
* Creates a new channel pointing to a given port.
*/
extern "C" CDECL rust_chan*
new_chan(rust_task *task, rust_port *port);
extern "C" CDECL rust_chan*
upcall_new_chan(rust_task *task, rust_port *port) {
LOG_UPCALL_ENTRY(task);
rust_scheduler *sched = task->sched;
LOG(task, comm, "upcall_new_chan("
"task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
(uintptr_t) task, task->name, port);
I(sched, port);
return new (task->kernel, "rust_chan")
rust_chan(task->kernel, port, port->unit_sz);
return new_chan(task, port);
}
/**
@ -148,11 +141,11 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) {
* appear to be live, causing modify-after-free errors.
*/
extern "C" CDECL
void del_chan(rust_task *task, rust_chan *chan);
extern "C" CDECL
void upcall_del_chan(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
chan->destroy();
del_chan(task, chan);
}
/**

View file

@ -5,11 +5,15 @@ debug_box
debug_fn
debug_obj
debug_opaque
del_chan
del_port
debug_ptrcast
debug_tag
debug_trap
debug_tydesc
do_gc
drop_chan
drop_port
get_time
hack_allow_leaks
ivec_copy_from_buf
@ -20,6 +24,8 @@ ivec_reserve_shared
ivec_to_ptr
last_os_error
nano_time
new_chan
new_port
pin_task
unpin_task
rand_free

View file

@ -0,0 +1,53 @@
// xfail-stage0
// xfail-stage1
// xfail-stage2
// xfail-stage3
// Test case for issue #763, provided by robarnold.
use std;
import std::task;
tag request {
quit;
close(int, chan[bool]);
}
type ctx = chan[request];
fn request_task(c : chan[ctx]) {
let p = port();
c <| chan(p);
let req;
while (true) {
p |> req;
alt (req) {
quit. {
ret;
}
close(what, status) {
log "closing now";
log what;
status <| true;
}
}
}
}
fn new() -> ctx {
let p = port();
let t = spawn request_task(chan(p));
let cx;
p |> cx;
ret cx;
}
fn main() {
let cx = new();
let p = port();
cx <| close(4, chan(p));
let result;
p |> result;
cx <| quit;
}

8
src/test/stdtest/comm.rs Normal file
View file

@ -0,0 +1,8 @@
use std;
import std::comm;
#[test]
fn create_port_and_chan() {
let p = comm::mk_port[int]();
let c = comm::mk_chan(p);
}

View file

@ -2,6 +2,7 @@ use std;
mod bitv;
mod box;
mod comm;
mod deque;
mod either;
mod fs;