diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index a7cb1762c4f..1c646f61d6a 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -30,7 +30,6 @@ fn delayed_send(msecs: uint, ch: comm::chan, val: T) { let timer_ptr = ptr::addr_of(timer); let hl_loop = uv::global_loop::get(); uv::hl::interact(hl_loop) {|loop_ptr| - uv::hl::ref(hl_loop, timer_ptr); let init_result = uv::ll::timer_init(loop_ptr, timer_ptr); if (init_result == 0i32) { let start_result = uv::ll::timer_start( @@ -54,9 +53,6 @@ fn delayed_send(msecs: uint, ch: comm::chan, val: T) { comm::recv(timer_done_po); // notify the caller immediately comm::send(ch, copy(val)); - // then clean up our handle - uv::hl::unref_and_close(hl_loop, timer_ptr, - delayed_send_close_cb); // uv_close for this timer has been processed comm::recv(timer_done_po); } @@ -122,6 +118,7 @@ crust fn delayed_send_cb(handle: *uv::ll::uv_timer_t, let stop_result = uv::ll::timer_stop(handle); if (stop_result == 0i32) { comm::send(timer_done_ch, ()); + uv::ll::close(handle, delayed_send_close_cb); } else { let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); @@ -140,14 +137,12 @@ crust fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe { #[cfg(test)] mod test { #[test] - #[ignore] - fn test_timer_simple_sleep_test() { + fn test_gl_timer_simple_sleep_test() { sleep(1u); } #[test] - #[ignore] - fn test_timer_recv_timeout_before_time_passes() { + fn test_gl_timer_recv_timeout_before_time_passes() { let expected = rand::rng().gen_str(16u); let test_po = comm::port::(); let test_ch = comm::chan(test_po); @@ -165,8 +160,7 @@ mod test { } #[test] - #[ignore] - fn test_timer_recv_timeout_after_time_passes() { + fn test_gl_timer_recv_timeout_after_time_passes() { let expected = rand::rng().gen_str(16u); let fail_msg = rand::rng().gen_str(16u); let test_po = comm::port::(); diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 53130c04218..46d73867285 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -6,7 +6,7 @@ import ll = uv_ll; import hl = uv_hl; import get_gl = get; -export get, get_single_task_gl, get_monitor_task_gl; +export get, get_monitor_task_gl; native mod rustrt { fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; @@ -16,8 +16,7 @@ native mod rustrt { Race-free helper to get access to a global task where a libuv loop is running. -Use `uv::hl::interact`, `uv::hl::ref`, `uv::hl::unref` and -uv `uv::hl::unref_and_close` to do operations against the global +Use `uv::hl::interact` to do operations against the global loop that this function returns. # Return @@ -32,61 +31,10 @@ fn get() -> hl::high_level_loop { // WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime. #[doc(hidden)] fn get_monitor_task_gl() -> hl::high_level_loop { - let monitor_loop_chan = - rustrt::rust_uv_get_kernel_monitor_global_chan_ptr(); - ret spawn_global_weak_task( - monitor_loop_chan, - {|weak_exit_po, msg_po, loop_ptr, first_msg| - log(debug, "monitor gl: entering inner loop"); - unsafe { - monitor_task_loop_body(weak_exit_po, msg_po, loop_ptr, - copy(first_msg)) - } - }, - {|msg_ch| - hl::monitor_task_loop({op_chan: msg_ch}) - }); -} - -// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime. -#[doc(hidden)] -fn get_single_task_gl() -> hl::high_level_loop { - let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); - ret spawn_global_weak_task( - global_loop_chan_ptr, - {|weak_exit_po, msg_po, loop_ptr, first_msg| - log(debug, "single-task gl: about to enter inner loop"); - unsafe { - single_task_loop_body(weak_exit_po, msg_po, loop_ptr, - copy(first_msg)) - } - }, - {|msg_ch| - log(debug, "after priv::chan_from_global_ptr"); - unsafe { - let handle = get_global_async_handle_native_representation() - as **ll::uv_async_t; - hl::single_task_loop( - { async_handle: handle, op_chan: msg_ch }) - } - } - ); -} - -// INTERNAL API - -fn spawn_global_weak_task( - global_loop_chan_ptr: *libc::uintptr_t, - weak_task_body_cb: fn~( - comm::port<()>, - comm::port, - *libc::c_void, - hl::high_level_msg) -> bool, - after_task_spawn_cb: fn~(comm::chan) - -> hl::high_level_loop) -> hl::high_level_loop { + let monitor_loop_chan_ptr = + rustrt::rust_uv_get_kernel_global_chan_ptr(); log(debug, #fmt("ENTERING global_loop::get() loop chan: %?", - global_loop_chan_ptr)); - + monitor_loop_chan_ptr)); let builder_fn = {|| let builder = task::builder(); let opts = { @@ -101,206 +49,95 @@ fn spawn_global_weak_task( }; unsafe { log(debug, "before priv::chan_from_global_ptr"); - let msg_ch = priv::chan_from_global_ptr::( - global_loop_chan_ptr, - builder_fn) {|port| - - // the actual body of our global loop lives here - log(debug, "initialized global port task!"); - log(debug, "GLOBAL initialized global port task!"); - outer_global_loop_body(port, weak_task_body_cb); + type hl_loop_req_ch = comm::chan; + let msg_ch = priv::chan_from_global_ptr::( + monitor_loop_chan_ptr, + builder_fn) {|msg_po| + log(debug, "global monitor task starting"); + priv::weaken_task() {|weak_exit_po| + log(debug, "global monitor task is now weak"); + let hl_loop_data = spawn_libuv_weak_task(); + let hl_loop = alt hl_loop_data { + (async, msg_ch) { + hl::simple_task_loop({async_handle:async, op_chan:msg_ch}) + } + }; + loop { + log(debug, "in outer_loop..."); + let continue = either::either( + {|weak_exit| + // all normal tasks have ended, tell the + // libuv loop to tear_down, then exit + log(debug, #fmt("weak_exit_po recv'd msg: %?", + weak_exit)); + let ( a, loop_msg_ch )= hl_loop_data; + comm::send(loop_msg_ch, hl::teardown_loop); + ll::async_send(a); + false + }, {|fetch_ch| + log(debug, #fmt("hl_loop req recv'd: %?", + fetch_ch)); + comm::send(fetch_ch, copy(hl_loop)); + true + }, comm::select2(weak_exit_po, msg_po)); + if !continue { break; } + } + log(debug, "global monitor task is leaving weakend state"); + }; + log(debug, "global monitor task exiting"); }; - ret after_task_spawn_cb(msg_ch); + // once we have a chan to the monitor loop, we ask it for + // the libuv loop's async handle + let fetch_po = comm::port::(); + let fetch_ch = comm::chan(fetch_po); + comm::send(msg_ch, fetch_ch); + comm::recv(fetch_po) } } -unsafe fn outer_global_loop_body( - msg_po: comm::port, - weak_task_body_cb: fn~( - comm::port<()>, - comm::port, - *libc::c_void, - hl::high_level_msg) -> bool) { - // we're going to use a single libuv-generated loop ptr - // for the duration of the process - let loop_ptr = ll::loop_new(); - - // data structure for loop goes here.. - - // immediately weaken the task this is running in. - priv::weaken_task() {|weak_exit_po| - // when we first enter this loop, we're going - // to wait on stand-by to receive a request to - // fire-up the libuv loop - let mut continue = true; - while continue { - log(debug, "in outer_loop..."); - continue = either::either( - {|left_val| - // bail out.. - // if we catch this msg at this point, - // we should just be able to exit because - // the loop isn't active - log(debug, #fmt("weak_exit_po recv'd msg: %?", - left_val)); - false - }, {|right_val| - weak_task_body_cb(weak_exit_po, msg_po, loop_ptr, - right_val) - }, comm::select2(weak_exit_po, msg_po)); - log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?", - continue)); - } - }; - - ll::loop_delete(loop_ptr); -} - -unsafe fn monitor_task_loop_body(weak_exit_po_in: comm::port<()>, - msg_po_in: comm::port, - loop_ptr: *libc::c_void, - -first_interaction: hl::high_level_msg) -> bool { - // resend the msg to be handled in the select2 loop below.. - comm::send(comm::chan(msg_po_in), first_interaction); - - // our async_handle - let async_handle_po = comm::port::<*ll::uv_async_t>(); - let async_handle_ch = comm::chan(async_handle_po); - - // the msg_po that libuv will be receiving on.. - let loop_msg_po = comm::port::(); - let loop_msg_po_ptr = ptr::addr_of(loop_msg_po); - let loop_msg_ch = comm::chan(loop_msg_po); - - // the question of whether unsupervising this will even do any - // good is there.. but since this'll go into blocking in libuv with - // a quickness.. any errors that occur (including inside crust) will - // be segfaults.. so yeah. +unsafe fn spawn_libuv_weak_task() -> (*ll::uv_async_t, + comm::chan){ + let exit_po = comm::port::<(*ll::uv_async_t, + comm::chan)>(); + let exit_ch = comm::chan(exit_po); + task::spawn_sched(task::manual_threads(1u)) {|| - let loop_msg_po_in = *loop_msg_po_ptr; - hl::run_high_level_loop( - loop_ptr, - loop_msg_po_in, // here the loop gets handed a different message - // port, as we'll be receiving all of the messages - // initially and then passing them on.. - // before_run - {|async_handle| - log(debug,#fmt("monitor gl: before_run: async_handle %?", - async_handle)); - // when this is ran, our async_handle is set up, so let's - // do an async_send with it.. letting the loop know, once it - // starts, that is has work - ll::async_send(async_handle); - comm::send(async_handle_ch, copy(async_handle)); - }, - // before_msg_drain - {|async_handle| - log(debug,#fmt("monitor gl: b4_msg_drain: async_handle %?", - async_handle)); - true - }, - // before_tear_down - {|async_handle| - log(debug,#fmt("monitor gl: b4_tear_down: async_handle %?", - async_handle)); - }); + log(debug, "entering global libuv task"); + let loop_ptr = ll::loop_new(); + priv::weaken_task() {|weak_exit_po| + log(debug, #fmt("global libuv task is now weak %?", + weak_exit_po)); + let loop_msg_po = comm::port::(); + let loop_msg_ch = comm::chan(loop_msg_po); + hl::run_high_level_loop( + loop_ptr, + loop_msg_po, + // before_run + {|async_handle| + log(debug,#fmt("global libuv: before_run %?", + async_handle)); + let out_data = (async_handle, loop_msg_ch); + comm::send(exit_ch, out_data); + }, + // before_msg_process + {|async_handle, loop_active| + log(debug,#fmt("global libuv: before_msg_drain %? %?", + async_handle, loop_active)); + true + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("libuv task: before_tear_down %?", + async_handle)); + } + ); + log(debug, "global libuv task is leaving weakened state"); + }; + ll::loop_delete(loop_ptr); + log(debug, "global libuv task exiting"); }; - // our loop is set up, so let's emit the handle back out to our users.. - let async_handle = comm::recv(async_handle_po); - // supposed to return a bool to indicate to the enclosing loop whether - // it should continue or not.. - let mut continue_inner_loop = true; - let mut didnt_get_hl_bailout = true; - while continue_inner_loop { - log(debug, "monitor task inner loop.. about to block on select2"); - continue_inner_loop = either::either( - {|left_val| - // bail out.. - log(debug, #fmt("monitor inner weak_exit_po recv'd msg: %?", - left_val)); - // TODO: make loop bail out - didnt_get_hl_bailout = false; - false - }, {|right_val| - // wake up our inner loop and pass it a msg.. - comm::send(loop_msg_ch, copy(right_val)); - ll::async_send(async_handle); - true - }, comm::select2(weak_exit_po_in, msg_po_in) - ) - } - didnt_get_hl_bailout -} - -unsafe fn single_task_loop_body(weak_exit_po_in: comm::port<()>, - msg_po_in: comm::port, - loop_ptr: *libc::c_void, - -first_interaction: hl::high_level_msg) -> bool { - // resend the msg - comm::send(comm::chan(msg_po_in), first_interaction); - - // black magic - let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in); - hl::run_high_level_loop( - loop_ptr, - msg_po_in, - // before_run - {|async_handle| - log(debug,#fmt("global_loop before_run: async_handle %?", - async_handle)); - // set the handle as the global - set_global_async_handle(0u as *ll::uv_async_t, - async_handle); - // when this is ran, our async_handle is set up, so let's - // do an async_send with it - ll::async_send(async_handle); - }, - // before_msg_drain - {|async_handle| - log(debug,#fmt("global_loop before_msg_drain: async_handle %?", - async_handle)); - let weak_exit_po = *weak_exit_po_ptr; - if(comm::peek(weak_exit_po)) { - // if this is true, immediately bail and return false, causing - // the libuv loop to start tearing down - log(debug,"got weak_exit meg inside libuv loop"); - comm::recv(weak_exit_po); - false - } - // if no weak_exit_po msg is received, then we'll let the - // loop continue - else { - true - } - }, - // before_tear_down - {|async_handle| - log(debug,#fmt("global_loop before_tear_down: async_handle %?", - async_handle)); - set_global_async_handle(async_handle, - 0 as *ll::uv_async_t); - }); - // supposed to return a bool to indicate to the enclosing loop whether - // it should continue or not.. - ret true; -} - -unsafe fn get_global_async_handle_native_representation() - -> *libc::uintptr_t { - ret rustrt::rust_uv_get_kernel_global_async_handle(); -} - -unsafe fn get_global_async_handle() -> *ll::uv_async_t { - ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t; -} - -unsafe fn set_global_async_handle(old: *ll::uv_async_t, - new_ptr: *ll::uv_async_t) { - rustrt::rust_compare_and_swap_ptr( - get_global_async_handle_native_representation(), - old as libc::uintptr_t, - new_ptr as libc::uintptr_t); + comm::recv(exit_po) } #[cfg(test)] @@ -320,8 +157,7 @@ mod test { let hl_loop = get_gl(); hl::interact(hl_loop) {|loop_ptr| log(debug, "closing timer"); - //ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb); - hl::unref_and_close(hl_loop, timer_ptr, simple_timer_close_cb); + ll::close(timer_ptr, simple_timer_close_cb); log(debug, "about to deref exit_ch_ptr"); log(debug, "after msg sent on deref'd exit_ch"); }; @@ -340,7 +176,6 @@ mod test { log(debug, "user code inside interact loop!!!"); let init_status = ll::timer_init(loop_ptr, timer_ptr); if(init_status == 0i32) { - hl::ref(hl_loop, timer_ptr); ll::set_data_for_uv_handle( timer_ptr as *libc::c_void, exit_ch_ptr as *libc::c_void); @@ -359,13 +194,39 @@ mod test { comm::recv(exit_po); log(debug, "global_loop timer test: msg recv on exit_po, done.."); } + #[test] - #[ignore] - fn test_uv_global_loop_high_level_global_timer() unsafe { + fn test_gl_uv_global_loop_high_level_global_timer() unsafe { let hl_loop = get_gl(); + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); task::spawn_sched(task::manual_threads(1u), {|| impl_uv_hl_simple_timer(hl_loop); + comm::send(exit_ch, ()); }); impl_uv_hl_simple_timer(hl_loop); + comm::recv(exit_po); + } + + // keeping this test ignored until some kind of stress-test-harness + // is set up for the build bots + #[test] + #[ignore] + fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe { + let hl_loop = get_gl(); + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); + let cycles = 5000u; + iter::repeat(cycles) {|| + task::spawn_sched(task::manual_threads(1u), {|| + impl_uv_hl_simple_timer(hl_loop); + comm::send(exit_ch, ()); + }); + }; + iter::repeat(cycles) {|| + comm::recv(exit_po); + }; + log(debug, "test_stress_gl_uv_global_loop_high_level_global_timer"+ + " exiting sucessfully!"); } } \ No newline at end of file diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_hl.rs index 8ce0fb8e5c7..75c5c6ebea4 100644 --- a/src/libstd/uv_hl.rs +++ b/src/libstd/uv_hl.rs @@ -6,8 +6,8 @@ provide a high-level, abstracted interface to some set of libuv functionality. "]; -export high_level_loop, hl_loop_ext, high_level_msg; -export run_high_level_loop, interact, ref, unref, unref_and_close; +export high_level_loop, high_level_msg; +export run_high_level_loop, interact; import ll = uv_ll; @@ -26,51 +26,15 @@ enum high_level_loop { simple_task_loop({ async_handle: *ll::uv_async_t, op_chan: comm::chan - }), - single_task_loop({ - async_handle: **ll::uv_async_t, - op_chan: comm::chan - }), - monitor_task_loop({ - op_chan: comm::chan }) } -impl hl_loop_ext for high_level_loop { - fn async_handle() -> **ll::uv_async_t { - alt self { - single_task_loop({async_handle, op_chan}) { - ret async_handle; - } - _ { - fail "variant of hl::high_level_loop that doesn't include" + - "an async_handle field"; - } - } - } - fn op_chan() -> comm::chan { - alt self { - single_task_loop({async_handle, op_chan}) { - ret op_chan; - } - monitor_task_loop({op_chan}) { - ret op_chan; - } - simple_task_loop({async_handle, op_chan}) { - ret op_chan; - } - } - } -} - #[doc=" Represents the range of interactions with a `high_level_loop` "] enum high_level_msg { interaction (fn~(*libc::c_void)), - ref_handle (*libc::c_void), - manual_unref_handle (*libc::c_void, option<*u8>), - tear_down + teardown_loop } #[doc = " @@ -93,7 +57,8 @@ provided `async_handle`. `uv_run` should return shortly after unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, msg_po: comm::port, before_run: fn~(*ll::uv_async_t), - before_msg_drain: fn~(*ll::uv_async_t) -> bool, + before_msg_process: + fn~(*ll::uv_async_t, bool) -> bool, before_tear_down: fn~(*ll::uv_async_t)) { // set up the special async handle we'll use to allow multi-task // communication with this loop @@ -106,11 +71,9 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, let data: hl_loop_data = default_gl_data({ async_handle: async_handle, mut active: true, - before_msg_drain: before_msg_drain, + before_msg_process: before_msg_process, before_tear_down: before_tear_down, - msg_po_ptr: ptr::addr_of(msg_po), - mut refd_handles: [mut], - mut unrefd_handles: [mut] + msg_po_ptr: ptr::addr_of(msg_po) }); let data_ptr = ptr::addr_of(data); ll::set_data_for_uv_handle(async_handle, data_ptr); @@ -143,44 +106,6 @@ unsafe fn interact(a_loop: high_level_loop, send_high_level_msg(a_loop, interaction(cb)); } -iface uv_handle_manager { - fn init() -> T; -} - -type safe_handle_fields = { - hl_loop: high_level_loop, - handle: T, - close_cb: *u8 -}; - -/*fn safe_handle(a_loop: high_level_loop, - handle_val: T, - handle_init_cb: fn~(*libc::c_void, *T), - close_cb: *u8) { - -resource safe_handle_container(handle_fields: safe_handle_fields) { -} -}*/ - - -#[doc=" -Needs to be encapsulated within `safe_handle` -"] -fn ref(hl_loop: high_level_loop, handle: *T) unsafe { - send_high_level_msg(hl_loop, ref_handle(handle as *libc::c_void)); -} -#[doc=" -Needs to be encapsulated within `safe_handle` -"] -fn unref(hl_loop: high_level_loop, handle: *T) unsafe { - send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void, - none)); -} -fn unref_and_close(hl_loop: high_level_loop, handle: *T, cb: *u8) unsafe { - send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void, - some(cb))); -} - // INTERNAL API // data that lives for the lifetime of the high-evel oo @@ -188,36 +113,26 @@ enum hl_loop_data { default_gl_data({ async_handle: *ll::uv_async_t, mut active: bool, - before_msg_drain: fn~(*ll::uv_async_t) -> bool, + before_msg_process: fn~(*ll::uv_async_t, bool) -> bool, before_tear_down: fn~(*ll::uv_async_t), - msg_po_ptr: *comm::port, - mut refd_handles: [mut *libc::c_void], - mut unrefd_handles: [mut *libc::c_void]}) + msg_po_ptr: *comm::port}) } unsafe fn send_high_level_msg(hl_loop: high_level_loop, - -msg: high_level_msg) unsafe { - comm::send(hl_loop.op_chan(), msg); + -msg: high_level_msg) { + let op_chan = alt hl_loop{simple_task_loop({async_handle, op_chan}){ + op_chan}}; + comm::send(op_chan, msg); // if the global async handle == 0, then that means // the loop isn't active, so we don't need to wake it up, // (the loop's enclosing task should be blocking on a message // receive on this port) alt hl_loop { - single_task_loop({async_handle, op_chan}) { - if ((*async_handle) != 0 as *ll::uv_async_t) { - log(debug,"global async handle != 0, waking up loop.."); - ll::async_send((*async_handle)); - } - else { - log(debug,"GLOBAL ASYNC handle == 0"); - } - } simple_task_loop({async_handle, op_chan}) { log(debug,"simple async handle != 0, waking up loop.."); ll::async_send((async_handle)); } - _ {} } } @@ -228,71 +143,57 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop, // data member crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, status: int) unsafe { - // nothing here, yet. log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?", async_handle, status)); let loop_ptr = ll::get_loop_for_uv_handle(async_handle); let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data; - // we check to see if the loop is "active" (the loop is set to - // active = false the first time we realize we need to 'tear down', - // set subsequent calls to the global async handle may be triggered - // before all of the uv_close() calls are processed and loop exits - // on its own. So if the loop isn't active, we won't run the user's - // on_wake callback (and, consequently, let messages pile up, probably - // in the loops msg_po) - if (*data).active { - log(debug, "before on_wake"); - let mut do_msg_drain = (*data).before_msg_drain(async_handle); - let mut continue = true; - if do_msg_drain { - let msg_po = *((*data).msg_po_ptr); - if comm::peek(msg_po) { - // if this is true, we'll iterate over the - // msgs waiting in msg_po until there's no more - log(debug,"got msg_po"); - while(continue) { - log(debug,"before alt'ing on high_level_msg"); - alt comm::recv(msg_po) { + alt (*data).active { + true { + let msg_po = *((*data).msg_po_ptr); + alt comm::peek(msg_po) { + true { + loop { + let msg = comm::recv(msg_po); + alt (*data).active { + true { + alt msg { interaction(cb) { - log(debug,"got interaction, before cb.."); - // call it.. + (*data).before_msg_process(async_handle, + (*data).active); cb(loop_ptr); - log(debug,"after calling cb"); } - ref_handle(handle) { - high_level_ref(data, handle); - } - manual_unref_handle(handle, user_close_cb) { - high_level_unref(data, handle, true, user_close_cb); - } - tear_down { - log(debug,"incoming hl_msg: got tear_down"); + teardown_loop { + begin_teardown(data); } } - continue = comm::peek(msg_po); + } + false { + // drop msg ? + } } + if !comm::peek(msg_po) { break; } } - else { - log(debug, "in hl wake_cb, no pending messages"); - } - } - log(debug, #fmt("after on_wake, continue? %?", continue)); - if !do_msg_drain { - high_level_tear_down(data); + } + false { + // no pending msgs + } } + } + false { + // loop not active + } } } crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe { - log(debug, #fmt("tear_down_close_cb called, closing handle at %?", - handle)); - let data = ll::get_data_for_uv_handle(handle) as *hl_loop_data; - if vec::len((*data).refd_handles) > 0u { - fail "Didn't unref all high-level handles"; - } + let loop_ptr = ll::get_loop_for_uv_handle(handle); + let loop_refs = ll::loop_refcount(loop_ptr); + log(debug, #fmt("tear_down_close_cb called, closing handle at %? refs %?", + handle, loop_refs)); + assert loop_refs == 1i32; } -fn high_level_tear_down(data: *hl_loop_data) unsafe { +fn begin_teardown(data: *hl_loop_data) unsafe { log(debug, "high_level_tear_down() called, close async_handle"); // call user-suppled before_tear_down cb let async_handle = (*data).async_handle; @@ -300,90 +201,6 @@ fn high_level_tear_down(data: *hl_loop_data) unsafe { ll::close(async_handle as *libc::c_void, tear_down_close_cb); } -unsafe fn high_level_ref(data: *hl_loop_data, handle: *libc::c_void) { - log(debug,"incoming hl_msg: got ..ref_handle"); - let mut refd_handles = (*data).refd_handles; - let mut unrefd_handles = (*data).unrefd_handles; - let handle_already_refd = refd_handles.contains(handle); - if handle_already_refd { - fail "attempt to do a high-level ref an already ref'd handle"; - } - let handle_already_unrefd = unrefd_handles.contains(handle); - // if we are ref'ing a handle (by ptr) that was already unref'd, - // probably - if handle_already_unrefd { - let last_idx = vec::len(unrefd_handles) - 1u; - let handle_idx = vec::position_elem(unrefd_handles, handle); - alt handle_idx { - none { - fail "trying to remove handle that isn't in unrefd_handles"; - } - some(idx) { - unrefd_handles[idx] <-> unrefd_handles[last_idx]; - vec::pop(unrefd_handles); - } - } - (*data).unrefd_handles = unrefd_handles; - } - refd_handles += [handle]; - (*data).refd_handles = refd_handles; -} - -unsafe fn high_level_unref(data: *hl_loop_data, handle: *libc::c_void, - manual_unref: bool, user_close_cb: option<*u8>) { - log(debug,"incoming hl_msg: got auto_unref_handle"); - let mut refd_handles = (*data).refd_handles; - let mut unrefd_handles = (*data).unrefd_handles; - log(debug, #fmt("refs: %?, unrefs %? handle %?", vec::len(refd_handles), - vec::len(unrefd_handles), handle)); - let handle_already_refd = refd_handles.contains(handle); - if !handle_already_refd { - fail "attempting to high-level unref an untracked handle"; - } - let double_unref = unrefd_handles.contains(handle); - if double_unref { - log(debug, "double unref encountered"); - if manual_unref { - // will allow a user to manual unref, but only signal - // a fail when a double-unref is caused by a user - fail "attempting to high-level unref an unrefd handle"; - } - else { - log(debug, "not failing..."); - } - } - else { - log(debug, "attempting to unref handle"); - alt user_close_cb { - some(cb) { - ll::close(handle, cb); - } - none { } - } - let last_idx = vec::len(refd_handles) - 1u; - let handle_idx = vec::position_elem(refd_handles, handle); - alt handle_idx { - none { - fail "trying to remove handle that isn't in refd_handles"; - } - some(idx) { - refd_handles[idx] <-> refd_handles[last_idx]; - vec::pop(refd_handles); - } - } - (*data).refd_handles = refd_handles; - unrefd_handles += [handle]; - (*data).unrefd_handles = unrefd_handles; - if vec::len(refd_handles) == 0u { - log(debug, "0 referenced handles, start loop teardown"); - high_level_tear_down(data); - } - else { - log(debug, "more than 0 referenced handles"); - } - } - -} #[cfg(test)] mod test { crust fn async_close_cb(handle: *ll::uv_async_t) unsafe { @@ -397,7 +214,7 @@ mod test { log(debug, #fmt("async_handle_cb handle %? status %?",handle,status)); let hl_loop = (*(ll::get_data_for_uv_handle(handle) as *ah_data)).hl_loop; - unref_and_close(hl_loop, handle, async_close_cb); + ll::close(handle, async_close_cb); } type ah_data = { hl_loop: high_level_loop, @@ -414,7 +231,6 @@ mod test { }; let ah_data_ptr = ptr::addr_of(ah_data); interact(hl_loop) {|loop_ptr| - ref(hl_loop, ah_ptr); ll::async_init(loop_ptr, ah_ptr, async_handle_cb); ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); @@ -446,9 +262,9 @@ mod test { })); }, // before_msg_drain - {|async_handle| - log(debug,#fmt("hltest before_msg_drain: async_handle %?", - async_handle)); + {|async_handle, status| + log(debug,#fmt("hltest before_msg_drain: handle %? %?", + async_handle, status)); true }, // before_tear_down @@ -473,7 +289,6 @@ mod test { } #[test] - #[ignore] fn test_uv_hl_async() unsafe { let exit_po = comm::port::<()>(); let exit_ch = comm::chan(exit_po); @@ -485,27 +300,30 @@ mod test { // under race-condition type situations.. this ensures that the loop // lives until, at least, all of the impl_uv_hl_async() runs have been // called, at least. - let lifetime_handle = ll::async_t(); - let lifetime_handle_ptr = ptr::addr_of(lifetime_handle); - interact(hl_loop) {|loop_ptr| - ref(hl_loop, lifetime_handle_ptr); - ll::async_init(loop_ptr, lifetime_handle_ptr, - lifetime_async_callback); - }; - + let work_exit_po = comm::port::<()>(); + let work_exit_ch = comm::chan(work_exit_po); iter::repeat(7u) {|| task::spawn_sched(task::manual_threads(1u), {|| impl_uv_hl_async(hl_loop); + comm::send(work_exit_ch, ()); }); }; - impl_uv_hl_async(hl_loop); - impl_uv_hl_async(hl_loop); - impl_uv_hl_async(hl_loop); - interact(hl_loop) {|loop_ptr| - ll::close(lifetime_handle_ptr, lifetime_handle_close); - unref(hl_loop, lifetime_handle_ptr); - log(debug, "close and unref lifetime handle"); + iter::repeat(7u) {|| + comm::recv(work_exit_po); }; + log(debug, "sending teardown_loop msg.."); + // the teardown msg usually comes, in the case of the global loop, + // as a result of receiving a msg on the weaken_task port. but, + // anyone rolling their own high_level_loop can decide when to + // send the msg. it's assert and barf, though, if all of your + // handles aren't uv_close'd first + alt hl_loop { + simple_task_loop({async_handle, op_chan}) { + comm::send(op_chan, teardown_loop); + ll::async_send(async_handle); + } + } comm::recv(exit_po); + log(debug, "after recv on exit_po.. exiting.."); } }