rt: Properly block tasks while waiting for port detach
This commit is contained in:
parent
4c4a2320eb
commit
1347d04bb0
5 changed files with 67 additions and 12 deletions
|
@ -42,7 +42,9 @@ native mod rustrt {
|
||||||
|
|
||||||
fn new_port(unit_sz: ctypes::size_t) -> *rust_port;
|
fn new_port(unit_sz: ctypes::size_t) -> *rust_port;
|
||||||
fn del_port(po: *rust_port);
|
fn del_port(po: *rust_port);
|
||||||
fn rust_port_detach(po: *rust_port);
|
fn rust_port_begin_detach(po: *rust_port,
|
||||||
|
yield: *ctypes::uintptr_t);
|
||||||
|
fn rust_port_end_detach(po: *rust_port);
|
||||||
fn get_port_id(po: *rust_port) -> port_id;
|
fn get_port_id(po: *rust_port) -> port_id;
|
||||||
fn rust_port_size(po: *rust_port) -> ctypes::size_t;
|
fn rust_port_size(po: *rust_port) -> ctypes::size_t;
|
||||||
fn port_recv(dptr: *uint, po: *rust_port,
|
fn port_recv(dptr: *uint, po: *rust_port,
|
||||||
|
@ -82,7 +84,17 @@ enum chan<T: send> {
|
||||||
resource port_ptr<T: send>(po: *rust_port) {
|
resource port_ptr<T: send>(po: *rust_port) {
|
||||||
// Once the port is detached it's guaranteed not to receive further
|
// Once the port is detached it's guaranteed not to receive further
|
||||||
// messages
|
// messages
|
||||||
rustrt::rust_port_detach(po);
|
let yield = 0u;
|
||||||
|
let yieldp = ptr::addr_of(yield);
|
||||||
|
rustrt::rust_port_begin_detach(po, yieldp);
|
||||||
|
if yield != 0u {
|
||||||
|
// Need to wait for the port to be detached
|
||||||
|
// FIXME: If this fails then we're going to leave our port
|
||||||
|
// in a bogus state.
|
||||||
|
task::yield();
|
||||||
|
}
|
||||||
|
rustrt::rust_port_end_detach(po);
|
||||||
|
|
||||||
// Drain the port so that all the still-enqueued items get dropped
|
// Drain the port so that all the still-enqueued items get dropped
|
||||||
while rustrt::rust_port_size(po) > 0u {
|
while rustrt::rust_port_size(po) > 0u {
|
||||||
// FIXME: For some reason if we don't assign to something here
|
// FIXME: For some reason if we don't assign to something here
|
||||||
|
|
|
@ -482,18 +482,23 @@ new_port(size_t unit_sz) {
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" CDECL void
|
extern "C" CDECL void
|
||||||
rust_port_detach(rust_port *port) {
|
rust_port_begin_detach(rust_port *port, uintptr_t *yield) {
|
||||||
rust_task *task = rust_task_thread::get_task();
|
rust_task *task = rust_task_thread::get_task();
|
||||||
LOG(task, comm, "rust_port_detach(0x%" PRIxPTR ")", (uintptr_t) port);
|
LOG(task, comm, "rust_port_detach(0x%" PRIxPTR ")", (uintptr_t) port);
|
||||||
port->detach();
|
port->begin_detach(yield);
|
||||||
|
}
|
||||||
|
|
||||||
|
extern "C" CDECL void
|
||||||
|
rust_port_end_detach(rust_port *port) {
|
||||||
|
port->end_detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" CDECL void
|
extern "C" CDECL void
|
||||||
del_port(rust_port *port) {
|
del_port(rust_port *port) {
|
||||||
rust_task *task = rust_task_thread::get_task();
|
rust_task *task = rust_task_thread::get_task();
|
||||||
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
|
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
|
||||||
A(task->thread, port->get_ref_count() == 1, "Expected port ref_count == 1");
|
A(task->thread, port->get_ref_count() == 0, "Expected port ref_count == 0");
|
||||||
port->deref();
|
delete port;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" CDECL size_t
|
extern "C" CDECL size_t
|
||||||
|
|
|
@ -20,13 +20,40 @@ rust_port::~rust_port() {
|
||||||
task->deref();
|
task->deref();
|
||||||
}
|
}
|
||||||
|
|
||||||
void rust_port::detach() {
|
void rust_port::delete_this() {
|
||||||
|
scoped_lock with(detach_lock);
|
||||||
|
|
||||||
|
if (task->blocked_on(&detach_cond)) {
|
||||||
|
// The port owner is waiting for the port to be detached
|
||||||
|
task->wakeup(&detach_cond);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void rust_port::begin_detach(uintptr_t *yield) {
|
||||||
|
*yield = false;
|
||||||
|
|
||||||
task->release_port(id);
|
task->release_port(id);
|
||||||
// FIXME: Busy waiting until we're the only ref
|
|
||||||
|
deref();
|
||||||
|
|
||||||
|
scoped_lock with(detach_lock);
|
||||||
|
if (get_ref_count() != 0) {
|
||||||
|
task->block(&detach_cond, "waiting for port detach");
|
||||||
|
*yield = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void rust_port::end_detach() {
|
||||||
|
// FIXME: For some reason, on rare occasion we can get here without
|
||||||
|
// actually having the ref count go to 0. Possibly related to #1923
|
||||||
bool done = false;
|
bool done = false;
|
||||||
while (!done) {
|
while (!done) {
|
||||||
done = get_ref_count() == 1;
|
done = get_ref_count() == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Just take the lock to make sure that the thread that signaled
|
||||||
|
// the detach_cond isn't still holding it
|
||||||
|
scoped_lock with(detach_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void rust_port::send(void *sptr) {
|
void rust_port::send(void *sptr) {
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
|
|
||||||
#include "rust_internal.h"
|
#include "rust_internal.h"
|
||||||
|
|
||||||
|
class port_detach_cond : public rust_cond { };
|
||||||
|
|
||||||
class rust_port : public kernel_owned<rust_port>, public rust_cond {
|
class rust_port : public kernel_owned<rust_port>, public rust_cond {
|
||||||
public:
|
public:
|
||||||
RUST_ATOMIC_REFCOUNT();
|
RUST_ATOMIC_REFCOUNT();
|
||||||
|
@ -16,15 +18,23 @@ public:
|
||||||
|
|
||||||
lock_and_signal lock;
|
lock_and_signal lock;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Protects blocking and signaling on detach_cond
|
||||||
|
lock_and_signal detach_lock;
|
||||||
|
port_detach_cond detach_cond;
|
||||||
|
|
||||||
|
public:
|
||||||
rust_port(rust_task *task, size_t unit_sz);
|
rust_port(rust_task *task, size_t unit_sz);
|
||||||
~rust_port();
|
~rust_port();
|
||||||
void delete_this() { delete this; }
|
void delete_this();
|
||||||
|
|
||||||
void log_state();
|
void log_state();
|
||||||
void send(void *sptr);
|
void send(void *sptr);
|
||||||
void receive(void *dptr, uintptr_t *yield);
|
void receive(void *dptr, uintptr_t *yield);
|
||||||
size_t size();
|
size_t size();
|
||||||
void detach();
|
|
||||||
|
void begin_detach(uintptr_t *yield);
|
||||||
|
void end_detach();
|
||||||
};
|
};
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
|
@ -34,7 +34,8 @@ rust_str_push
|
||||||
rust_list_files
|
rust_list_files
|
||||||
rust_log_console_on
|
rust_log_console_on
|
||||||
rust_log_console_off
|
rust_log_console_off
|
||||||
rust_port_detach
|
rust_port_begin_detach
|
||||||
|
rust_port_end_detach
|
||||||
rust_port_size
|
rust_port_size
|
||||||
rust_process_wait
|
rust_process_wait
|
||||||
rust_ptr_eq
|
rust_ptr_eq
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue