From de611a309006f0976bc9a579eb1087e7a89f79a7 Mon Sep 17 00:00:00 2001 From: Michael Bebenita Date: Tue, 7 Sep 2010 18:39:07 -0700 Subject: [PATCH] Lots of design changes around proxies and message passing. Made it so that domains can only talk to other domains via handles, and with the help of the rust_kernel. --- src/Makefile | 30 +---- src/boot/be/abi.ml | 2 +- src/rt/memory.h | 5 - src/rt/memory_region.cpp | 7 +- src/rt/rust.cpp | 87 +++++++------- src/rt/rust_chan.cpp | 25 ++-- src/rt/rust_chan.h | 3 +- src/rt/rust_dom.cpp | 121 +++---------------- src/rt/rust_dom.h | 17 ++- src/rt/rust_internal.h | 30 +++-- src/rt/rust_kernel.cpp | 172 +++++++++++++++++++++++++-- src/rt/rust_kernel.h | 107 +++++++++++++++-- src/rt/rust_message.cpp | 102 +++++++++------- src/rt/rust_message.h | 79 +++++++++---- src/rt/rust_port.cpp | 6 +- src/rt/rust_proxy.h | 53 +++++---- src/rt/rust_srv.cpp | 12 +- src/rt/rust_srv.h | 2 +- src/rt/rust_task.cpp | 13 ++- src/rt/rust_task.h | 3 + src/rt/rust_upcall.cpp | 212 ++++++++++++++++------------------ src/rt/sync/lock_free_queue.h | 2 - src/rt/util/indexed_list.h | 4 +- 23 files changed, 650 insertions(+), 444 deletions(-) diff --git a/src/Makefile b/src/Makefile index 827eb2eee5a..36aaf2a8a15 100644 --- a/src/Makefile +++ b/src/Makefile @@ -380,33 +380,10 @@ self: $(CFG_COMPILER) # Temporarily xfail the entire multi-tasking system, pending resolution # of inter-task shutdown races introduced with notification proxies. -TASK_XFAILS := test/run-pass/acyclic-unwind.rs \ - test/run-pass/alt-type-simple.rs \ - test/run-pass/basic.rs \ - test/run-pass/clone-with-exterior.rs \ - test/run-pass/comm.rs \ - test/run-pass/lazychan.rs \ - test/run-pass/many.rs \ - test/run-pass/obj-dtor.rs \ - test/run-pass/preempt.rs \ - test/run-pass/spawn-fn.rs \ - test/run-pass/spawn-module-qualified.rs \ - test/run-pass/spawn.rs \ - test/run-pass/task-comm-0.rs \ - test/run-pass/task-comm-1.rs \ - test/run-pass/task-comm-2.rs \ - test/run-pass/task-comm-3.rs \ - test/run-pass/task-comm-7.rs \ - test/run-pass/task-comm-8.rs \ - test/run-pass/task-comm-9.rs \ - test/run-pass/task-comm-10.rs \ - test/run-pass/task-comm-11.rs \ - test/run-pass/task-life-0.rs \ - test/run-pass/task-comm.rs \ - test/run-pass/threads.rs \ - test/run-pass/yield.rs \ +TASK_XFAILS := test/run-pass/task-comm-10.rs \ test/run-pass/task-comm-15.rs \ - test/run-pass/task-life-0.rs + test/run-pass/task-life-0.rs \ + test/run-pass/alt-type-simple.rs TEST_XFAILS_X86 := $(TASK_XFAILS) \ test/run-pass/child-outlives-parent.rs \ @@ -425,6 +402,7 @@ TEST_XFAILS_X86 := $(TASK_XFAILS) \ test/run-pass/task-comm.rs \ test/run-pass/vec-slice.rs \ test/run-pass/task-comm-3.rs \ + test/run-fail/task-comm-14.rs \ test/compile-fail/bad-recv.rs \ test/compile-fail/bad-send.rs \ test/compile-fail/infinite-tag-type-recursion.rs \ diff --git a/src/boot/be/abi.ml b/src/boot/be/abi.ml index b58f91259e3..43043e111a6 100644 --- a/src/boot/be/abi.ml +++ b/src/boot/be/abi.ml @@ -20,7 +20,7 @@ let task_field_gc_alloc_chain = task_field_rust_sp + 1;; let task_field_dom = task_field_gc_alloc_chain + 1;; let n_visible_task_fields = task_field_dom + 1;; -let dom_field_interrupt_flag = 0;; +let dom_field_interrupt_flag = 1;; let frame_glue_fns_field_mark = 0;; let frame_glue_fns_field_drop = 1;; diff --git a/src/rt/memory.h b/src/rt/memory.h index 22bc15d32b6..9196e28dd83 100644 --- a/src/rt/memory.h +++ b/src/rt/memory.h @@ -1,11 +1,6 @@ -/* - * - */ - #ifndef MEMORY_H #define MEMORY_H - inline void *operator new(size_t size, void *mem) { return mem; } diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp index 797a7c1d669..2f841935c11 100644 --- a/src/rt/memory_region.cpp +++ b/src/rt/memory_region.cpp @@ -1,7 +1,3 @@ -/* - * - */ - #include "rust_internal.h" #include "memory_region.h" @@ -20,6 +16,7 @@ memory_region::memory_region(memory_region *parent) : } void memory_region::free(void *mem) { + // printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem); if (_synchronized) { _lock.lock(); } #ifdef TRACK_ALLOCATIONS if (_allocation_list.replace(mem, NULL) == false) { @@ -34,7 +31,6 @@ void memory_region::free(void *mem) { _live_allocations--; _srv->free(mem); if (_synchronized) { _lock.unlock(); } - } void * @@ -63,6 +59,7 @@ memory_region::malloc(size_t size) { #ifdef TRACK_ALLOCATIONS _allocation_list.append(mem); #endif + // printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem); if (_synchronized) { _lock.unlock(); } return mem; } diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 43818de5514..0ea167a4a8c 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -1,16 +1,16 @@ #include "rust_internal.h" struct -command_line_args +command_line_args : public dom_owned { - rust_dom &dom; + rust_dom *dom; int argc; char **argv; // vec[str] passed to rust_task::start. rust_vec *args; - command_line_args(rust_dom &dom, + command_line_args(rust_dom *dom, int sys_argc, char **sys_argv) : dom(dom), @@ -21,29 +21,29 @@ command_line_args #if defined(__WIN32__) LPCWSTR cmdline = GetCommandLineW(); LPWSTR *wargv = CommandLineToArgvW(cmdline, &argc); - dom.win32_require("CommandLineToArgvW", wargv != NULL); - argv = (char **) dom.malloc(sizeof(char*) * argc); + dom->win32_require("CommandLineToArgvW", wargv != NULL); + argv = (char **) dom->malloc(sizeof(char*) * argc); for (int i = 0; i < argc; ++i) { int n_chars = WideCharToMultiByte(CP_UTF8, 0, wargv[i], -1, NULL, 0, NULL, NULL); - dom.win32_require("WideCharToMultiByte(0)", n_chars != 0); - argv[i] = (char *) dom.malloc(n_chars); + dom->win32_require("WideCharToMultiByte(0)", n_chars != 0); + argv[i] = (char *) dom->malloc(n_chars); n_chars = WideCharToMultiByte(CP_UTF8, 0, wargv[i], -1, argv[i], n_chars, NULL, NULL); - dom.win32_require("WideCharToMultiByte(1)", n_chars != 0); + dom->win32_require("WideCharToMultiByte(1)", n_chars != 0); } LocalFree(wargv); #endif size_t vec_fill = sizeof(rust_str *) * argc; size_t vec_alloc = next_power_of_two(sizeof(rust_vec) + vec_fill); - void *mem = dom.malloc(vec_alloc); - args = new (mem) rust_vec(&dom, vec_alloc, 0, NULL); + void *mem = dom->malloc(vec_alloc); + args = new (mem) rust_vec(dom, vec_alloc, 0, NULL); rust_str **strs = (rust_str**) &args->data[0]; for (int i = 0; i < argc; ++i) { size_t str_fill = strlen(argv[i]) + 1; size_t str_alloc = next_power_of_two(sizeof(rust_str) + str_fill); - mem = dom.malloc(str_alloc); - strs[i] = new (mem) rust_str(&dom, str_alloc, str_fill, + mem = dom->malloc(str_alloc); + strs[i] = new (mem) rust_str(dom, str_alloc, str_fill, (uint8_t const *)argv[i]); } args->fill = vec_fill; @@ -58,50 +58,55 @@ command_line_args // Drop the args we've had pinned here. rust_str **strs = (rust_str**) &args->data[0]; for (int i = 0; i < argc; ++i) - dom.free(strs[i]); - dom.free(args); + dom->free(strs[i]); + dom->free(args); } #ifdef __WIN32__ for (int i = 0; i < argc; ++i) { - dom.free(argv[i]); + dom->free(argv[i]); } - dom.free(argv); + dom->free(argv); #endif } }; +/** + * Main entry point into the Rust runtime. Here we create a Rust service, + * initialize the kernel, create the root domain and run it. + */ extern "C" CDECL int -rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv) -{ - int ret; - { - rust_srv srv; - rust_dom dom(&srv, crate, "main"); - srv.kernel->register_domain(&dom); - command_line_args args(dom, argc, argv); +rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, + char **argv) { - dom.log(rust_log::DOM, "startup: %d args", args.argc); - for (int i = 0; i < args.argc; ++i) - dom.log(rust_log::DOM, - "startup: arg[%d] = '%s'", i, args.argv[i]); + rust_srv *srv = new rust_srv(); + rust_kernel *kernel = new rust_kernel(srv); + kernel->start(); + rust_handle *handle = kernel->create_domain(crate, "main"); + rust_dom *dom = handle->referent(); + command_line_args *args = new (dom) command_line_args(dom, argc, argv); - if (dom._log.is_tracing(rust_log::DWARF)) { - rust_crate_reader rdr(&dom, crate); - } - - uintptr_t main_args[4] = { 0, 0, 0, (uintptr_t)args.args }; - - dom.root_task->start(crate->get_exit_task_glue(), - main_fn, - (uintptr_t)&main_args, - sizeof(main_args)); - - ret = dom.start_main_loop(); - srv.kernel->deregister_domain(&dom); + dom->log(rust_log::DOM, "startup: %d args", args->argc); + for (int i = 0; i < args->argc; i++) { + dom->log(rust_log::DOM, + "startup: arg[%d] = '%s'", i, args->argv[i]); } + if (dom->_log.is_tracing(rust_log::DWARF)) { + rust_crate_reader create_reader(dom, crate); + } + + uintptr_t main_args[4] = {0, 0, 0, (uintptr_t)args->args}; + dom->root_task->start(crate->get_exit_task_glue(), + main_fn, (uintptr_t)&main_args, sizeof(main_args)); + int ret = dom->start_main_loop(); + delete args; + kernel->destroy_domain(dom); + kernel->join_all_domains(); + delete kernel; + delete srv; + #if !defined(__WIN32__) // Don't take down the process if the main thread exits without an // error. diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 2a0a61db7f1..9a6664535cf 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -4,13 +4,15 @@ /** * Create a new rust channel and associate it with the specified port. */ -rust_chan::rust_chan(rust_task *task, maybe_proxy *port) : - task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) { - +rust_chan::rust_chan(rust_task *task, + maybe_proxy *port, + size_t unit_sz) : + task(task), + port(port), + buffer(task->dom, unit_sz) { if (port) { associate(port); } - task->log(rust_log::MEM | rust_log::COMM, "new rust_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, @@ -34,7 +36,7 @@ void rust_chan::associate(maybe_proxy *port) { task->log(rust_log::TASK, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, this, port); - this->port->delegate()->chans.push(this); + this->port->referent()->chans.push(this); } } @@ -51,8 +53,8 @@ void rust_chan::disassociate() { if (port->is_proxy() == false) { task->log(rust_log::TASK, "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, - this, port->delegate()); - port->delegate()->chans.swap_delete(this); + this, port->referent()); + port->referent()->chans.swap_delete(this); } // Delete reference to the port. @@ -76,14 +78,11 @@ void rust_chan::send(void *sptr) { "rust_chan::transmit with nothing to send."); if (port->is_proxy()) { - // TODO: Cache port task locally. - rust_proxy *port_task = - dom->get_task_proxy(port->delegate()->task); - data_message::send(buffer.peek(), buffer.unit_sz, - "send data", task, port_task, port->as_proxy()); + data_message::send(buffer.peek(), buffer.unit_sz, "send data", + task->get_handle(), port->as_proxy()->handle()); buffer.dequeue(NULL); } else { - rust_port *target_port = port->delegate(); + rust_port *target_port = port->referent(); if (target_port->task->blocked_on(target_port)) { dom->log(rust_log::COMM, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 6aa9824786d..a6e4d3d6b43 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -5,7 +5,8 @@ class rust_chan : public rc_base, public task_owned, public rust_cond { public: - rust_chan(rust_task *task, maybe_proxy *port); + rust_chan(rust_task *task, maybe_proxy *port, size_t unit_sz); + ~rust_chan(); rust_task *task; diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index a1207ec738a..323e9c3981b 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,8 +4,9 @@ template class ptr_vec; -rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate, - const char *name) : +rust_dom::rust_dom(rust_kernel *kernel, + rust_message_queue *message_queue, rust_srv *srv, + rust_crate const *root_crate, const char *name) : interrupt_flag(0), root_crate(root_crate), _log(srv, this), @@ -20,7 +21,8 @@ rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate, root_task(NULL), curr_task(NULL), rval(0), - _kernel(srv->kernel) + kernel(kernel), + message_queue(message_queue) { logptr("new dom", (uintptr_t)this); isaac_init(this, &rctx); @@ -42,33 +44,9 @@ del_all_tasks(rust_dom *dom, ptr_vec *v) { } } -void -rust_dom::delete_proxies() { - rust_task *task; - rust_proxy *task_proxy; - while (_task_proxies.pop(&task, &task_proxy)) { - log(rust_log::TASK, - "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR, - task_proxy, task_proxy->dom->name, task_proxy->dom); - delete task_proxy; - } - - rust_port *port; - rust_proxy *port_proxy; - while (_port_proxies.pop(&port, &port_proxy)) { - log(rust_log::TASK, - "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR, - port_proxy, port_proxy->dom->name, port_proxy->dom); - delete port_proxy; - } -} - rust_dom::~rust_dom() { log(rust_log::MEM | rust_log::DOM, "~rust_dom %s @0x%" PRIxPTR, name, (uintptr_t)this); - - log(rust_log::TASK, "deleting all proxies"); - delete_proxies(); log(rust_log::TASK, "deleting all running tasks"); del_all_tasks(this, &running_tasks); log(rust_log::TASK, "deleting all blocked tasks"); @@ -78,8 +56,9 @@ rust_dom::~rust_dom() { #ifndef __WIN32__ pthread_attr_destroy(&attr); #endif - while (caches.length()) + while (caches.length()) { delete caches.pop(); + } } void @@ -275,70 +254,21 @@ rust_dom::reap_dead_tasks() { } } -/** - * Enqueues a message in this domain's incoming message queue. It's the - * responsibility of the receiver to free the message once it's processed. - */ -void rust_dom::send_message(rust_message *message) { - log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR - " in queue 0x%" PRIxPTR - " in domain 0x%" PRIxPTR, - message->label, - message, - &_incoming_message_queue, - this); - _incoming_message_queue.enqueue(message); -} - /** * Drains and processes incoming pending messages. */ void rust_dom::drain_incoming_message_queue(bool process) { rust_message *message; - while (_incoming_message_queue.dequeue(&message)) { - log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%" - PRIxPTR, message->label, message); + while (message_queue->dequeue(&message)) { + log(rust_log::COMM, "<== receiving \"%s\" " PTR, + message->label, message); if (process) { message->process(); } - message->~rust_message(); - this->synchronized_region.free(message); + delete message; } } -rust_proxy * -rust_dom::get_task_proxy(rust_task *task) { - rust_proxy *proxy = NULL; - if (_task_proxies.get(task, &proxy)) { - return proxy; - } - log(rust_log::COMM, "no proxy for %s @0x%" PRIxPTR, task->name, task); - proxy = new (this) rust_proxy (this, task, false); - _task_proxies.put(task, proxy); - return proxy; -} - -/** - * Gets a proxy for this port. - * - * TODO: This method needs to be synchronized since it's usually called - * during upcall_clone_chan in a different thread. However, for now - * since this usually happens before the thread actually starts, - * we may get lucky without synchronizing. - * - */ -rust_proxy * -rust_dom::get_port_proxy_synchronized(rust_port *port) { - rust_proxy *proxy = NULL; - if (_port_proxies.get(port, &proxy)) { - return proxy; - } - log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, port); - proxy = new (this) rust_proxy (this, port, false); - _port_proxies.put(port, proxy); - return proxy; -} - /** * Schedules a running task for execution. Only running tasks can be * activated. Blocked tasks have to be unblocked before they can be @@ -362,31 +292,6 @@ rust_dom::schedule_task() { return NULL; } -/** - * Checks for simple deadlocks. - */ -bool -rust_dom::is_deadlocked() { - if (_kernel->domains.length() != 1) { - // We cannot tell if we are deadlocked if other domains exists. - return false; - } - - if (running_tasks.length() != 0) { - // We are making progress and therefore we are not deadlocked. - return false; - } - - if (_incoming_message_queue.is_empty() && blocked_tasks.length() > 0) { - // We have no messages to process, no running tasks to schedule - // and some blocked tasks therefore we are likely in a deadlock. - _kernel->log_all_domain_state(); - return true; - } - - return false; -} - void rust_dom::log_state() { if (!running_tasks.is_empty()) { @@ -439,7 +344,7 @@ rust_dom::start_main_loop() logptr("exit-task glue", root_crate->get_exit_task_glue()); while (n_live_tasks() > 0) { - A(this, is_deadlocked() == false, "deadlock"); + A(this, kernel->is_deadlocked() == false, "deadlock"); drain_incoming_message_queue(true); @@ -496,7 +401,7 @@ rust_dom::start_main_loop() log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); while (dead_tasks.length() > 0) { - if (_incoming_message_queue.is_empty()) { + if (message_queue->is_empty()) { log(rust_log::DOM, "waiting for %d dead tasks to become dereferenced, " "scheduler yielding ...", diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 5c9c29533c4..3f90bb67915 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -1,7 +1,7 @@ #ifndef RUST_DOM_H #define RUST_DOM_H -struct rust_dom +struct rust_dom : public kernel_owned, rc_base { // Fields known to the compiler: uintptr_t interrupt_flag; @@ -27,14 +27,14 @@ struct rust_dom rust_task *curr_task; int rval; - rust_kernel *_kernel; + rust_kernel *kernel; int32_t list_index; hash_map *> _task_proxies; hash_map *> _port_proxies; // Incoming messages from other domains. - lock_free_queue _incoming_message_queue; + rust_message_queue *message_queue; #ifndef __WIN32__ pthread_attr_t attr; @@ -42,9 +42,10 @@ struct rust_dom // Only a pointer to 'name' is kept, so it must live as long as this // domain. - rust_dom(rust_srv *srv, rust_crate const *root_crate, const char *name); + rust_dom(rust_kernel *kernel, + rust_message_queue *message_queue, rust_srv *srv, + rust_crate const *root_crate, const char *name); ~rust_dom(); - void activate(rust_task *task); void log(rust_task *task, uint32_t logbit, char const *fmt, ...); void log(uint32_t logbit, char const *fmt, ...); @@ -63,11 +64,7 @@ struct rust_dom void free(void *mem); void free(void *mem, memory_region::memory_region_type type); - void send_message(rust_message *message); void drain_incoming_message_queue(bool process); - rust_proxy *get_task_proxy(rust_task *task); - void delete_proxies(); - rust_proxy *get_port_proxy_synchronized(rust_port *port); #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); @@ -81,7 +78,7 @@ struct rust_dom void reap_dead_tasks(); rust_task *schedule_task(); - bool is_deadlocked(); + int start_main_loop(); void log_state(); diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index 60d86f6116c..c327f8c055c 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -72,6 +72,11 @@ struct frame_glue_fns; #define A(dom, e, s, ...) ((e) ? (void)0 : \ (dom)->srv->fatal(#e, __FILE__, __LINE__, s, ## __VA_ARGS__)) +#define K(srv, e, s, ...) ((e) ? (void)0 : \ + srv->fatal(#e, __FILE__, __LINE__, s, ## __VA_ARGS__)) + +#define PTR "0x%" PRIxPTR + // This drives our preemption scheme. static size_t const TIME_SLICE_IN_MS = 10; @@ -96,25 +101,29 @@ template struct rc_base { }; template struct dom_owned { - rust_dom *get_dom() const { - return ((T*)this)->dom; - } - void operator delete(void *ptr) { ((T *)ptr)->dom->free(ptr); } }; template struct task_owned { - rust_dom *get_dom() const { - return ((T *)this)->task->dom; - } - void operator delete(void *ptr) { ((T *)ptr)->task->dom->free(ptr); } }; +template struct kernel_owned { + void operator delete(void *ptr) { + ((T *)ptr)->kernel->free(ptr); + } +}; + +template struct region_owned { + void operator delete(void *ptr) { + ((T *)ptr)->region->free(ptr); + } +}; + // A cond(ition) is something we can block on. This can be a channel // (writing), a port (reading) or a task (waiting). @@ -152,8 +161,8 @@ public: #include "rust_srv.h" #include "rust_log.h" #include "rust_proxy.h" -#include "rust_message.h" #include "rust_kernel.h" +#include "rust_message.h" #include "rust_dom.h" #include "memory.h" @@ -552,6 +561,9 @@ struct gc_alloc { #include "rust_chan.h" #include "rust_port.h" +#include "test/rust_test_harness.h" +#include "test/rust_test_util.h" + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index b82e46152fa..9ea1f2ebee5 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -1,25 +1,81 @@ #include "rust_internal.h" rust_kernel::rust_kernel(rust_srv *srv) : - _region(srv->local_region), + _region(&srv->local_region), _log(srv, NULL), - domains(srv->local_region), - message_queues(srv->local_region) { + _srv(srv), + _interrupt_kernel_loop(FALSE), + domains(&srv->local_region), + message_queues(&srv->local_region) { // Nop. } -rust_kernel::~rust_kernel() { - // Nop. -} - -void -rust_kernel::register_domain(rust_dom *dom) { +rust_handle * +rust_kernel::create_domain(const rust_crate *crate, const char *name) { + rust_message_queue *message_queue = + new (this) rust_message_queue(_srv, this); + rust_srv *srv = _srv->clone(); + rust_dom *dom = + new (this) rust_dom(this, message_queue, srv, crate, name); + rust_handle *handle = get_dom_handle(dom); + message_queue->associate(handle); domains.append(dom); + message_queues.append(message_queue); + return handle; } void -rust_kernel::deregister_domain(rust_dom *dom) { +rust_kernel::destroy_domain(rust_dom *dom) { + log(rust_log::KERN, "deleting domain: " PTR ", index: %d, domains %d", + dom, dom->list_index, domains.length()); domains.remove(dom); + dom->message_queue->disassociate(); + rust_srv *srv = dom->srv; + delete dom; + delete srv; +} + +rust_handle * +rust_kernel::get_dom_handle(rust_dom *dom) { + rust_handle *handle = NULL; + if (_dom_handles.get(dom, &handle)) { + return handle; + } + handle = new (this) rust_handle(this, dom->message_queue, dom); + _dom_handles.put(dom, handle); + return handle; +} + +rust_handle * +rust_kernel::get_task_handle(rust_task *task) { + rust_handle *handle = NULL; + if (_task_handles.get(task, &handle)) { + return handle; + } + handle = new (this) rust_handle(this, task->dom->message_queue, + task); + _task_handles.put(task, handle); + return handle; +} + +rust_handle * +rust_kernel::get_port_handle(rust_port *port) { + rust_handle *handle = NULL; + if (_port_handles.get(port, &handle)) { + return handle; + } + handle = new (this) rust_handle(this, + port->task->dom->message_queue, port); + _port_handles.put(port, handle); + return handle; +} + +void +rust_kernel::join_all_domains() { + // TODO: Perhaps we can do this a little smarter. Just spin wait for now. + while (domains.length() > 0) { + sync::yield(); + } } void @@ -30,6 +86,36 @@ rust_kernel::log_all_domain_state() { } } +/** + * Checks for simple deadlocks. + */ +bool +rust_kernel::is_deadlocked() { + return false; +// _lock.lock(); +// if (domains.length() != 1) { +// // We can only check for deadlocks when only one domain exists. +// return false; +// } +// +// if (domains[0]->running_tasks.length() != 0) { +// // We are making progress and therefore we are not deadlocked. +// return false; +// } +// +// if (domains[0]->message_queue->is_empty() && +// domains[0]->blocked_tasks.length() > 0) { +// // We have no messages to process, no running tasks to schedule +// // and some blocked tasks therefore we are likely in a deadlock. +// log_all_domain_state(); +// return true; +// } +// +// _lock.unlock(); +// return false; +} + + void rust_kernel::log(uint32_t type_bits, char const *fmt, ...) { char buf[256]; @@ -41,3 +127,69 @@ rust_kernel::log(uint32_t type_bits, char const *fmt, ...) { va_end(args); } } + +void +rust_kernel::start_kernel_loop() { + while (_interrupt_kernel_loop == false) { + message_queues.global.lock(); + for (size_t i = 0; i < message_queues.length(); i++) { + rust_message_queue *queue = message_queues[i]; + if (queue->is_associated() == false) { + rust_message *message = NULL; + while (queue->dequeue(&message)) { + message->kernel_process(); + delete message; + } + } + } + message_queues.global.unlock(); + } +} + +void +rust_kernel::run() { + log(rust_log::KERN, "started kernel loop"); + start_kernel_loop(); + log(rust_log::KERN, "finished kernel loop"); +} + +rust_kernel::~rust_kernel() { + K(_srv, domains.length() == 0, + "Kernel has %d live domain(s), join all domains before killing " + "the kernel.", domains.length()); + + // If the kernel loop is running, interrupt it, join and exit. + if (is_running()) { + _interrupt_kernel_loop = true; + join(); + } + + free_handles(_task_handles); + free_handles(_port_handles); + free_handles(_dom_handles); + + rust_message_queue *queue = NULL; + while (message_queues.pop(&queue)) { + K(_srv, queue->is_empty(), "Kernel message queue should be empty " + "before killing the kernel."); + delete queue; + } +} + +void * +rust_kernel::malloc(size_t size) { + return _region->malloc(size); +} + +void rust_kernel::free(void *mem) { + _region->free(mem); +} + +template void +rust_kernel::free_handles(hash_map* > &map) { + T* key; + rust_handle *value; + while (map.pop(&key, &value)) { + delete value; + } +} diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 478d030c177..902a9a2f3b4 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -2,20 +2,113 @@ #define RUST_KERNEL_H /** - * A global object shared by all domains. + * A handle object for Rust tasks. We need a reference to the message queue + * of the referent's domain which we can safely hang on to since it's a + * kernel object. We use the referent reference as a label we stash in + * messages sent via this proxy. */ -class rust_kernel { - memory_region &_region; - rust_log _log; + +class rust_kernel; + +template class +rust_handle : + public rust_cond, + public rc_base >, + public kernel_owned > { public: + rust_kernel *kernel; + rust_message_queue *message_queue; + T *_referent; + T * referent() { + return _referent; + } + rust_handle(rust_kernel *kernel, + rust_message_queue *message_queue, + T *referent) : + kernel(kernel), + message_queue(message_queue), + _referent(referent) { + // Nop. + } +}; + +/** + * A global object shared by all thread domains. Most of the data structures + * in this class are synchronized since they are accessed from multiple + * threads. + */ +class rust_kernel : public rust_thread { + memory_region *_region; + rust_log _log; + rust_srv *_srv; + + /** + * Task proxy objects are kernel owned handles to Rust objects. + */ + hash_map *> _task_handles; + hash_map *> _port_handles; + hash_map *> _dom_handles; + + template void free_handles(hash_map* > &map); + + void run(); + void start_kernel_loop(); + bool volatile _interrupt_kernel_loop; + + /** + * Lock for the message queue list, so we can safely + */ + spin_lock _message_queues_lock; + +public: + + /** + * List of domains that are currently executing. + */ synchronized_indexed_list domains; - synchronized_indexed_list > message_queues; + + /** + * Message queues are kernel objects and are associated with domains. + * Their lifetime is not bound to the lifetime of a domain and in fact + * live on after their associated domain has died. This way we can safely + * communicate with domains that may have died. + * + * Although the message_queues list is synchronized, each individual + * message queue is lock free. + */ + synchronized_indexed_list message_queues; + + rust_handle *get_dom_handle(rust_dom *dom); + rust_handle *get_task_handle(rust_task *task); + rust_handle *get_port_handle(rust_port *port); + rust_kernel(rust_srv *srv); - void register_domain(rust_dom *dom); - void deregister_domain(rust_dom *dom); + + rust_handle *create_domain(rust_crate const *root_crate, + const char *name); + void destroy_domain(rust_dom *dom); + + bool is_deadlocked(); + + /** + * Blocks until all domains have terminated. + */ + void join_all_domains(); + void log_all_domain_state(); void log(uint32_t type_bits, char const *fmt, ...); virtual ~rust_kernel(); + + void *malloc(size_t size); + void free(void *mem); }; +inline void *operator new(size_t size, rust_kernel *kernel) { + return kernel->malloc(size); +} + +inline void *operator new(size_t size, rust_kernel &kernel) { + return kernel.malloc(size); +} + #endif /* RUST_KERNEL_H */ diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index b6b7fbf07e0..dab13c091e2 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -2,36 +2,35 @@ #include "rust_message.h" rust_message:: -rust_message(const char* label, rust_task *source, rust_task *target) : - label(label), - _dom(target->dom), - _source(source), - _target(target) { +rust_message(memory_region *region, const char* label, + rust_handle *source, rust_handle *target) : + label(label), region(region), _source(source), _target(target) { } rust_message::~rust_message() { + // Nop. } void rust_message::process() { - I(_dom, false); + // Nop. } -rust_proxy * -rust_message::get_source_proxy() { - return _dom->get_task_proxy(_source); +void rust_message::kernel_process() { + // Nop. } notify_message:: -notify_message(notification_type type, const char* label, - rust_task *source, - rust_task *target) : - rust_message(label, source, target), type(type) { +notify_message(memory_region *region, notification_type type, + const char* label, rust_handle *source, + rust_handle *target) : + rust_message(region, label, source, target), type(type) { } data_message:: -data_message(uint8_t *buffer, size_t buffer_sz, const char* label, - rust_task *source, rust_task *target, rust_port *port) : - rust_message(label, source, target), +data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz, + const char* label, rust_handle *source, + rust_handle *port) : + rust_message(region, label, source, NULL), _buffer_sz(buffer_sz), _port(port) { _buffer = (uint8_t *)malloc(buffer_sz); memcpy(_buffer, buffer, buffer_sz); @@ -47,54 +46,79 @@ data_message::~data_message() { * source task. */ void notify_message:: -send(notification_type type, const char* label, rust_task *source, - rust_proxy *target) { - rust_task *target_task = target->delegate(); - rust_dom *target_domain = target_task->dom; +send(notification_type type, const char* label, + rust_handle *source, rust_handle *target) { + memory_region *region = &target->message_queue->region; notify_message *message = - new (target_domain, memory_region::SYNCHRONIZED) notify_message(type, - label, source, target_task); - target_domain->send_message(message); + new (region) notify_message(region, type, label, source, target); +// target->referent()->log(rust_log::COMM, +// "==> sending \"%s\" " PTR " in queue " PTR, +// label, message, &target->message_queue); + target->message_queue->enqueue(message); } void notify_message::process() { - rust_task *task = _target; + rust_task *task = _target->referent(); switch (type) { case KILL: - task->ref_count--; + // task->ref_count--; task->kill(); break; case JOIN: { if (task->dead() == false) { - task->tasks_waiting_to_join.append(get_source_proxy()); + rust_proxy *proxy = new rust_proxy(_source); + task->tasks_waiting_to_join.append(proxy); } else { - send(WAKEUP, "wakeup", task, get_source_proxy()); + send(WAKEUP, "wakeup", _target, _source); } break; } case WAKEUP: - task->wakeup(get_source_proxy()->delegate()); + task->wakeup(_source); + break; + } +} + +void notify_message::kernel_process() { + switch(type) { + case WAKEUP: + case KILL: + // Ignore. + break; + case JOIN: + send(WAKEUP, "wakeup", _target, _source); break; } } void data_message:: -send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source, - rust_proxy *target, rust_proxy *port) { +send(uint8_t *buffer, size_t buffer_sz, const char* label, + rust_handle *source, rust_handle *port) { - rust_task *target_task = target->delegate(); - rust_port *target_port = port->delegate(); - rust_dom *target_domain = target_task->dom; + memory_region *region = &port->message_queue->region; data_message *message = - new (target_domain, memory_region::SYNCHRONIZED) - data_message(buffer, buffer_sz, label, source, - target_task, target_port); - target_domain->send_message(message); + new (region) data_message(region, buffer, buffer_sz, label, source, + port); + source->referent()->log(rust_log::COMM, + "==> sending \"%s\"" PTR " in queue " PTR, + label, message, &port->message_queue); + port->message_queue->enqueue(message); } void data_message::process() { - _port->remote_channel->send(_buffer); - _target->log(rust_log::COMM, "<=== received data via message ==="); + _port->referent()->remote_channel->send(_buffer); + // _target->referent()->log(rust_log::COMM, + // "<=== received data via message ==="); +} + +void data_message::kernel_process() { + +} + +rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel) : + region (srv, true), kernel(kernel), + dom_handle(NULL) { + // Nop. } // diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h index 7aee6d9f6d0..c342e3e411a 100644 --- a/src/rt/rust_message.h +++ b/src/rt/rust_message.h @@ -9,28 +9,31 @@ /** * Abstract base class for all message types. */ -class rust_message { +class rust_message : public region_owned { public: const char* label; + memory_region *region; private: - rust_dom *_dom; - rust_task *_source; protected: - rust_task *_target; + rust_handle *_source; + rust_handle *_target; public: - rust_message(const char* label, rust_task *source, rust_task *target); + rust_message(memory_region *region, + const char* label, + rust_handle *source, + rust_handle *target); + virtual ~rust_message(); /** - * We can only access the source task through a proxy, so create one - * on demand if we need it. - */ - rust_proxy *get_source_proxy(); - - /** - * Processes the message in the target domain thread. + * Processes the message in the target domain. */ virtual void process(); + + /** + * Processes the message in the kernel. + */ + virtual void kernel_process(); }; /** @@ -44,17 +47,19 @@ public: const notification_type type; - notify_message(notification_type type, const char* label, - rust_task *source, rust_task *target); + notify_message(memory_region *region, notification_type type, + const char* label, rust_handle *source, + rust_handle *target); void process(); + void kernel_process(); /** * This code executes in the sending domain's thread. */ static void - send(notification_type type, const char* label, rust_task *source, - rust_proxy *target); + send(notification_type type, const char* label, + rust_handle *source, rust_handle *target); }; /** @@ -64,21 +69,51 @@ class data_message : public rust_message { private: uint8_t *_buffer; size_t _buffer_sz; - rust_port *_port; -public: + rust_handle *_port; + +public: + data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz, + const char* label, rust_handle *source, + rust_handle *port); - data_message(uint8_t *buffer, size_t buffer_sz, const char* label, - rust_task *source, rust_task *target, rust_port *port); virtual ~data_message(); void process(); + void kernel_process(); /** * This code executes in the sending domain's thread. */ static void send(uint8_t *buffer, size_t buffer_sz, const char* label, - rust_task *source, rust_proxy *target, - rust_proxy *port); + rust_handle *source, rust_handle *port); +}; + +class rust_message_queue : public lock_free_queue, + public kernel_owned { +public: + memory_region region; + rust_kernel *kernel; + rust_handle *dom_handle; + int32_t list_index; + rust_message_queue(rust_srv *srv, rust_kernel *kernel); + + void associate(rust_handle *dom_handle) { + this->dom_handle = dom_handle; + } + + /** + * The Rust domain relinquishes control to the Rust kernel. + */ + void disassociate() { + this->dom_handle = NULL; + } + + /** + * Checks if a Rust domain is responsible for draining the message queue. + */ + bool is_associated() { + return this->dom_handle != NULL; + } }; // diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index 458283fb394..82054687f2f 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -2,15 +2,15 @@ #include "rust_port.h" rust_port::rust_port(rust_task *task, size_t unit_sz) : - maybe_proxy(this), task(task), unit_sz(unit_sz), - writers(task->dom), chans(task->dom) { + maybe_proxy(this), task(task), + unit_sz(unit_sz), writers(task->dom), chans(task->dom) { task->log(rust_log::MEM | rust_log::COMM, "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); // Allocate a remote channel, for remote channel data. - remote_channel = new (task->dom) rust_chan(task, this); + remote_channel = new (task->dom) rust_chan(task, this, unit_sz); } rust_port::~rust_port() { diff --git a/src/rt/rust_proxy.h b/src/rt/rust_proxy.h index bf12e1d5f0e..2b5e820da10 100644 --- a/src/rt/rust_proxy.h +++ b/src/rt/rust_proxy.h @@ -2,57 +2,70 @@ #define RUST_PROXY_H /** - * A proxy object is a wrapper around other Rust objects. One use of the proxy - * object is to mitigate access between tasks in different thread domains. + * A proxy object is a wrapper for remote objects. Proxy objects are domain + * owned and provide a way distinguish between local and remote objects. */ template struct rust_proxy; + /** * The base class of all objects that may delegate. */ template struct maybe_proxy : public rc_base, public rust_cond { protected: - T *_delegate; + T *_referent; public: - maybe_proxy(T * delegate) : _delegate(delegate) { + maybe_proxy(T *referent) : _referent(referent) { + // Nop. + } + T *referent() { + return (T *)_referent; } - T *delegate() { - return _delegate; - } + bool is_proxy() { - return _delegate != this; + return _referent != this; } + rust_proxy *as_proxy() { return (rust_proxy *) this; } - T *as_delegate() { - I(_delegate->get_dom(), !is_proxy()); + + T *as_referent() { return (T *) this; } }; +template class rust_handle; + /** * A proxy object that delegates to another. */ template struct -rust_proxy : public maybe_proxy, - public dom_owned > { +rust_proxy : public maybe_proxy { private: bool _strong; + rust_handle *_handle; public: - rust_dom *dom; - rust_proxy(rust_dom *dom, T *delegate, bool strong) : - maybe_proxy (delegate), _strong(strong), dom(dom) { - this->dom->log(rust_log::COMM, - "new proxy: 0x%" PRIxPTR " => 0x%" PRIxPTR, this, delegate); - if (strong) { - delegate->ref(); - } + rust_proxy(rust_handle *handle) : + maybe_proxy (NULL), _strong(FALSE), _handle(handle) { + // Nop. + } + + rust_proxy(T *referent) : + maybe_proxy (referent), _strong(FALSE), _handle(NULL) { + // Nop. + } + + rust_handle *handle() { + return _handle; } }; +class rust_message_queue; +class rust_task; + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_srv.cpp b/src/rt/rust_srv.cpp index d92235623de..a5fcde9b3a0 100644 --- a/src/rt/rust_srv.cpp +++ b/src/rt/rust_srv.cpp @@ -7,13 +7,14 @@ rust_srv::rust_srv() : local_region(this, false), - synchronized_region(this, true), - kernel(new rust_kernel(this)) { + synchronized_region(this, true) { // Nop. } rust_srv::~rust_srv() { - // Nop. +// char msg[1024]; +// snprintf(msg, sizeof(msg), "~rust_srv %" PRIxPTR, (uintptr_t) this); +// log(msg); } void @@ -74,3 +75,8 @@ rust_srv::warning(char const *expression, expression, file, (int)line, buf); log(msg); } + +rust_srv * +rust_srv::clone() { + return new rust_srv(); +} diff --git a/src/rt/rust_srv.h b/src/rt/rust_srv.h index ab646ae6a8f..465b03b7ecc 100644 --- a/src/rt/rust_srv.h +++ b/src/rt/rust_srv.h @@ -7,7 +7,6 @@ class rust_srv { public: memory_region local_region; memory_region synchronized_region; - rust_kernel *kernel; virtual void log(char const *msg); virtual void fatal(char const *expression, char const *file, @@ -24,6 +23,7 @@ public: virtual void *realloc(void *, size_t); rust_srv(); virtual ~rust_srv(); + virtual rust_srv *clone(); }; #endif /* RUST_SRV_H */ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 08d5974ecbb..408996f69db 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -404,9 +404,10 @@ rust_task::notify_tasks_waiting_to_join() { tasks_waiting_to_join.pop(&waiting_task); if (waiting_task->is_proxy()) { notify_message::send(notify_message::WAKEUP, "wakeup", - this, waiting_task->as_proxy()); + get_handle(), waiting_task->as_proxy()->handle()); + delete waiting_task; } else { - rust_task *task = waiting_task->delegate(); + rust_task *task = waiting_task->referent(); if (task->dead() == false) { task->wakeup(this); } @@ -563,8 +564,7 @@ rust_task::transition(ptr_vec *src, ptr_vec *dst) } void -rust_task::block(rust_cond *on, const char* name) -{ +rust_task::block(rust_cond *on, const char* name) { log(rust_log::TASK, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR, (uintptr_t) on, (uintptr_t) cond); A(dom, cond == NULL, "Cannot block an already blocked task."); @@ -631,6 +631,11 @@ rust_task::log(uint32_t type_bits, char const *fmt, ...) { } } +rust_handle * +rust_task::get_handle() { + return dom->kernel->get_task_handle(this); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 9a4e0c7bd7a..383707f0aeb 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -50,6 +50,7 @@ rust_task : public maybe_proxy, rust_task(rust_dom *dom, rust_task *spawner, const char *name); + ~rust_task(); void start(uintptr_t exit_task_glue, @@ -110,6 +111,8 @@ rust_task : public maybe_proxy, // Notify tasks waiting for us that we are about to die. void notify_tasks_waiting_to_join(); + rust_handle * get_handle(); + uintptr_t get_fp(); uintptr_t get_previous_fp(uintptr_t fp); frame_glue_fns *get_frame_glue_fns(uintptr_t fp); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 0e3961bc24c..4b5fa1d631b 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -23,20 +23,6 @@ (task)->dom->get_log().indent(); #endif -void -log_task_state(rust_task *task, maybe_proxy *target) { - rust_task *delegate = target->delegate(); - if (target->is_proxy()) { - task->log(rust_log::TASK, - "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s", - delegate, delegate->ref_count, delegate->state_str()); - } else { - task->log(rust_log::TASK, - "local task: 0x%" PRIxPTR ", ref_count: %d state: %s", - delegate, delegate->ref_count, delegate->state_str()); - } -} - extern "C" CDECL char const * str_buf(rust_task *task, rust_str *s); @@ -104,7 +90,7 @@ upcall_new_chan(rust_task *task, rust_port *port) { "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", (uintptr_t) task, task->name, port); I(dom, port); - return new (dom) rust_chan(task, port); + return new (dom) rust_chan(task, port, port->unit_sz); } /** @@ -135,43 +121,55 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { "Channel's ref count should be zero."); if (chan->is_associated()) { - // We're trying to delete a channel that another task may be reading - // from. We have two options: - // - // 1. We can flush the channel by blocking in upcall_flush_chan() - // and resuming only when the channel is flushed. The problem - // here is that we can get ourselves in a deadlock if the parent - // task tries to join us. - // - // 2. We can leave the channel in a "dormnat" state by not freeing - // it and letting the receiver task delete it for us instead. - if (chan->buffer.is_empty() == false) { - return; + if (chan->port->is_proxy()) { + // Here is a good place to delete the port proxy we allocated + // in upcall_clone_chan. + rust_proxy *proxy = chan->port->as_proxy(); + chan->disassociate(); + delete proxy; + } else { + // We're trying to delete a channel that another task may be + // reading from. We have two options: + // + // 1. We can flush the channel by blocking in upcall_flush_chan() + // and resuming only when the channel is flushed. The problem + // here is that we can get ourselves in a deadlock if the + // parent task tries to join us. + // + // 2. We can leave the channel in a "dormnat" state by not freeing + // it and letting the receiver task delete it for us instead. + if (chan->buffer.is_empty() == false) { + return; + } + chan->disassociate(); } - chan->disassociate(); } delete chan; } /** * Clones a channel and stores it in the spawnee's domain. Each spawned task - * has it's own copy of the channel. + * has its own copy of the channel. */ extern "C" CDECL rust_chan * -upcall_clone_chan(rust_task *task, - maybe_proxy *target, +upcall_clone_chan(rust_task *task, maybe_proxy *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - task->log(rust_log::UPCALL | rust_log::COMM, - "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, - target, chan); - rust_task *target_task = target->delegate(); + size_t unit_sz = chan->buffer.unit_sz; maybe_proxy *port = chan->port; - if (target->is_proxy()) { - port = target_task->dom->get_port_proxy_synchronized( - chan->port->as_delegate()); + rust_task *target_task = NULL; + if (target->is_proxy() == false) { + port = chan->port; + target_task = target->referent(); + } else { + rust_handle *handle = + task->dom->kernel->get_port_handle(port->as_referent()); + maybe_proxy *proxy = new rust_proxy (handle); + task->log(rust_log::MEM, "new proxy: " PTR, proxy); + port = proxy; + target_task = target->as_proxy()->handle()->referent(); } - return new (target_task->dom) rust_chan(target_task, port); + return new (target_task->dom) rust_chan(target_task, port, unit_sz); } extern "C" CDECL void @@ -193,17 +191,15 @@ upcall_sleep(rust_task *task, size_t time_in_us) { extern "C" CDECL void upcall_join(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); - rust_task *target_task = target->delegate(); - task->log(rust_log::UPCALL | rust_log::COMM, - "target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR, - target, target_task->name, target_task); if (target->is_proxy()) { - notify_message:: - send(notify_message::JOIN, "join", task, target->as_proxy()); - task->block(target_task, "joining remote task"); + rust_handle *task_handle = target->as_proxy()->handle(); + notify_message::send(notify_message::JOIN, "join", + task->get_handle(), task_handle); + task->block(task_handle, "joining remote task"); task->yield(2); } else { + rust_task *target_task = target->referent(); // If the other task is already dying, we don't have to wait for it. if (target_task->dead() == false) { target_task->tasks_waiting_to_join.push(task); @@ -221,10 +217,6 @@ upcall_join(rust_task *task, maybe_proxy *target) { extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); - task->log(rust_log::UPCALL | rust_log::COMM, - "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", - (uintptr_t) chan, (uintptr_t) sptr, - chan->port->delegate()->unit_sz); chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -269,21 +261,14 @@ upcall_fail(rust_task *task, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); - log_task_state(task, target); - rust_task *target_task = target->delegate(); - - task->log(rust_log::UPCALL | rust_log::TASK, - "kill task %s @0x%" PRIxPTR ", ref count %d", - target_task->name, target_task, - target_task->ref_count); - if (target->is_proxy()) { notify_message:: - send(notify_message::KILL, "kill", task, target->as_proxy()); + send(notify_message::KILL, "kill", task->get_handle(), + target->as_proxy()->handle()); // The proxy ref_count dropped to zero, delete it here. delete target->as_proxy(); } else { - target_task->kill(); + target->referent()->kill(); } } @@ -554,25 +539,6 @@ upcall_get_type_desc(rust_task *task, return td; } -#if defined(__WIN32__) -static DWORD WINAPI rust_thread_start(void *ptr) -#elif defined(__GNUC__) -static void *rust_thread_start(void *ptr) -#else -#error "Platform not supported" -#endif -{ - // We were handed the domain we are supposed to run. - rust_dom *dom = (rust_dom *) ptr; - - // Start a new rust main loop for this thread. - dom->start_main_loop(); - rust_srv *srv = dom->srv; - srv->kernel->deregister_domain(dom); - delete dom; - return 0; -} - extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, const char *name) { LOG_UPCALL_ENTRY(spawner); @@ -604,54 +570,76 @@ upcall_start_task(rust_task *spawner, return task; } +/** + * Called whenever a new domain is created. + */ extern "C" CDECL maybe_proxy * upcall_new_thread(rust_task *task, const char *name) { LOG_UPCALL_ENTRY(task); - - rust_dom *old_dom = task->dom; - rust_dom *new_dom = new rust_dom(old_dom->srv, - old_dom->root_crate, - name); - old_dom->srv->kernel->register_domain(new_dom); + rust_dom *parent_dom = task->dom; + rust_kernel *kernel = parent_dom->kernel; + rust_handle *child_dom_handle = + kernel->create_domain(parent_dom->root_crate, name); + rust_handle *child_task_handle = + kernel->get_task_handle(child_dom_handle->referent()->root_task); task->log(rust_log::UPCALL | rust_log::MEM, - "upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, - name, new_dom, new_dom->root_task); - rust_proxy *proxy = - new (old_dom) rust_proxy(old_dom, - new_dom->root_task, true); - task->log(rust_log::UPCALL | rust_log::MEM, - "new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, - proxy, proxy->delegate()); - return proxy; + "child name: %s, child_dom_handle: " PTR + ", child_task_handle: " PTR, + name, child_dom_handle, child_task_handle); + rust_proxy *child_task_proxy = + new rust_proxy (child_task_handle); + return child_task_proxy; } +#if defined(__WIN32__) +static DWORD WINAPI rust_thread_start(void *ptr) +#elif defined(__GNUC__) +static void *rust_thread_start(void *ptr) +#else +#error "Platform not supported" +#endif +{ + // We were handed the domain we are supposed to run. + rust_dom *dom = (rust_dom *) ptr; + + // Start a new rust main loop for this thread. + dom->start_main_loop(); + + // Destroy the domain. + dom->kernel->destroy_domain(dom); + + return 0; +} + +/** + * Called after a new domain is created. Here we create a new thread and + * and start the domain main loop. + */ extern "C" CDECL maybe_proxy * -upcall_start_thread(rust_task *spawner, - maybe_proxy *root_task_proxy, +upcall_start_thread(rust_task *task, + rust_proxy *child_task_proxy, uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) { - LOG_UPCALL_ENTRY(spawner); - - rust_dom *dom = spawner->dom; - rust_task *root_task = root_task_proxy->delegate(); - dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, - "upcall start_thread(exit_task_glue 0x%" PRIxPTR - ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz); - root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); - + LOG_UPCALL_ENTRY(task); + rust_dom *parenet_dom = task->dom; + rust_handle *child_task_handle = child_task_proxy->handle(); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, + "exit_task_glue: " PTR ", spawnee_fn " PTR + ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz); + rust_task *child_task = child_task_handle->referent(); + child_task->start(exit_task_glue, spawnee_fn, task->rust_sp, callsz); #if defined(__WIN32__) HANDLE thread; - thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom, - 0, NULL); - dom->win32_require("CreateThread", thread != NULL); + thread = CreateThread(NULL, 0, rust_thread_start, child_task->dom, 0, + NULL); + parenet_dom->win32_require("CreateThread", thread != NULL); #else pthread_t thread; - pthread_create(&thread, &dom->attr, rust_thread_start, - (void *) root_task->dom); + pthread_create(&thread, &parenet_dom->attr, rust_thread_start, + (void *) child_task->dom); #endif - return root_task_proxy; + return child_task_proxy; } // diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h index 5ebc70f12fe..ac0c5b046a0 100644 --- a/src/rt/sync/lock_free_queue.h +++ b/src/rt/sync/lock_free_queue.h @@ -98,8 +98,6 @@ class lock_free_queue { } public: - int32_t list_index; - lock_free_queue() { // We can only handle 64bit CAS for counted pointers, so this will // not work with 64bit pointers. diff --git a/src/rt/util/indexed_list.h b/src/rt/util/indexed_list.h index b93945cc9cb..d643a050540 100644 --- a/src/rt/util/indexed_list.h +++ b/src/rt/util/indexed_list.h @@ -28,10 +28,10 @@ public: * object inserted in this list must define a "int32_t list_index" member. */ template class indexed_list { - memory_region ®ion; + memory_region *region; array_list list; public: - indexed_list(memory_region ®ion) : region(region) {} + indexed_list(memory_region *region) : region(region) {} virtual int32_t append(T *value); virtual bool pop(T **value); virtual size_t length() {