1
Fork 0

Atomic ref counting for chans.

This commit is contained in:
Eric Holk 2011-08-05 15:16:48 -07:00
parent 200bbcf91b
commit b62e80c1f0
12 changed files with 61 additions and 38 deletions

View file

@ -36,6 +36,8 @@ type upcalls =
del_port: ValueRef, del_port: ValueRef,
new_chan: ValueRef, new_chan: ValueRef,
flush_chan: ValueRef, flush_chan: ValueRef,
drop_chan: ValueRef,
take_chan: ValueRef,
del_chan: ValueRef, del_chan: ValueRef,
clone_chan: ValueRef, clone_chan: ValueRef,
chan_target_task: ValueRef, chan_target_task: ValueRef,
@ -94,6 +96,8 @@ fn declare_upcalls(tn: type_names, tydesc_type: TypeRef,
new_chan: new_chan:
d("new_chan", ~[T_opaque_port_ptr()], T_opaque_chan_ptr()), d("new_chan", ~[T_opaque_port_ptr()], T_opaque_chan_ptr()),
flush_chan: dv("flush_chan", ~[T_opaque_chan_ptr()]), flush_chan: dv("flush_chan", ~[T_opaque_chan_ptr()]),
drop_chan: dv("drop_chan", ~[T_opaque_chan_ptr()]),
take_chan: dv("take_chan", ~[T_opaque_chan_ptr()]),
del_chan: dv("del_chan", ~[T_opaque_chan_ptr()]), del_chan: dv("del_chan", ~[T_opaque_chan_ptr()]),
clone_chan: clone_chan:
d("clone_chan", ~[taskptr_type, T_opaque_chan_ptr()], d("clone_chan", ~[taskptr_type, T_opaque_chan_ptr()],

View file

@ -1237,6 +1237,12 @@ fn make_copy_glue(cx: &@block_ctxt, v: ValueRef, t: &ty::t) {
cx.build.Call(bcx_ccx(cx).upcalls.take_task, cx.build.Call(bcx_ccx(cx).upcalls.take_task,
~[cx.fcx.lltaskptr, task_ptr]); ~[cx.fcx.lltaskptr, task_ptr]);
bcx = cx; bcx = cx;
} else if ty::type_is_chan(bcx_tcx(cx), t) {
let ptr = cx.build.Load(v);
ptr = cx.build.PointerCast(ptr, T_opaque_chan_ptr());
cx.build.Call(bcx_ccx(cx).upcalls.take_chan,
~[cx.fcx.lltaskptr, ptr]);
bcx = cx;
} else if ty::type_is_boxed(bcx_tcx(cx), t) { } else if ty::type_is_boxed(bcx_tcx(cx), t) {
bcx = incr_refcnt_of_boxed(cx, cx.build.Load(v)).bcx; bcx = incr_refcnt_of_boxed(cx, cx.build.Load(v)).bcx;
} else if (ty::type_is_structural(bcx_tcx(cx), t)) { } else if (ty::type_is_structural(bcx_tcx(cx), t)) {
@ -1395,7 +1401,13 @@ fn make_drop_glue(cx: &@block_ctxt, v0: ValueRef, t: &ty::t) {
} }
ty::ty_box(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_box(_) { decr_refcnt_maybe_free(cx, v0, v0, t) }
ty::ty_port(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_port(_) { decr_refcnt_maybe_free(cx, v0, v0, t) }
ty::ty_chan(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_chan(_) {
let ptr = cx.build.Load(v0);
ptr = cx.build.PointerCast(ptr, T_opaque_chan_ptr());
{bcx: cx,
val: cx.build.Call(bcx_ccx(cx).upcalls.drop_chan,
~[cx.fcx.lltaskptr, ptr])}
}
ty::ty_task. { ty::ty_task. {
let task_ptr = cx.build.Load(v0); let task_ptr = cx.build.Load(v0);
{bcx: cx, {bcx: cx,

View file

@ -904,9 +904,14 @@ void del_chan(rust_task *task, rust_chan *chan) {
chan->destroy(); chan->destroy();
} }
extern "C" CDECL
void take_chan(rust_task *task, rust_chan *chan) {
chan->ref();
}
extern "C" CDECL extern "C" CDECL
void drop_chan(rust_task *task, rust_chan *chan) { void drop_chan(rust_task *task, rust_chan *chan) {
chan->ref_count--; chan->deref();
} }
extern "C" CDECL extern "C" CDECL

View file

@ -22,6 +22,8 @@ rust_chan::~rust_chan() {
KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")", KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")",
(uintptr_t) this); (uintptr_t) this);
this->destroy();
A(kernel, is_associated() == false, A(kernel, is_associated() == false,
"Channel must be disassociated before being freed."); "Channel must be disassociated before being freed.");
} }
@ -35,7 +37,7 @@ void rust_chan::associate(rust_port *port) {
KLOG(kernel, task, KLOG(kernel, task,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port); this, port);
++this->ref_count; this->ref();
this->task = port->task; this->task = port->task;
this->task->ref(); this->task->ref();
this->port->chans.push(this); this->port->chans.push(this);
@ -57,13 +59,14 @@ void rust_chan::disassociate() {
KLOG(kernel, task, KLOG(kernel, task,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
this, port); this, port);
--this->ref_count;
task->deref(); task->deref();
this->task = NULL; this->task = NULL;
port->chans.swap_delete(this); port->chans.swap_delete(this);
// Delete reference to the port. // Delete reference to the port.
port = NULL; port = NULL;
this->deref();
} }
/** /**
@ -101,9 +104,6 @@ rust_chan *rust_chan::clone(rust_task *target) {
* appear to be live, causing modify-after-free errors. * appear to be live, causing modify-after-free errors.
*/ */
void rust_chan::destroy() { void rust_chan::destroy() {
A(kernel, ref_count == 0,
"Channel's ref count should be zero.");
if (is_associated()) { if (is_associated()) {
// We're trying to delete a channel that another task may be // We're trying to delete a channel that another task may be
// reading from. We have two options: // reading from. We have two options:
@ -121,7 +121,6 @@ void rust_chan::destroy() {
scoped_lock with(port->lock); scoped_lock with(port->lock);
disassociate(); disassociate();
} }
delete this;
} }
// //

View file

@ -3,13 +3,13 @@
class rust_chan : public kernel_owned<rust_chan>, class rust_chan : public kernel_owned<rust_chan>,
public rust_cond { public rust_cond {
~rust_chan();
public: public:
RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy()) RUST_ATOMIC_REFCOUNT();
rust_chan(rust_kernel *kernel, rust_port *port, rust_chan(rust_kernel *kernel, rust_port *port,
size_t unit_sz); size_t unit_sz);
~rust_chan();
rust_kernel *kernel; rust_kernel *kernel;
rust_task *task; rust_task *task;
rust_port *port; rust_port *port;

View file

@ -108,6 +108,13 @@ static size_t const BUF_BYTES = 2048;
void ref() { ++ref_count; } \ void ref() { ++ref_count; } \
void deref() { if (--ref_count == 0) { dtor; } } void deref() { if (--ref_count == 0) { dtor; } }
#define RUST_ATOMIC_REFCOUNT() \
private: \
intptr_t ref_count; \
public: \
void ref() { sync::increment(ref_count); } \
void deref() { if(0 == sync::decrement(ref_count)) { delete this; } }
template <typename T> struct rc_base { template <typename T> struct rc_base {
RUST_REFCOUNTED(T) RUST_REFCOUNTED(T)

View file

@ -8,10 +8,6 @@ rust_port::rust_port(rust_task *task, size_t unit_sz)
LOG(task, comm, LOG(task, comm,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
// Allocate a remote channel, for remote channel data.
remote_channel = new (kernel, "remote chan")
rust_chan(kernel, this, unit_sz);
} }
rust_port::~rust_port() { rust_port::~rust_port() {
@ -22,17 +18,9 @@ rust_port::~rust_port() {
scoped_lock with(lock); scoped_lock with(lock);
rust_chan *chan = chans.peek(); rust_chan *chan = chans.peek();
chan->disassociate(); chan->disassociate();
if (chan->ref_count == 0) {
LOG(task, comm,
"chan: 0x%" PRIxPTR " is dormant, freeing", chan);
delete chan;
} }
} }
delete remote_channel;
}
bool rust_port::receive(void *dptr) { bool rust_port::receive(void *dptr) {
for (uint32_t i = 0; i < chans.length(); i++) { for (uint32_t i = 0; i < chans.length(); i++) {
rust_chan *chan = chans[i]; rust_chan *chan = chans[i];
@ -52,10 +40,9 @@ void rust_port::log_state() {
for (uint32_t i = 0; i < chans.length(); i++) { for (uint32_t i = 0; i < chans.length(); i++) {
rust_chan *chan = chans[i]; rust_chan *chan = chans[i];
LOG(task, comm, LOG(task, comm,
"\tchan: 0x%" PRIxPTR ", size: %d, remote: %s", "\tchan: 0x%" PRIxPTR ", size: %d",
chan, chan,
chan->buffer.size(), chan->buffer.size());
chan == remote_channel ? "yes" : "no");
} }
} }

View file

@ -12,9 +12,6 @@ public:
ptr_vec<rust_token> writers; ptr_vec<rust_token> writers;
ptr_vec<rust_chan> chans; ptr_vec<rust_chan> chans;
// Data sent to this port from remote tasks is buffered in this channel.
rust_chan *remote_channel;
lock_and_signal lock; lock_and_signal lock;
rust_port(rust_task *task, size_t unit_sz); rust_port(rust_task *task, size_t unit_sz);

View file

@ -38,14 +38,7 @@ struct gc_alloc {
struct struct
rust_task : public kernel_owned<rust_task>, rust_cond rust_task : public kernel_owned<rust_task>, rust_cond
{ {
// This block could be pulled out into something like a RUST_ATOMIC_REFCOUNT();
// RUST_ATOMIC_REFCOUNTED macro.
private:
intptr_t ref_count;
public:
void ref() { sync::increment(ref_count); }
void deref() { if(0 == sync::decrement(ref_count)) { delete this; } }
// Fields known to the compiler. // Fields known to the compiler.
stk_seg *stk; stk_seg *stk;

View file

@ -508,6 +508,22 @@ upcall_drop_task(rust_task *task, rust_task *target) {
} }
} }
extern "C" CDECL void
upcall_take_chan(rust_task *task, rust_chan *target) {
LOG_UPCALL_ENTRY(task);
if(target) {
target->ref();
}
}
extern "C" CDECL void
upcall_drop_chan(rust_task *task, rust_chan *target) {
LOG_UPCALL_ENTRY(task);
if(target) {
target->deref();
}
}
extern "C" CDECL rust_task * extern "C" CDECL rust_task *
upcall_start_task(rust_task *spawner, upcall_start_task(rust_task *spawner,
rust_task *task, rust_task *task,

View file

@ -69,6 +69,7 @@ str_from_vec
str_push_byte str_push_byte
str_slice str_slice
str_vec str_vec
take_chan
task_sleep task_sleep
task_yield task_yield
task_join task_join
@ -78,6 +79,7 @@ upcall_chan_target_task
upcall_clone_chan upcall_clone_chan
upcall_del_chan upcall_del_chan
upcall_del_port upcall_del_port
upcall_drop_chan
upcall_drop_task upcall_drop_task
upcall_dup_str upcall_dup_str
upcall_exit upcall_exit
@ -108,6 +110,7 @@ upcall_shared_free
upcall_sleep upcall_sleep
upcall_start_task upcall_start_task
upcall_take_task upcall_take_task
upcall_take_chan
upcall_trace_str upcall_trace_str
upcall_trace_word upcall_trace_word
upcall_vec_append upcall_vec_append