1
Fork 0

rt: Simplify channel-port association

It turns out that there's only ever a single channel per port these days,
and it always has the same lifetime as the port, so we don't need a list or a
complex association protocol.
This commit is contained in:
Brian Anderson 2011-11-10 16:10:57 -08:00
parent 08d0ff38bc
commit e4f980810b
5 changed files with 23 additions and 70 deletions

View file

@ -538,9 +538,8 @@ port_recv(uintptr_t *dptr, rust_port *port) {
scoped_lock with(port->lock); scoped_lock with(port->lock);
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
", size: 0x%" PRIxPTR ", chan_no: %d", ", size: 0x%" PRIxPTR,
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz, (uintptr_t) port, (uintptr_t) dptr, port->unit_sz);
port->chans.length());
if (port->receive(dptr)) { if (port->receive(dptr)) {
return; return;

View file

@ -6,16 +6,17 @@
*/ */
rust_chan::rust_chan(rust_kernel *kernel, rust_port *port, rust_chan::rust_chan(rust_kernel *kernel, rust_port *port,
size_t unit_sz) size_t unit_sz)
: ref_count(1), : ref_count(0),
kernel(kernel), kernel(kernel),
port(port), port(port),
buffer(kernel, unit_sz) { buffer(kernel, unit_sz) {
if (port) {
associate(port);
}
KLOG(kernel, comm, "new rust_chan(task=0x%" PRIxPTR KLOG(kernel, comm, "new rust_chan(task=0x%" PRIxPTR
", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
(uintptr_t) task, (uintptr_t) port, (uintptr_t) this); (uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
A(kernel, port != NULL, "Port must not be null");
this->task = port->task;
this->task->ref();
} }
rust_chan::~rust_chan() { rust_chan::~rust_chan() {
@ -26,49 +27,15 @@ rust_chan::~rust_chan() {
A(kernel, is_associated() == false, A(kernel, is_associated() == false,
"Channel must be disassociated before being freed."); "Channel must be disassociated before being freed.");
}
/** task->deref();
* Link this channel with the specified port. task = NULL;
*/
void rust_chan::associate(rust_port *port) {
this->ref();
this->port = port;
scoped_lock with(port->lock);
KLOG(kernel, task,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port);
this->task = port->task;
this->task->ref();
this->port->chans.push(this);
} }
bool rust_chan::is_associated() { bool rust_chan::is_associated() {
return port != NULL; return port != NULL;
} }
/**
* Unlink this channel from its associated port.
*/
void rust_chan::disassociate() {
A(kernel,
port->lock.lock_held_by_current_thread(),
"Port referent lock must be held to call rust_chan::disassociate");
A(kernel, is_associated(),
"Channel must be associated with a port.");
KLOG(kernel, task,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
this, port);
task->deref();
this->task = NULL;
port->chans.swap_delete(this);
// Delete reference to the port.
port = NULL;
this->deref();
}
/** /**
* Attempt to send data to the associated port. * Attempt to send data to the associated port.
*/ */

View file

@ -16,8 +16,6 @@ public:
size_t idx; size_t idx;
circular_buffer buffer; circular_buffer buffer;
void associate(rust_port *port);
void disassociate();
bool is_associated(); bool is_associated();
void send(void *sptr); void send(void *sptr);

View file

@ -5,7 +5,7 @@
rust_port::rust_port(rust_task *task, size_t unit_sz) rust_port::rust_port(rust_task *task, size_t unit_sz)
: ref_count(1), kernel(task->kernel), task(task), : ref_count(1), kernel(task->kernel), task(task),
unit_sz(unit_sz), writers(task), chans(task) { unit_sz(unit_sz), writers(task) {
LOG(task, comm, LOG(task, comm,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
@ -14,47 +14,37 @@ rust_port::rust_port(rust_task *task, size_t unit_sz)
id = task->register_port(this); id = task->register_port(this);
remote_chan = new (task->kernel, "rust_chan") remote_chan = new (task->kernel, "rust_chan")
rust_chan(task->kernel, this, unit_sz); rust_chan(task->kernel, this, unit_sz);
remote_chan->ref();
remote_chan->port = this;
} }
rust_port::~rust_port() { rust_port::~rust_port() {
LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this); LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
// Disassociate channels from this port. {
while (chans.is_empty() == false) {
scoped_lock with(lock); scoped_lock with(lock);
rust_chan *chan = chans.peek(); remote_chan->port = NULL;
chan->disassociate(); remote_chan->deref();
remote_chan = NULL;
} }
remote_chan->deref();
remote_chan = NULL;
task->release_port(id); task->release_port(id);
} }
bool rust_port::receive(void *dptr) { bool rust_port::receive(void *dptr) {
for (uint32_t i = 0; i < chans.length(); i++) { if (remote_chan->buffer.is_empty() == false) {
rust_chan *chan = chans[i]; remote_chan->buffer.dequeue(dptr);
if (chan->buffer.is_empty() == false) { LOG(task, comm, "<=== read data ===");
chan->buffer.dequeue(dptr); return true;
LOG(task, comm, "<=== read data ===");
return true;
}
} }
return false; return false;
} }
void rust_port::log_state() { void rust_port::log_state() {
LOG(task, comm, LOG(task, comm,
"rust_port: 0x%" PRIxPTR ", associated channel(s): %d", "\tchan: 0x%" PRIxPTR ", size: %d",
this, chans.length()); remote_chan,
for (uint32_t i = 0; i < chans.length(); i++) { remote_chan->buffer.size());
rust_chan *chan = chans[i];
LOG(task, comm,
"\tchan: 0x%" PRIxPTR ", size: %d",
chan,
chan->buffer.size());
}
} }
// //

View file

@ -12,7 +12,6 @@ public:
rust_chan *remote_chan; rust_chan *remote_chan;
size_t unit_sz; size_t unit_sz;
ptr_vec<rust_token> writers; ptr_vec<rust_token> writers;
ptr_vec<rust_chan> chans;
lock_and_signal lock; lock_and_signal lock;