1
Fork 0

std: Stop using oldcomm

This commit is contained in:
Brian Anderson 2013-01-25 00:52:50 -08:00
parent 87acde8826
commit da4b376897
8 changed files with 427 additions and 447 deletions

View file

@ -483,7 +483,6 @@ mod tests {
use arc::*; use arc::*;
use arc; use arc;
use core::oldcomm::*;
use core::option::{Some, None}; use core::option::{Some, None};
use core::option; use core::option;
use core::pipes; use core::pipes;

View file

@ -38,7 +38,6 @@
#[forbid(deprecated_mode)]; #[forbid(deprecated_mode)];
use core::libc; use core::libc;
use core::oldcomm;
use core::option; use core::option;
use core::prelude::*; use core::prelude::*;
use core::ptr; use core::ptr;

View file

@ -12,8 +12,8 @@
#[forbid(deprecated_mode)]; #[forbid(deprecated_mode)];
use core::libc; use core::libc;
use core::oldcomm;
use core::prelude::*; use core::prelude::*;
use core::pipes::{stream, SharedChan};
use core::ptr; use core::ptr;
use core::result; use core::result;
use core::str; use core::str;
@ -113,17 +113,18 @@ enum IpGetAddrErr {
* A `result<~[ip_addr], ip_get_addr_err>` instance that will contain * A `result<~[ip_addr], ip_get_addr_err>` instance that will contain
* a vector of `ip_addr` results, in the case of success, or an error * a vector of `ip_addr` results, in the case of success, or an error
* object in the case of failure * object in the case of failure
*/ */
pub fn get_addr(node: &str, iotask: &iotask) pub fn get_addr(node: &str, iotask: &iotask)
-> result::Result<~[IpAddr], IpGetAddrErr> { -> result::Result<~[IpAddr], IpGetAddrErr> {
do oldcomm::listen |output_ch| { let (output_po, output_ch) = stream();
let output_ch = SharedChan(output_ch);
do str::as_buf(node) |node_ptr, len| { do str::as_buf(node) |node_ptr, len| {
unsafe { unsafe {
log(debug, fmt!("slice len %?", len)); log(debug, fmt!("slice len %?", len));
let handle = create_uv_getaddrinfo_t(); let handle = create_uv_getaddrinfo_t();
let handle_ptr = ptr::addr_of(&handle); let handle_ptr = ptr::addr_of(&handle);
let handle_data = GetAddrData { let handle_data = GetAddrData {
output_ch: output_ch output_ch: output_ch.clone()
}; };
let handle_data_ptr = ptr::addr_of(&handle_data); let handle_data_ptr = ptr::addr_of(&handle_data);
do interact(iotask) |loop_ptr| { do interact(iotask) |loop_ptr| {
@ -145,8 +146,7 @@ pub fn get_addr(node: &str, iotask: &iotask)
} }
} }
}; };
output_ch.recv() output_po.recv()
}
} }
} }
} }
@ -300,7 +300,7 @@ pub mod v6 {
} }
struct GetAddrData { struct GetAddrData {
output_ch: oldcomm::Chan<result::Result<~[IpAddr],IpGetAddrErr>> output_ch: SharedChan<result::Result<~[IpAddr],IpGetAddrErr>>
} }
extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int, extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
@ -309,6 +309,7 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
log(debug, ~"in get_addr_cb"); log(debug, ~"in get_addr_cb");
let handle_data = get_data_for_req(handle) as let handle_data = get_data_for_req(handle) as
*GetAddrData; *GetAddrData;
let output_ch = (*handle_data).output_ch.clone();
if status == 0i32 { if status == 0i32 {
if res != (ptr::null::<addrinfo>()) { if res != (ptr::null::<addrinfo>()) {
let mut out_vec = ~[]; let mut out_vec = ~[];
@ -326,7 +327,7 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
else { else {
log(debug, ~"curr_addr is not of family AF_INET or "+ log(debug, ~"curr_addr is not of family AF_INET or "+
~"AF_INET6. Error."); ~"AF_INET6. Error.");
(*handle_data).output_ch.send( output_ch.send(
result::Err(GetAddrUnknownError)); result::Err(GetAddrUnknownError));
break; break;
}; };
@ -344,17 +345,17 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int,
} }
log(debug, fmt!("successful process addrinfo result, len: %?", log(debug, fmt!("successful process addrinfo result, len: %?",
vec::len(out_vec))); vec::len(out_vec)));
(*handle_data).output_ch.send(result::Ok(move out_vec)); output_ch.send(result::Ok(move out_vec));
} }
else { else {
log(debug, ~"addrinfo pointer is NULL"); log(debug, ~"addrinfo pointer is NULL");
(*handle_data).output_ch.send( output_ch.send(
result::Err(GetAddrUnknownError)); result::Err(GetAddrUnknownError));
} }
} }
else { else {
log(debug, ~"status != 0 error in get_addr_cb"); log(debug, ~"status != 0 error in get_addr_cb");
(*handle_data).output_ch.send( output_ch.send(
result::Err(GetAddrUnknownError)); result::Err(GetAddrUnknownError));
} }
if res != (ptr::null::<addrinfo>()) { if res != (ptr::null::<addrinfo>()) {

View file

@ -23,7 +23,7 @@ use core::io::{Reader, ReaderUtil, Writer};
use core::io; use core::io;
use core::libc::size_t; use core::libc::size_t;
use core::libc; use core::libc;
use core::oldcomm; use core::pipes::{stream, Chan, Port, SharedChan};
use core::prelude::*; use core::prelude::*;
use core::ptr; use core::ptr;
use core::result::{Result}; use core::result::{Result};
@ -146,19 +146,22 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
iotask: &IoTask) iotask: &IoTask)
-> result::Result<TcpSocket, TcpConnectErrData> { -> result::Result<TcpSocket, TcpConnectErrData> {
unsafe { unsafe {
let result_po = oldcomm::Port::<ConnAttempt>(); let (result_po, result_ch) = stream::<ConnAttempt>();
let closed_signal_po = oldcomm::Port::<()>(); let result_ch = SharedChan(result_ch);
let conn_data = { let (closed_signal_po, closed_signal_ch) = stream::<()>();
result_ch: oldcomm::Chan(&result_po), let closed_signal_ch = SharedChan(closed_signal_ch);
closed_signal_ch: oldcomm::Chan(&closed_signal_po) let conn_data = ConnectReqData {
result_ch: result_ch,
closed_signal_ch: closed_signal_ch
}; };
let conn_data_ptr = ptr::addr_of(&conn_data); let conn_data_ptr = ptr::addr_of(&conn_data);
let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>(); let (reader_po, reader_ch) = stream::<Result<~[u8], TcpErrData>>();
let reader_ch = SharedChan(reader_ch);
let stream_handle_ptr = malloc_uv_tcp_t(); let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let socket_data = @TcpSocketData { let socket_data = @TcpSocketData {
reader_po: reader_po, reader_po: @reader_po,
reader_ch: oldcomm::Chan(&reader_po), reader_ch: reader_ch,
stream_handle_ptr: stream_handle_ptr, stream_handle_ptr: stream_handle_ptr,
connect_req: uv::ll::connect_t(), connect_req: uv::ll::connect_t(),
write_req: uv::ll::write_t(), write_req: uv::ll::write_t(),
@ -169,7 +172,6 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
iotask: iotask.clone() iotask: iotask.clone()
}; };
let socket_data_ptr = ptr::addr_of(&(*socket_data)); let socket_data_ptr = ptr::addr_of(&(*socket_data));
debug!("tcp_connect result_ch %?", conn_data.result_ch);
// get an unsafe representation of our stream_handle_ptr that // get an unsafe representation of our stream_handle_ptr that
// we can send into the interact cb to be handled in libuv.. // we can send into the interact cb to be handled in libuv..
debug!("stream_handle_ptr outside interact %?", debug!("stream_handle_ptr outside interact %?",
@ -238,8 +240,9 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
// somesuch // somesuch
let err_data = let err_data =
uv::ll::get_last_err_data(loop_ptr); uv::ll::get_last_err_data(loop_ptr);
oldcomm::send((*conn_data_ptr).result_ch, let result_ch = (*conn_data_ptr)
ConnFailure(err_data)); .result_ch.clone();
result_ch.send(ConnFailure(err_data));
uv::ll::set_data_for_uv_handle( uv::ll::set_data_for_uv_handle(
stream_handle_ptr, stream_handle_ptr,
conn_data_ptr); conn_data_ptr);
@ -251,19 +254,19 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
_ => { _ => {
// failure to create a tcp handle // failure to create a tcp handle
let err_data = uv::ll::get_last_err_data(loop_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr);
oldcomm::send((*conn_data_ptr).result_ch, let result_ch = (*conn_data_ptr).result_ch.clone();
ConnFailure(err_data)); result_ch.send(ConnFailure(err_data));
} }
} }
} }
} }
match oldcomm::recv(result_po) { match result_po.recv() {
ConnSuccess => { ConnSuccess => {
debug!("tcp::connect - received success on result_po"); debug!("tcp::connect - received success on result_po");
result::Ok(TcpSocket(socket_data)) result::Ok(TcpSocket(socket_data))
} }
ConnFailure(ref err_data) => { ConnFailure(ref err_data) => {
oldcomm::recv(closed_signal_po); closed_signal_po.recv();
debug!("tcp::connect - received failure on result_po"); debug!("tcp::connect - received failure on result_po");
// still have to free the malloc'd stream handle.. // still have to free the malloc'd stream handle..
rustrt::rust_uv_current_kernel_free(stream_handle_ptr rustrt::rust_uv_current_kernel_free(stream_handle_ptr
@ -359,7 +362,7 @@ pub fn write_future(sock: &TcpSocket, raw_write_data: ~[u8])
* `tcp_err_data` record * `tcp_err_data` record
*/ */
pub fn read_start(sock: &TcpSocket) pub fn read_start(sock: &TcpSocket)
-> result::Result<oldcomm::Port< -> result::Result<@Port<
result::Result<~[u8], TcpErrData>>, TcpErrData> { result::Result<~[u8], TcpErrData>>, TcpErrData> {
unsafe { unsafe {
let socket_data = ptr::addr_of(&(*(sock.socket_data))); let socket_data = ptr::addr_of(&(*(sock.socket_data)));
@ -374,12 +377,9 @@ pub fn read_start(sock: &TcpSocket)
* *
* * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
*/ */
pub fn read_stop(sock: &TcpSocket, pub fn read_stop(sock: &TcpSocket) ->
read_port: oldcomm::Port<result::Result<~[u8], TcpErrData>>) ->
result::Result<(), TcpErrData> { result::Result<(), TcpErrData> {
unsafe { unsafe {
debug!(
"taking the read_port out of commission %?", read_port);
let socket_data = ptr::addr_of(&(*sock.socket_data)); let socket_data = ptr::addr_of(&(*sock.socket_data));
read_stop_common_impl(socket_data) read_stop_common_impl(socket_data)
} }
@ -519,14 +519,16 @@ pub fn accept(new_conn: TcpNewConnection)
NewTcpConn(server_handle_ptr) => { NewTcpConn(server_handle_ptr) => {
let server_data_ptr = uv::ll::get_data_for_uv_handle( let server_data_ptr = uv::ll::get_data_for_uv_handle(
server_handle_ptr) as *TcpListenFcData; server_handle_ptr) as *TcpListenFcData;
let reader_po = oldcomm::Port(); let (reader_po, reader_ch) = stream::<
Result<~[u8], TcpErrData>>();
let reader_ch = SharedChan(reader_ch);
let iotask = &(*server_data_ptr).iotask; let iotask = &(*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t(); let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
uv::ll::tcp_t(); uv::ll::tcp_t();
let client_socket_data: @TcpSocketData = @TcpSocketData { let client_socket_data: @TcpSocketData = @TcpSocketData {
reader_po: reader_po, reader_po: @reader_po,
reader_ch: oldcomm::Chan(&reader_po), reader_ch: reader_ch,
stream_handle_ptr : stream_handle_ptr, stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(), connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(), write_req : uv::ll::write_t(),
@ -538,8 +540,8 @@ pub fn accept(new_conn: TcpNewConnection)
let client_stream_handle_ptr = let client_stream_handle_ptr =
(*client_socket_data_ptr).stream_handle_ptr; (*client_socket_data_ptr).stream_handle_ptr;
let result_po = oldcomm::Port::<Option<TcpErrData>>(); let (result_po, result_ch) = stream::<Option<TcpErrData>>();
let result_ch = oldcomm::Chan(&result_po); let result_ch = SharedChan(result_ch);
// UNSAFE LIBUV INTERACTION BEGIN // UNSAFE LIBUV INTERACTION BEGIN
// .. normally this happens within the context of // .. normally this happens within the context of
@ -565,11 +567,11 @@ pub fn accept(new_conn: TcpNewConnection)
client_stream_handle_ptr, client_stream_handle_ptr,
client_socket_data_ptr client_socket_data_ptr
as *libc::c_void); as *libc::c_void);
oldcomm::send(result_ch, None); result_ch.send(None);
} }
_ => { _ => {
log(debug, ~"failed to accept client conn"); log(debug, ~"failed to accept client conn");
oldcomm::send(result_ch, Some( result_ch.send(Some(
uv::ll::get_last_err_data( uv::ll::get_last_err_data(
loop_ptr).to_tcp_err())); loop_ptr).to_tcp_err()));
} }
@ -577,13 +579,13 @@ pub fn accept(new_conn: TcpNewConnection)
} }
_ => { _ => {
log(debug, ~"failed to accept client stream"); log(debug, ~"failed to accept client stream");
oldcomm::send(result_ch, Some( result_ch.send(Some(
uv::ll::get_last_err_data( uv::ll::get_last_err_data(
loop_ptr).to_tcp_err())); loop_ptr).to_tcp_err()));
} }
} }
// UNSAFE LIBUV INTERACTION END // UNSAFE LIBUV INTERACTION END
match oldcomm::recv(result_po) { match result_po.recv() {
Some(copy err_data) => result::Err(err_data), Some(copy err_data) => result::Err(err_data),
None => result::Ok(TcpSocket(client_socket_data)) None => result::Ok(TcpSocket(client_socket_data))
} }
@ -622,9 +624,9 @@ pub fn accept(new_conn: TcpNewConnection)
*/ */
pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: &IoTask, iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), on_establish_cb: fn~(SharedChan<Option<TcpErrData>>),
new_connect_cb: fn~(TcpNewConnection, new_connect_cb: fn~(TcpNewConnection,
oldcomm::Chan<Option<TcpErrData>>)) SharedChan<Option<TcpErrData>>))
-> result::Result<(), TcpListenErrData> { -> result::Result<(), TcpListenErrData> {
do listen_common(move host_ip, port, backlog, iotask, do listen_common(move host_ip, port, backlog, iotask,
move on_establish_cb) move on_establish_cb)
@ -634,7 +636,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
as *TcpListenFcData; as *TcpListenFcData;
let new_conn = NewTcpConn(handle); let new_conn = NewTcpConn(handle);
let kill_ch = (*server_data_ptr).kill_ch; let kill_ch = (*server_data_ptr).kill_ch.clone();
new_connect_cb(new_conn, kill_ch); new_connect_cb(new_conn, kill_ch);
} }
} }
@ -642,19 +644,20 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
iotask: &IoTask, iotask: &IoTask,
on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), on_establish_cb: fn~(SharedChan<Option<TcpErrData>>),
on_connect_cb: fn~(*uv::ll::uv_tcp_t)) on_connect_cb: fn~(*uv::ll::uv_tcp_t))
-> result::Result<(), TcpListenErrData> { -> result::Result<(), TcpListenErrData> {
unsafe { unsafe {
let stream_closed_po = oldcomm::Port::<()>(); let (stream_closed_po, stream_closed_ch) = stream::<()>();
let kill_po = oldcomm::Port::<Option<TcpErrData>>(); let stream_closed_ch = SharedChan(stream_closed_ch);
let kill_ch = oldcomm::Chan(&kill_po); let (kill_po, kill_ch) = stream::<Option<TcpErrData>>();
let kill_ch = SharedChan(kill_ch);
let server_stream = uv::ll::tcp_t(); let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(&server_stream); let server_stream_ptr = ptr::addr_of(&server_stream);
let server_data: TcpListenFcData = TcpListenFcData { let server_data: TcpListenFcData = TcpListenFcData {
server_stream_ptr: server_stream_ptr, server_stream_ptr: server_stream_ptr,
stream_closed_ch: oldcomm::Chan(&stream_closed_po), stream_closed_ch: stream_closed_ch,
kill_ch: kill_ch, kill_ch: kill_ch.clone(),
on_connect_cb: move on_connect_cb, on_connect_cb: move on_connect_cb,
iotask: iotask.clone(), iotask: iotask.clone(),
ipv6: match &host_ip { ipv6: match &host_ip {
@ -665,7 +668,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
}; };
let server_data_ptr = ptr::addr_of(&server_data); let server_data_ptr = ptr::addr_of(&server_data);
let setup_result = do oldcomm::listen |setup_ch| { let (setup_po, setup_ch) = stream();
// this is to address a compiler warning about // this is to address a compiler warning about
// an implicit copy.. it seems that double nested // an implicit copy.. it seems that double nested
// will defeat a move sigil, as is done to the host_ip // will defeat a move sigil, as is done to the host_ip
@ -705,15 +709,14 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
server_stream_ptr, server_stream_ptr,
backlog as libc::c_int, backlog as libc::c_int,
tcp_lfc_on_connection_cb) { tcp_lfc_on_connection_cb) {
0i32 => oldcomm::send(setup_ch, None), 0i32 => setup_ch.send(None),
_ => { _ => {
log(debug, log(debug,
~"failure to uv_tcp_init"); ~"failure to uv_tcp_init");
let err_data = let err_data =
uv::ll::get_last_err_data( uv::ll::get_last_err_data(
loop_ptr); loop_ptr);
oldcomm::send(setup_ch, setup_ch.send(Some(err_data));
Some(err_data));
} }
} }
} }
@ -721,7 +724,7 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
log(debug, ~"failure to uv_tcp_bind"); log(debug, ~"failure to uv_tcp_bind");
let err_data = uv::ll::get_last_err_data( let err_data = uv::ll::get_last_err_data(
loop_ptr); loop_ptr);
oldcomm::send(setup_ch, Some(err_data)); setup_ch.send(Some(err_data));
} }
} }
} }
@ -729,13 +732,14 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
log(debug, ~"failure to uv_tcp_bind"); log(debug, ~"failure to uv_tcp_bind");
let err_data = uv::ll::get_last_err_data( let err_data = uv::ll::get_last_err_data(
loop_ptr); loop_ptr);
oldcomm::send(setup_ch, Some(err_data)); setup_ch.send(Some(err_data));
} }
} }
} }
} }
setup_ch.recv()
}; let setup_result = setup_po.recv();
match setup_result { match setup_result {
Some(ref err_data) => { Some(ref err_data) => {
do iotask::interact(iotask) |loop_ptr| { do iotask::interact(iotask) |loop_ptr| {
@ -767,8 +771,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint,
} }
} }
None => { None => {
on_establish_cb(kill_ch); on_establish_cb(kill_ch.clone());
let kill_result = oldcomm::recv(kill_po); let kill_result = kill_po.recv();
do iotask::interact(iotask) |loop_ptr| { do iotask::interact(iotask) |loop_ptr| {
unsafe { unsafe {
log(debug, log(debug,
@ -816,14 +820,13 @@ pub fn socket_buf(sock: TcpSocket) -> TcpSocketBuf {
/// Convenience methods extending `net::tcp::tcp_socket` /// Convenience methods extending `net::tcp::tcp_socket`
impl TcpSocket { impl TcpSocket {
pub fn read_start() -> result::Result<oldcomm::Port< pub fn read_start() -> result::Result<@Port<
result::Result<~[u8], TcpErrData>>, TcpErrData> { result::Result<~[u8], TcpErrData>>, TcpErrData> {
read_start(&self) read_start(&self)
} }
pub fn read_stop(read_port: pub fn read_stop() ->
oldcomm::Port<result::Result<~[u8], TcpErrData>>) ->
result::Result<(), TcpErrData> { result::Result<(), TcpErrData> {
read_stop(&self, move read_port) read_stop(&self)
} }
fn read(timeout_msecs: uint) -> fn read(timeout_msecs: uint) ->
result::Result<~[u8], TcpErrData> { result::Result<~[u8], TcpErrData> {
@ -995,9 +998,9 @@ impl TcpSocketBuf: io::Writer {
fn tear_down_socket_data(socket_data: @TcpSocketData) { fn tear_down_socket_data(socket_data: @TcpSocketData) {
unsafe { unsafe {
let closed_po = oldcomm::Port::<()>(); let (closed_po, closed_ch) = stream::<()>();
let closed_ch = oldcomm::Chan(&closed_po); let closed_ch = SharedChan(closed_ch);
let close_data = { let close_data = TcpSocketCloseData {
closed_ch: closed_ch closed_ch: closed_ch
}; };
let close_data_ptr = ptr::addr_of(&close_data); let close_data_ptr = ptr::addr_of(&close_data);
@ -1012,7 +1015,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) {
uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
} }
}; };
oldcomm::recv(closed_po); closed_po.recv();
//the line below will most likely crash //the line below will most likely crash
//log(debug, fmt!("about to free socket_data at %?", socket_data)); //log(debug, fmt!("about to free socket_data at %?", socket_data));
rustrt::rust_uv_current_kernel_free(stream_handle_ptr rustrt::rust_uv_current_kernel_free(stream_handle_ptr
@ -1038,9 +1041,9 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
log(debug, ~"tcp::read before recv_timeout"); log(debug, ~"tcp::read before recv_timeout");
let read_result = if timeout_msecs > 0u { let read_result = if timeout_msecs > 0u {
timer::recv_timeout( timer::recv_timeout(
iotask, timeout_msecs, result::get(&rs_result)) iotask, timeout_msecs, result::unwrap(rs_result))
} else { } else {
Some(oldcomm::recv(result::get(&rs_result))) Some(result::get(&rs_result).recv())
}; };
log(debug, ~"tcp::read after recv_timeout"); log(debug, ~"tcp::read after recv_timeout");
match move read_result { match move read_result {
@ -1068,8 +1071,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
result::Result<(), TcpErrData> { result::Result<(), TcpErrData> {
unsafe { unsafe {
let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = oldcomm::Port::<Option<TcpErrData>>(); let (stop_po, stop_ch) = stream::<Option<TcpErrData>>();
let stop_ch = oldcomm::Chan(&stop_po);
do iotask::interact(&(*socket_data).iotask) |loop_ptr| { do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
unsafe { unsafe {
log(debug, ~"in interact cb for tcp::read_stop"); log(debug, ~"in interact cb for tcp::read_stop");
@ -1077,17 +1079,17 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
as *uv::ll::uv_stream_t) { as *uv::ll::uv_stream_t) {
0i32 => { 0i32 => {
log(debug, ~"successfully called uv_read_stop"); log(debug, ~"successfully called uv_read_stop");
oldcomm::send(stop_ch, None); stop_ch.send(None);
} }
_ => { _ => {
log(debug, ~"failure in calling uv_read_stop"); log(debug, ~"failure in calling uv_read_stop");
let err_data = uv::ll::get_last_err_data(loop_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr);
oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); stop_ch.send(Some(err_data.to_tcp_err()));
} }
} }
} }
} }
match oldcomm::recv(stop_po) { match stop_po.recv() {
Some(move err_data) => Err(err_data), Some(move err_data) => Err(err_data),
None => Ok(()) None => Ok(())
} }
@ -1096,12 +1098,11 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) ->
// shared impl for read_start // shared impl for read_start
fn read_start_common_impl(socket_data: *TcpSocketData) fn read_start_common_impl(socket_data: *TcpSocketData)
-> result::Result<oldcomm::Port< -> result::Result<@Port<
result::Result<~[u8], TcpErrData>>, TcpErrData> { result::Result<~[u8], TcpErrData>>, TcpErrData> {
unsafe { unsafe {
let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>(); let (start_po, start_ch) = stream::<Option<uv::ll::uv_err_data>>();
let start_ch = oldcomm::Chan(&start_po);
log(debug, ~"in tcp::read_start before interact loop"); log(debug, ~"in tcp::read_start before interact loop");
do iotask::interact(&(*socket_data).iotask) |loop_ptr| { do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
unsafe { unsafe {
@ -1113,19 +1114,22 @@ fn read_start_common_impl(socket_data: *TcpSocketData)
on_tcp_read_cb) { on_tcp_read_cb) {
0i32 => { 0i32 => {
log(debug, ~"success doing uv_read_start"); log(debug, ~"success doing uv_read_start");
oldcomm::send(start_ch, None); start_ch.send(None);
} }
_ => { _ => {
log(debug, ~"error attempting uv_read_start"); log(debug, ~"error attempting uv_read_start");
let err_data = uv::ll::get_last_err_data(loop_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr);
oldcomm::send(start_ch, Some(err_data)); start_ch.send(Some(err_data));
} }
} }
} }
} }
match oldcomm::recv(start_po) { match start_po.recv() {
Some(ref err_data) => result::Err(err_data.to_tcp_err()), Some(ref err_data) => result::Err(
None => result::Ok((*socket_data).reader_po) err_data.to_tcp_err()),
None => {
result::Ok((*socket_data).reader_po)
}
} }
} }
} }
@ -1144,9 +1148,10 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
vec::raw::to_ptr(raw_write_data), vec::raw::to_ptr(raw_write_data),
vec::len(raw_write_data)) ]; vec::len(raw_write_data)) ];
let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec); let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
let result_po = oldcomm::Port::<TcpWriteResult>(); let (result_po, result_ch) = stream::<TcpWriteResult>();
let write_data = { let result_ch = SharedChan(result_ch);
result_ch: oldcomm::Chan(&result_po) let write_data = WriteReqData {
result_ch: result_ch
}; };
let write_data_ptr = ptr::addr_of(&write_data); let write_data_ptr = ptr::addr_of(&write_data);
do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| { do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
@ -1165,8 +1170,8 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
_ => { _ => {
log(debug, ~"error invoking uv_write()"); log(debug, ~"error invoking uv_write()");
let err_data = uv::ll::get_last_err_data(loop_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr);
oldcomm::send((*write_data_ptr).result_ch, let result_ch = (*write_data_ptr).result_ch.clone();
TcpWriteError(err_data.to_tcp_err())); result_ch.send(TcpWriteError(err_data.to_tcp_err()));
} }
} }
} }
@ -1175,7 +1180,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
// and waiting here for the write to complete, we should transfer // and waiting here for the write to complete, we should transfer
// ownership of everything to the I/O task and let it deal with the // ownership of everything to the I/O task and let it deal with the
// aftermath, so we don't have to sit here blocking. // aftermath, so we don't have to sit here blocking.
match oldcomm::recv(result_po) { match result_po.recv() {
TcpWriteSuccess => Ok(()), TcpWriteSuccess => Ok(()),
TcpWriteError(move err_data) => Err(err_data) TcpWriteError(move err_data) => Err(err_data)
} }
@ -1188,8 +1193,8 @@ enum TcpNewConnection {
struct TcpListenFcData { struct TcpListenFcData {
server_stream_ptr: *uv::ll::uv_tcp_t, server_stream_ptr: *uv::ll::uv_tcp_t,
stream_closed_ch: oldcomm::Chan<()>, stream_closed_ch: SharedChan<()>,
kill_ch: oldcomm::Chan<Option<TcpErrData>>, kill_ch: SharedChan<Option<TcpErrData>>,
on_connect_cb: fn~(*uv::ll::uv_tcp_t), on_connect_cb: fn~(*uv::ll::uv_tcp_t),
iotask: IoTask, iotask: IoTask,
ipv6: bool, ipv6: bool,
@ -1200,7 +1205,8 @@ extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) {
unsafe { unsafe {
let server_data_ptr = uv::ll::get_data_for_uv_handle( let server_data_ptr = uv::ll::get_data_for_uv_handle(
handle) as *TcpListenFcData; handle) as *TcpListenFcData;
oldcomm::send((*server_data_ptr).stream_closed_ch, ()); let stream_closed_ch = (*server_data_ptr).stream_closed_ch.clone();
stream_closed_ch.send(());
} }
} }
@ -1209,13 +1215,13 @@ extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
unsafe { unsafe {
let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
as *TcpListenFcData; as *TcpListenFcData;
let kill_ch = (*server_data_ptr).kill_ch; let kill_ch = (*server_data_ptr).kill_ch.clone();
if (*server_data_ptr).active { if (*server_data_ptr).active {
match status { match status {
0i32 => ((*server_data_ptr).on_connect_cb)(handle), 0i32 => ((*server_data_ptr).on_connect_cb)(handle),
_ => { _ => {
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
oldcomm::send(kill_ch, kill_ch.send(
Some(uv::ll::get_last_err_data(loop_ptr) Some(uv::ll::get_last_err_data(loop_ptr)
.to_tcp_err())); .to_tcp_err()));
(*server_data_ptr).active = false; (*server_data_ptr).active = false;
@ -1243,7 +1249,7 @@ enum TcpWriteResult {
} }
enum TcpReadStartResult { enum TcpReadStartResult {
TcpReadStartSuccess(oldcomm::Port<TcpReadResult>), TcpReadStartSuccess(Port<TcpReadResult>),
TcpReadStartError(TcpErrData) TcpReadStartError(TcpErrData)
} }
@ -1278,8 +1284,8 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err(); let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?", log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?",
err_data.err_name, err_data.err_msg)); err_data.err_name, err_data.err_msg));
let reader_ch = (*socket_data_ptr).reader_ch; let reader_ch = &(*socket_data_ptr).reader_ch;
oldcomm::send(reader_ch, result::Err(err_data)); reader_ch.send(result::Err(err_data));
} }
// do nothing .. unneeded buf // do nothing .. unneeded buf
0 => (), 0 => (),
@ -1287,10 +1293,10 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
_ => { _ => {
// we have data // we have data
log(debug, fmt!("tcp on_read_cb nread: %d", nread as int)); log(debug, fmt!("tcp on_read_cb nread: %d", nread as int));
let reader_ch = (*socket_data_ptr).reader_ch; let reader_ch = &(*socket_data_ptr).reader_ch;
let buf_base = uv::ll::get_base_from_buf(buf); let buf_base = uv::ll::get_base_from_buf(buf);
let new_bytes = vec::from_buf(buf_base, nread as uint); let new_bytes = vec::from_buf(buf_base, nread as uint);
oldcomm::send(reader_ch, result::Ok(new_bytes)); reader_ch.send(result::Ok(new_bytes));
} }
} }
uv::ll::free_base_of_buf(buf); uv::ll::free_base_of_buf(buf);
@ -1313,15 +1319,15 @@ extern fn on_alloc_cb(handle: *libc::c_void,
} }
struct TcpSocketCloseData { struct TcpSocketCloseData {
closed_ch: oldcomm::Chan<()>, closed_ch: SharedChan<()>,
} }
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) { extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) {
unsafe { unsafe {
let data = uv::ll::get_data_for_uv_handle(handle) let data = uv::ll::get_data_for_uv_handle(handle)
as *TcpSocketCloseData; as *TcpSocketCloseData;
let closed_ch = (*data).closed_ch; let closed_ch = (*data).closed_ch.clone();
oldcomm::send(closed_ch, ()); closed_ch.send(());
log(debug, ~"tcp_socket_dtor_close_cb exiting.."); log(debug, ~"tcp_socket_dtor_close_cb exiting..");
} }
} }
@ -1333,33 +1339,35 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
as *WriteReqData; as *WriteReqData;
if status == 0i32 { if status == 0i32 {
log(debug, ~"successful write complete"); log(debug, ~"successful write complete");
oldcomm::send((*write_data_ptr).result_ch, TcpWriteSuccess); let result_ch = (*write_data_ptr).result_ch.clone();
result_ch.send(TcpWriteSuccess);
} else { } else {
let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req( let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
write_req); write_req);
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
let err_data = uv::ll::get_last_err_data(loop_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr);
log(debug, ~"failure to write"); log(debug, ~"failure to write");
oldcomm::send((*write_data_ptr).result_ch, let result_ch = (*write_data_ptr).result_ch.clone();
TcpWriteError(err_data.to_tcp_err())); result_ch.send(TcpWriteError(err_data.to_tcp_err()));
} }
} }
} }
struct WriteReqData { struct WriteReqData {
result_ch: oldcomm::Chan<TcpWriteResult>, result_ch: SharedChan<TcpWriteResult>,
} }
struct ConnectReqData { struct ConnectReqData {
result_ch: oldcomm::Chan<ConnAttempt>, result_ch: SharedChan<ConnAttempt>,
closed_signal_ch: oldcomm::Chan<()>, closed_signal_ch: SharedChan<()>,
} }
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) { extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) {
unsafe { unsafe {
let data = uv::ll::get_data_for_uv_handle(handle) as let data = uv::ll::get_data_for_uv_handle(handle) as
*ConnectReqData; *ConnectReqData;
oldcomm::send((*data).closed_signal_ch, ()); let closed_signal_ch = (*data).closed_signal_ch.clone();
closed_signal_ch.send(());
log(debug, fmt!("exiting steam_error_close_cb for %?", handle)); log(debug, fmt!("exiting steam_error_close_cb for %?", handle));
} }
} }
@ -1375,14 +1383,14 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
unsafe { unsafe {
let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr) let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
as *ConnectReqData); as *ConnectReqData);
let result_ch = (*conn_data_ptr).result_ch; let result_ch = (*conn_data_ptr).result_ch.clone();
log(debug, fmt!("tcp_connect result_ch %?", result_ch)); log(debug, fmt!("tcp_connect result_ch %?", result_ch));
let tcp_stream_ptr = let tcp_stream_ptr =
uv::ll::get_stream_handle_from_connect_req(connect_req_ptr); uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
match status { match status {
0i32 => { 0i32 => {
log(debug, ~"successful tcp connection!"); log(debug, ~"successful tcp connection!");
oldcomm::send(result_ch, ConnSuccess); result_ch.send(ConnSuccess);
} }
_ => { _ => {
log(debug, ~"error in tcp_connect_on_connect_cb"); log(debug, ~"error in tcp_connect_on_connect_cb");
@ -1390,7 +1398,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
let err_data = uv::ll::get_last_err_data(loop_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr);
log(debug, fmt!("err_data %? %?", err_data.err_name, log(debug, fmt!("err_data %? %?", err_data.err_name,
err_data.err_msg)); err_data.err_msg));
oldcomm::send(result_ch, ConnFailure(err_data)); result_ch.send(ConnFailure(err_data));
uv::ll::set_data_for_uv_handle(tcp_stream_ptr, uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
conn_data_ptr); conn_data_ptr);
uv::ll::close(tcp_stream_ptr, stream_error_close_cb); uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
@ -1406,8 +1414,8 @@ enum ConnAttempt {
} }
struct TcpSocketData { struct TcpSocketData {
reader_po: oldcomm::Port<result::Result<~[u8], TcpErrData>>, reader_po: @Port<result::Result<~[u8], TcpErrData>>,
reader_ch: oldcomm::Chan<result::Result<~[u8], TcpErrData>>, reader_ch: SharedChan<result::Result<~[u8], TcpErrData>>,
stream_handle_ptr: *uv::ll::uv_tcp_t, stream_handle_ptr: *uv::ll::uv_tcp_t,
connect_req: uv::ll::uv_connect_t, connect_req: uv::ll::uv_connect_t,
write_req: uv::ll::uv_write_t, write_req: uv::ll::uv_write_t,
@ -1431,7 +1439,7 @@ pub mod test {
use uv; use uv;
use core::io; use core::io;
use core::oldcomm; use core::pipes::{stream, Chan, Port, SharedChan};
use core::prelude::*; use core::prelude::*;
use core::result; use core::result;
use core::str; use core::str;
@ -1546,39 +1554,33 @@ pub mod test {
let expected_req = ~"ping"; let expected_req = ~"ping";
let expected_resp = ~"pong"; let expected_resp = ~"pong";
let server_result_po = oldcomm::Port::<~str>(); let (server_result_po, server_result_ch) = stream::<~str>();
let server_result_ch = oldcomm::Chan(&server_result_po);
let cont_po = oldcomm::Port::<()>(); let (cont_po, cont_ch) = stream::<()>();
let cont_ch = oldcomm::Chan(&cont_po); let cont_ch = SharedChan(cont_ch);
// server // server
let hl_loop_clone = hl_loop.clone(); let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| { let cont_ch = cont_ch.clone();
run_tcp_test_server( let actual_req = run_tcp_test_server(
server_ip, server_ip,
server_port, server_port,
expected_resp, expected_resp,
server_ch, cont_ch.clone(),
cont_ch, &hl_loop_clone);
&hl_loop_clone)
};
server_result_ch.send(actual_req); server_result_ch.send(actual_req);
}; };
oldcomm::recv(cont_po); cont_po.recv();
// client // client
debug!("server started, firing up client.."); debug!("server started, firing up client..");
let actual_resp_result = do oldcomm::listen |client_ch| { let actual_resp_result = run_tcp_test_client(
run_tcp_test_client(
server_ip, server_ip,
server_port, server_port,
expected_req, expected_req,
client_ch, hl_loop);
hl_loop)
};
assert actual_resp_result.is_ok(); assert actual_resp_result.is_ok();
let actual_resp = actual_resp_result.get(); let actual_resp = actual_resp_result.get();
let actual_req = oldcomm::recv(server_result_po); let actual_req = server_result_po.recv();
debug!("REQ: expected: '%s' actual: '%s'", debug!("REQ: expected: '%s' actual: '%s'",
expected_req, actual_req); expected_req, actual_req);
debug!("RESP: expected: '%s' actual: '%s'", debug!("RESP: expected: '%s' actual: '%s'",
@ -1592,29 +1594,22 @@ pub mod test {
let server_port = 8887u; let server_port = 8887u;
let expected_resp = ~"pong"; let expected_resp = ~"pong";
let server_result_po = oldcomm::Port::<~str>(); let (cont_po, cont_ch) = stream::<()>();
let server_result_ch = oldcomm::Chan(&server_result_po); let cont_ch = SharedChan(cont_ch);
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server // server
let hl_loop_clone = hl_loop.clone(); let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| { let cont_ch = cont_ch.clone();
run_tcp_test_server( run_tcp_test_server(
server_ip, server_ip,
server_port, server_port,
expected_resp, expected_resp,
server_ch, cont_ch.clone(),
cont_ch, &hl_loop_clone);
&hl_loop_clone)
}; };
server_result_ch.send(actual_req); cont_po.recv();
};
oldcomm::recv(cont_po);
// client // client
debug!("server started, firing up client.."); debug!("server started, firing up client..");
do oldcomm::listen |client_ch| {
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
let iotask = uv::global_loop::get(); let iotask = uv::global_loop::get();
let connect_result = connect(move server_ip_addr, server_port, let connect_result = connect(move server_ip_addr, server_port,
@ -1632,10 +1627,8 @@ pub mod test {
let resp_bytes = str::to_bytes(~"ping"); let resp_bytes = str::to_bytes(~"ping");
tcp_write_single(&sock, resp_bytes); tcp_write_single(&sock, resp_bytes);
debug!("message sent"); debug!("message sent");
let read_result = sock.read(0u); sock.read(0u);
client_ch.send(str::from_bytes(read_result.get()));
debug!("result read"); debug!("result read");
};
} }
pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = &uv::global_loop::get(); let hl_loop = &uv::global_loop::get();
@ -1644,14 +1637,11 @@ pub mod test {
let expected_req = ~"ping"; let expected_req = ~"ping";
// client // client
debug!("firing up client.."); debug!("firing up client..");
let actual_resp_result = do oldcomm::listen |client_ch| { let actual_resp_result = run_tcp_test_client(
run_tcp_test_client(
server_ip, server_ip,
server_port, server_port,
expected_req, expected_req,
client_ch, hl_loop);
hl_loop)
};
match actual_resp_result.get_err() { match actual_resp_result.get_err() {
ConnectionRefused => (), ConnectionRefused => (),
_ => fail ~"unknown error.. expected connection_refused" _ => fail ~"unknown error.. expected connection_refused"
@ -1664,26 +1654,20 @@ pub mod test {
let expected_req = ~"ping"; let expected_req = ~"ping";
let expected_resp = ~"pong"; let expected_resp = ~"pong";
let server_result_po = oldcomm::Port::<~str>(); let (cont_po, cont_ch) = stream::<()>();
let server_result_ch = oldcomm::Chan(&server_result_po); let cont_ch = SharedChan(cont_ch);
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server // server
let hl_loop_clone = hl_loop.clone(); let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| { let cont_ch = cont_ch.clone();
run_tcp_test_server( run_tcp_test_server(
server_ip, server_ip,
server_port, server_port,
expected_resp, expected_resp,
server_ch, cont_ch.clone(),
cont_ch, &hl_loop_clone);
&hl_loop_clone) }
}; cont_po.recv();
server_result_ch.send(actual_req);
};
oldcomm::recv(cont_po);
// this one should fail.. // this one should fail..
let listen_err = run_tcp_test_server_fail( let listen_err = run_tcp_test_server_fail(
server_ip, server_ip,
@ -1691,14 +1675,11 @@ pub mod test {
hl_loop); hl_loop);
// client.. just doing this so that the first server tears down // client.. just doing this so that the first server tears down
debug!("server started, firing up client.."); debug!("server started, firing up client..");
do oldcomm::listen |client_ch| {
run_tcp_test_client( run_tcp_test_client(
server_ip, server_ip,
server_port, server_port,
expected_req, expected_req,
client_ch, hl_loop);
hl_loop)
};
match listen_err { match listen_err {
AddressInUse => { AddressInUse => {
assert true; assert true;
@ -1736,26 +1717,23 @@ pub mod test {
let expected_req = ~"ping"; let expected_req = ~"ping";
let expected_resp = ~"pong"; let expected_resp = ~"pong";
let server_result_po = oldcomm::Port::<~str>(); let (server_result_po, server_result_ch) = stream::<~str>();
let server_result_ch = oldcomm::Chan(&server_result_po);
let cont_po = oldcomm::Port::<()>(); let (cont_po, cont_ch) = stream::<()>();
let cont_ch = oldcomm::Chan(&cont_po); let cont_ch = SharedChan(cont_ch);
// server // server
let iotask_clone = iotask.clone(); let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| { let cont_ch = cont_ch.clone();
run_tcp_test_server( let actual_req = run_tcp_test_server(
server_ip, server_ip,
server_port, server_port,
expected_resp, expected_resp,
server_ch, cont_ch.clone(),
cont_ch, &iotask_clone);
&iotask_clone)
};
server_result_ch.send(actual_req); server_result_ch.send(actual_req);
}; };
oldcomm::recv(cont_po); cont_po.recv();
// client // client
let server_addr = ip::v4::parse_addr(server_ip); let server_addr = ip::v4::parse_addr(server_ip);
let conn_result = connect(server_addr, server_port, iotask); let conn_result = connect(server_addr, server_port, iotask);
@ -1770,7 +1748,7 @@ pub mod test {
buf_read(sock_buf, resp_buf.len()) buf_read(sock_buf, resp_buf.len())
}; };
let actual_req = oldcomm::recv(server_result_po); let actual_req = server_result_po.recv();
log(debug, fmt!("REQ: expected: '%s' actual: '%s'", log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
expected_req, actual_req)); expected_req, actual_req));
log(debug, fmt!("RESP: expected: '%s' actual: '%s'", log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
@ -1788,26 +1766,20 @@ pub mod test {
let expected_req = ~"GET /"; let expected_req = ~"GET /";
let expected_resp = ~"A string\nwith multiple lines\n"; let expected_resp = ~"A string\nwith multiple lines\n";
let server_result_po = oldcomm::Port::<~str>(); let (cont_po, cont_ch) = stream::<()>();
let server_result_ch = oldcomm::Chan(&server_result_po); let cont_ch = SharedChan(cont_ch);
let cont_po = oldcomm::Port::<()>();
let cont_ch = oldcomm::Chan(&cont_po);
// server // server
let hl_loop_clone = hl_loop.clone(); let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do oldcomm::listen |server_ch| { let cont_ch = cont_ch.clone();
run_tcp_test_server( run_tcp_test_server(
server_ip, server_ip,
server_port, server_port,
expected_resp, expected_resp,
server_ch, cont_ch.clone(),
cont_ch, &hl_loop_clone);
&hl_loop_clone)
}; };
server_result_ch.send(actual_req); cont_po.recv();
};
oldcomm::recv(cont_po);
// client // client
debug!("server started, firing up client.."); debug!("server started, firing up client..");
let server_addr = ip::v4::parse_addr(server_ip); let server_addr = ip::v4::parse_addr(server_ip);
@ -1841,22 +1813,25 @@ pub mod test {
} }
fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
server_ch: oldcomm::Chan<~str>, cont_ch: SharedChan<()>,
cont_ch: oldcomm::Chan<()>,
iotask: &IoTask) -> ~str { iotask: &IoTask) -> ~str {
let (server_po, server_ch) = stream::<~str>();
let server_ch = SharedChan(server_ch);
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(move server_ip_addr, server_port, 128, let listen_result = listen(move server_ip_addr, server_port, 128,
iotask, iotask,
// on_establish_cb -- called when listener is set up // on_establish_cb -- called when listener is set up
|kill_ch| { |kill_ch| {
debug!("establish_cb %?", kill_ch); debug!("establish_cb %?",
oldcomm::send(cont_ch, ()); kill_ch);
cont_ch.send(());
}, },
// risky to run this on the loop, but some users // risky to run this on the loop, but some users
// will want the POWER // will want the POWER
|new_conn, kill_ch| { |new_conn, kill_ch| {
debug!("SERVER: new connection!"); debug!("SERVER: new connection!");
do oldcomm::listen |cont_ch| { let (cont_po, cont_ch) = stream();
let server_ch = server_ch.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
debug!("SERVER: starting worker for new req"); debug!("SERVER: starting worker for new req");
@ -1865,8 +1840,9 @@ pub mod test {
if result::is_err(&accept_result) { if result::is_err(&accept_result) {
debug!("SERVER: error accept connection"); debug!("SERVER: error accept connection");
let err_data = result::get_err(&accept_result); let err_data = result::get_err(&accept_result);
oldcomm::send(kill_ch, Some(err_data)); kill_ch.send(Some(err_data));
debug!("SERVER/WORKER: send on err cont ch"); debug!(
"SERVER/WORKER: send on err cont ch");
cont_ch.send(()); cont_ch.send(());
} }
else { else {
@ -1889,12 +1865,12 @@ pub mod test {
debug!("SERVER: before write"); debug!("SERVER: before write");
tcp_write_single(&sock, str::to_bytes(resp)); tcp_write_single(&sock, str::to_bytes(resp));
debug!("SERVER: after write.. die"); debug!("SERVER: after write.. die");
oldcomm::send(kill_ch, None); kill_ch.send(None);
} }
result::Err(move err_data) => { result::Err(move err_data) => {
debug!("SERVER: error recvd: %s %s", debug!("SERVER: error recvd: %s %s",
err_data.err_name, err_data.err_msg); err_data.err_name, err_data.err_msg);
oldcomm::send(kill_ch, Some(err_data)); kill_ch.send(Some(err_data));
server_ch.send(~""); server_ch.send(~"");
} }
} }
@ -1902,9 +1878,7 @@ pub mod test {
} }
} }
debug!("SERVER: waiting to recv on cont_ch"); debug!("SERVER: waiting to recv on cont_ch");
cont_ch.recv() cont_po.recv();
};
debug!("SERVER: recv'd on cont_ch..leaving listen cb");
}); });
// err check on listen_result // err check on listen_result
if result::is_err(&listen_result) { if result::is_err(&listen_result) {
@ -1921,7 +1895,7 @@ pub mod test {
} }
} }
} }
let ret_val = server_ch.recv(); let ret_val = server_po.recv();
debug!("SERVER: exited and got return val: '%s'", ret_val); debug!("SERVER: exited and got return val: '%s'", ret_val);
ret_val ret_val
} }
@ -1949,7 +1923,6 @@ pub mod test {
} }
fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
client_ch: oldcomm::Chan<~str>,
iotask: &IoTask) -> result::Result<~str, iotask: &IoTask) -> result::Result<~str,
TcpConnectErrData> { TcpConnectErrData> {
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
@ -1972,9 +1945,9 @@ pub mod test {
Ok(~"") Ok(~"")
} }
else { else {
client_ch.send(str::from_bytes(read_result.get())); let ret_val = str::from_bytes(read_result.get());
let ret_val = client_ch.recv(); debug!("CLIENT: after client_ch recv ret: '%s'",
debug!("CLIENT: after client_ch recv ret: '%s'", ret_val); ret_val);
Ok(ret_val) Ok(ret_val)
} }
} }

View file

@ -27,7 +27,7 @@ use core::either;
use core::io::WriterUtil; use core::io::WriterUtil;
use core::io; use core::io;
use core::libc::size_t; use core::libc::size_t;
use core::oldcomm; use core::pipes::{stream, Chan, Port, SharedChan};
use core::option; use core::option;
use core::prelude::*; use core::prelude::*;
use core::result; use core::result;
@ -305,8 +305,8 @@ fn run_tests(opts: &TestOpts,
let mut wait_idx = 0; let mut wait_idx = 0;
let mut done_idx = 0; let mut done_idx = 0;
let p = oldcomm::Port(); let (p, ch) = stream();
let ch = oldcomm::Chan(&p); let ch = SharedChan(ch);
while done_idx < total { while done_idx < total {
while wait_idx < concurrency && run_idx < total { while wait_idx < concurrency && run_idx < total {
@ -317,12 +317,12 @@ fn run_tests(opts: &TestOpts,
// that hang forever. // that hang forever.
callback(TeWait(copy test)); callback(TeWait(copy test));
} }
run_test(move test, ch); run_test(move test, ch.clone());
wait_idx += 1; wait_idx += 1;
run_idx += 1; run_idx += 1;
} }
let (test, result) = oldcomm::recv(p); let (test, result) = p.recv();
if concurrency != 1 { if concurrency != 1 {
callback(TeWait(copy test)); callback(TeWait(copy test));
} }
@ -406,9 +406,9 @@ struct TestFuture {
wait: fn@() -> TestResult, wait: fn@() -> TestResult,
} }
pub fn run_test(test: TestDesc, monitor_ch: oldcomm::Chan<MonitorMsg>) { pub fn run_test(test: TestDesc, monitor_ch: SharedChan<MonitorMsg>) {
if test.ignore { if test.ignore {
oldcomm::send(monitor_ch, (copy test, TrIgnored)); monitor_ch.send((copy test, TrIgnored));
return; return;
} }
@ -420,7 +420,7 @@ pub fn run_test(test: TestDesc, monitor_ch: oldcomm::Chan<MonitorMsg>) {
}).spawn(move testfn); }).spawn(move testfn);
let task_result = option::unwrap(move result_future).recv(); let task_result = option::unwrap(move result_future).recv();
let test_result = calc_result(&test, task_result == task::Success); let test_result = calc_result(&test, task_result == task::Success);
oldcomm::send(monitor_ch, (copy test, test_result)); monitor_ch.send((copy test, test_result));
}; };
} }
@ -440,7 +440,7 @@ mod tests {
use test::{TestOpts, run_test}; use test::{TestOpts, run_test};
use core::either; use core::either;
use core::oldcomm; use core::pipes::{stream, SharedChan};
use core::option; use core::option;
use core::vec; use core::vec;
@ -453,10 +453,10 @@ mod tests {
ignore: true, ignore: true,
should_fail: false should_fail: false
}; };
let p = oldcomm::Port(); let (p, ch) = stream();
let ch = oldcomm::Chan(&p); let ch = SharedChan(ch);
run_test(desc, ch); run_test(desc, ch);
let (_, res) = oldcomm::recv(p); let (_, res) = p.recv();
assert res != TrOk; assert res != TrOk;
} }
@ -469,10 +469,10 @@ mod tests {
ignore: true, ignore: true,
should_fail: false should_fail: false
}; };
let p = oldcomm::Port(); let (p, ch) = stream();
let ch = oldcomm::Chan(&p); let ch = SharedChan(ch);
run_test(desc, ch); run_test(desc, ch);
let (_, res) = oldcomm::recv(p); let (_, res) = p.recv();
assert res == TrIgnored; assert res == TrIgnored;
} }
@ -486,10 +486,10 @@ mod tests {
ignore: false, ignore: false,
should_fail: true should_fail: true
}; };
let p = oldcomm::Port(); let (p, ch) = stream();
let ch = oldcomm::Chan(&p); let ch = SharedChan(ch);
run_test(desc, ch); run_test(desc, ch);
let (_, res) = oldcomm::recv(p); let (_, res) = p.recv();
assert res == TrOk; assert res == TrOk;
} }
@ -502,10 +502,10 @@ mod tests {
ignore: false, ignore: false,
should_fail: true should_fail: true
}; };
let p = oldcomm::Port(); let (p, ch) = stream();
let ch = oldcomm::Chan(&p); let ch = SharedChan(ch);
run_test(desc, ch); run_test(desc, ch);
let (_, res) = oldcomm::recv(p); let (_, res) = p.recv();
assert res == TrFailed; assert res == TrFailed;
} }

View file

@ -18,7 +18,9 @@ use uv::iotask::IoTask;
use core::either; use core::either;
use core::libc; use core::libc;
use core::oldcomm; use core::libc::c_void;
use core::cast::transmute;
use core::pipes::{stream, Chan, SharedChan, Port, select2i};
use core::prelude::*; use core::prelude::*;
use core::ptr; use core::ptr;
use core; use core;
@ -41,12 +43,11 @@ use core;
*/ */
pub fn delayed_send<T: Owned>(iotask: &IoTask, pub fn delayed_send<T: Owned>(iotask: &IoTask,
msecs: uint, msecs: uint,
ch: oldcomm::Chan<T>, ch: &Chan<T>,
val: T) { val: T) {
unsafe { unsafe {
let timer_done_po = oldcomm::Port::<()>(); let (timer_done_po, timer_done_ch) = stream::<()>();
let timer_done_ch = oldcomm::Chan(&timer_done_po); let timer_done_ch = SharedChan(timer_done_ch);
let timer_done_ch_ptr = ptr::addr_of(&timer_done_ch);
let timer = uv::ll::timer_t(); let timer = uv::ll::timer_t();
let timer_ptr = ptr::addr_of(&timer); let timer_ptr = ptr::addr_of(&timer);
do iotask::interact(iotask) |loop_ptr| { do iotask::interact(iotask) |loop_ptr| {
@ -56,9 +57,15 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask,
let start_result = uv::ll::timer_start( let start_result = uv::ll::timer_start(
timer_ptr, delayed_send_cb, msecs, 0u); timer_ptr, delayed_send_cb, msecs, 0u);
if (start_result == 0i32) { if (start_result == 0i32) {
// Note: putting the channel into a ~
// to cast to *c_void
let timer_done_ch_clone = ~timer_done_ch.clone();
let timer_done_ch_ptr = transmute::<
~SharedChan<()>, *c_void>(
timer_done_ch_clone);
uv::ll::set_data_for_uv_handle( uv::ll::set_data_for_uv_handle(
timer_ptr, timer_ptr,
timer_done_ch_ptr as *libc::c_void); timer_done_ch_ptr);
} else { } else {
let error_msg = uv::ll::get_last_err_info( let error_msg = uv::ll::get_last_err_info(
loop_ptr); loop_ptr);
@ -73,11 +80,11 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask,
} }
}; };
// delayed_send_cb has been processed by libuv // delayed_send_cb has been processed by libuv
oldcomm::recv(timer_done_po); timer_done_po.recv();
// notify the caller immediately // notify the caller immediately
oldcomm::send(ch, move(val)); ch.send(val);
// uv_close for this timer has been processed // uv_close for this timer has been processed
oldcomm::recv(timer_done_po); timer_done_po.recv();
}; };
} }
@ -93,10 +100,9 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask,
* * msecs - an amount of time, in milliseconds, for the current task to block * * msecs - an amount of time, in milliseconds, for the current task to block
*/ */
pub fn sleep(iotask: &IoTask, msecs: uint) { pub fn sleep(iotask: &IoTask, msecs: uint) {
let exit_po = oldcomm::Port::<()>(); let (exit_po, exit_ch) = stream::<()>();
let exit_ch = oldcomm::Chan(&exit_po); delayed_send(iotask, msecs, &exit_ch, ());
delayed_send(iotask, msecs, exit_ch, ()); exit_po.recv();
oldcomm::recv(exit_po);
} }
/** /**
@ -121,20 +127,17 @@ pub fn sleep(iotask: &IoTask, msecs: uint) {
*/ */
pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask, pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask,
msecs: uint, msecs: uint,
wait_po: oldcomm::Port<T>) wait_po: &Port<T>)
-> Option<T> { -> Option<T> {
let timeout_po = oldcomm::Port::<()>(); let (timeout_po, timeout_ch) = stream::<()>();
let timeout_ch = oldcomm::Chan(&timeout_po); delayed_send(iotask, msecs, &timeout_ch, ());
delayed_send(iotask, msecs, timeout_ch, ());
// FIXME: This could be written clearer (#2618) // FIXME: This could be written clearer (#2618)
either::either( either::either(
|left_val| { |_| {
log(debug, fmt!("recv_time .. left_val %?",
left_val));
None None
}, |right_val| { }, |_| {
Some(*right_val) Some(wait_po.recv())
}, &oldcomm::select2(timeout_po, wait_po) }, &select2i(&timeout_po, wait_po)
) )
} }
@ -144,11 +147,14 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
unsafe { unsafe {
log(debug, log(debug,
fmt!("delayed_send_cb handle %? status %?", handle, status)); fmt!("delayed_send_cb handle %? status %?", handle, status));
let timer_done_ch = // Faking a borrowed pointer to our ~SharedChan
*(uv::ll::get_data_for_uv_handle(handle) as *oldcomm::Chan<()>); let timer_done_ch_ptr: &*c_void = &uv::ll::get_data_for_uv_handle(
handle);
let timer_done_ch_ptr = transmute::<&*c_void, &~SharedChan<()>>(
timer_done_ch_ptr);
let stop_result = uv::ll::timer_stop(handle); let stop_result = uv::ll::timer_stop(handle);
if (stop_result == 0i32) { if (stop_result == 0i32) {
oldcomm::send(timer_done_ch, ()); timer_done_ch_ptr.send(());
uv::ll::close(handle, delayed_send_close_cb); uv::ll::close(handle, delayed_send_close_cb);
} else { } else {
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
@ -161,9 +167,10 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) { extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) {
unsafe { unsafe {
log(debug, fmt!("delayed_send_close_cb handle %?", handle)); log(debug, fmt!("delayed_send_close_cb handle %?", handle));
let timer_done_ch = let timer_done_ch_ptr = uv::ll::get_data_for_uv_handle(handle);
*(uv::ll::get_data_for_uv_handle(handle) as *oldcomm::Chan<()>); let timer_done_ch = transmute::<*c_void, ~SharedChan<()>>(
oldcomm::send(timer_done_ch, ()); timer_done_ch_ptr);
timer_done_ch.send(());
} }
} }
@ -175,9 +182,9 @@ mod test {
use uv; use uv;
use core::iter; use core::iter;
use core::oldcomm;
use core::rand; use core::rand;
use core::task; use core::task;
use core::pipes::{stream, SharedChan};
#[test] #[test]
pub fn test_gl_timer_simple_sleep_test() { pub fn test_gl_timer_simple_sleep_test() {
@ -195,8 +202,8 @@ mod test {
#[test] #[test]
pub fn test_gl_timer_sleep_stress2() { pub fn test_gl_timer_sleep_stress2() {
let po = oldcomm::Port(); let (po, ch) = stream();
let ch = oldcomm::Chan(&po); let ch = SharedChan(ch);
let hl_loop = &uv::global_loop::get(); let hl_loop = &uv::global_loop::get();
let repeat = 20u; let repeat = 20u;
@ -210,8 +217,10 @@ mod test {
for iter::repeat(repeat) { for iter::repeat(repeat) {
let ch = ch.clone();
for spec.each |spec| { for spec.each |spec| {
let (times, maxms) = *spec; let (times, maxms) = *spec;
let ch = ch.clone();
let hl_loop_clone = hl_loop.clone(); let hl_loop_clone = hl_loop.clone();
do task::spawn { do task::spawn {
use rand::*; use rand::*;
@ -219,13 +228,13 @@ mod test {
for iter::repeat(times) { for iter::repeat(times) {
sleep(&hl_loop_clone, rng.next() as uint % maxms); sleep(&hl_loop_clone, rng.next() as uint % maxms);
} }
oldcomm::send(ch, ()); ch.send(());
} }
} }
} }
for iter::repeat(repeat * spec.len()) { for iter::repeat(repeat * spec.len()) {
oldcomm::recv(po) po.recv()
} }
} }
@ -246,14 +255,13 @@ mod test {
task::yield(); task::yield();
let expected = rand::rng().gen_str(16u); let expected = rand::rng().gen_str(16u);
let test_po = core::comm::port::<str>(); let (test_po, test_ch) = stream::<~str>();
let test_ch = core::comm::chan(test_po);
do task::spawn() { do task::spawn() {
delayed_send(hl_loop, 1u, test_ch, expected); delayed_send(hl_loop, 1u, &test_ch, expected);
}; };
match recv_timeout(hl_loop, 10u, test_po) { match recv_timeout(hl_loop, 10u, &test_po) {
Some(val) => { Some(val) => {
assert val == expected; assert val == expected;
successes += 1; successes += 1;
@ -274,14 +282,13 @@ mod test {
for iter::repeat(times as uint) { for iter::repeat(times as uint) {
let expected = rand::Rng().gen_str(16u); let expected = rand::Rng().gen_str(16u);
let test_po = oldcomm::Port::<~str>(); let (test_po, test_ch) = stream::<~str>();
let test_ch = oldcomm::Chan(&test_po);
let hl_loop_clone = hl_loop.clone(); let hl_loop_clone = hl_loop.clone();
do task::spawn() { do task::spawn() {
delayed_send(&hl_loop_clone, 50u, test_ch, expected); delayed_send(&hl_loop_clone, 50u, &test_ch, expected);
}; };
match recv_timeout(&hl_loop, 1u, test_po) { match recv_timeout(&hl_loop, 1u, &test_po) {
None => successes += 1, None => successes += 1,
_ => failures += 1 _ => failures += 1
}; };

View file

@ -209,16 +209,17 @@ mod test {
use core::iter; use core::iter;
use core::libc; use core::libc;
use core::oldcomm;
use core::ptr; use core::ptr;
use core::task; use core::task;
use core::pipes::{stream, Chan, SharedChan, Port};
extern fn async_close_cb(handle: *ll::uv_async_t) { extern fn async_close_cb(handle: *ll::uv_async_t) {
unsafe { unsafe {
log(debug, fmt!("async_close_cb handle %?", handle)); log(debug, fmt!("async_close_cb handle %?", handle));
let exit_ch = (*(ll::get_data_for_uv_handle(handle) let exit_ch = &(*(ll::get_data_for_uv_handle(handle)
as *AhData)).exit_ch; as *AhData)).exit_ch;
oldcomm::send(exit_ch, ()); let exit_ch = exit_ch.clone();
exit_ch.send(());
} }
} }
extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) { extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) {
@ -230,17 +231,16 @@ mod test {
} }
struct AhData { struct AhData {
iotask: IoTask, iotask: IoTask,
exit_ch: oldcomm::Chan<()> exit_ch: SharedChan<()>
} }
fn impl_uv_iotask_async(iotask: &IoTask) { fn impl_uv_iotask_async(iotask: &IoTask) {
unsafe { unsafe {
let async_handle = ll::async_t(); let async_handle = ll::async_t();
let ah_ptr = ptr::addr_of(&async_handle); let ah_ptr = ptr::addr_of(&async_handle);
let exit_po = oldcomm::Port::<()>(); let (exit_po, exit_ch) = stream::<()>();
let exit_ch = oldcomm::Chan(&exit_po);
let ah_data = AhData { let ah_data = AhData {
iotask: iotask.clone(), iotask: iotask.clone(),
exit_ch: exit_ch exit_ch: SharedChan(exit_ch)
}; };
let ah_data_ptr: *AhData = unsafe { let ah_data_ptr: *AhData = unsafe {
ptr::to_unsafe_ptr(&ah_data) ptr::to_unsafe_ptr(&ah_data)
@ -256,13 +256,13 @@ mod test {
} }
}; };
debug!("waiting for async close"); debug!("waiting for async close");
oldcomm::recv(exit_po); exit_po.recv();
} }
} }
// this fn documents the bear minimum neccesary to roll your own // this fn documents the bear minimum neccesary to roll your own
// high_level_loop // high_level_loop
unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { unsafe fn spawn_test_loop(exit_ch: ~Chan<()>) -> IoTask {
let (iotask_port, iotask_ch) = stream::<IoTask>(); let (iotask_port, iotask_ch) = stream::<IoTask>();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
debug!("about to run a test loop"); debug!("about to run a test loop");
@ -287,9 +287,8 @@ mod test {
#[test] #[test]
fn test_uv_iotask_async() { fn test_uv_iotask_async() {
unsafe { unsafe {
let exit_po = oldcomm::Port::<()>(); let (exit_po, exit_ch) = stream::<()>();
let exit_ch = oldcomm::Chan(&exit_po); let iotask = &spawn_test_loop(~exit_ch);
let iotask = &spawn_test_loop(exit_ch);
debug!("spawned iotask"); debug!("spawned iotask");
@ -300,24 +299,25 @@ mod test {
// race-condition type situations.. this ensures that the // race-condition type situations.. this ensures that the
// loop lives until, at least, all of the // loop lives until, at least, all of the
// impl_uv_hl_async() runs have been called, at least. // impl_uv_hl_async() runs have been called, at least.
let work_exit_po = oldcomm::Port::<()>(); let (work_exit_po, work_exit_ch) = stream::<()>();
let work_exit_ch = oldcomm::Chan(&work_exit_po); let work_exit_ch = SharedChan(work_exit_ch);
for iter::repeat(7u) { for iter::repeat(7u) {
let iotask_clone = iotask.clone(); let iotask_clone = iotask.clone();
let work_exit_ch_clone = work_exit_ch.clone();
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
debug!("async"); debug!("async");
impl_uv_iotask_async(&iotask_clone); impl_uv_iotask_async(&iotask_clone);
debug!("done async"); debug!("done async");
oldcomm::send(work_exit_ch, ()); work_exit_ch_clone.send(());
}; };
}; };
for iter::repeat(7u) { for iter::repeat(7u) {
debug!("waiting"); debug!("waiting");
oldcomm::recv(work_exit_po); work_exit_po.recv();
}; };
log(debug, ~"sending teardown_loop msg.."); log(debug, ~"sending teardown_loop msg..");
exit(iotask); exit(iotask);
oldcomm::recv(exit_po); exit_po.recv();
log(debug, ~"after recv on exit_po.. exiting.."); log(debug, ~"after recv on exit_po.. exiting..");
} }
} }

View file

@ -39,6 +39,7 @@ use core::ptr::to_unsafe_ptr;
use core::ptr; use core::ptr;
use core::str; use core::str;
use core::vec; use core::vec;
use core::pipes::{stream, Chan, SharedChan, Port};
// libuv struct mappings // libuv struct mappings
pub struct uv_ip4_addr { pub struct uv_ip4_addr {
@ -1132,7 +1133,6 @@ pub mod test {
use uv_ll::*; use uv_ll::*;
use core::libc; use core::libc;
use core::oldcomm;
use core::ptr; use core::ptr;
use core::str; use core::str;
use core::sys; use core::sys;
@ -1148,7 +1148,7 @@ pub mod test {
struct request_wrapper { struct request_wrapper {
write_req: *uv_write_t, write_req: *uv_write_t,
req_buf: *~[uv_buf_t], req_buf: *~[uv_buf_t],
read_chan: *oldcomm::Chan<~str>, read_chan: SharedChan<~str>,
} }
extern fn after_close_cb(handle: *libc::c_void) { extern fn after_close_cb(handle: *libc::c_void) {
@ -1187,9 +1187,9 @@ pub mod test {
let buf_base = get_base_from_buf(buf); let buf_base = get_base_from_buf(buf);
let buf_len = get_len_from_buf(buf); let buf_len = get_len_from_buf(buf);
let bytes = vec::from_buf(buf_base, buf_len as uint); let bytes = vec::from_buf(buf_base, buf_len as uint);
let read_chan = *((*client_data).read_chan); let read_chan = (*client_data).read_chan.clone();
let msg_from_server = str::from_bytes(bytes); let msg_from_server = str::from_bytes(bytes);
oldcomm::send(read_chan, msg_from_server); read_chan.send(msg_from_server);
close(stream as *libc::c_void, after_close_cb) close(stream as *libc::c_void, after_close_cb)
} }
else if (nread == -1) { else if (nread == -1) {
@ -1257,7 +1257,7 @@ pub mod test {
} }
fn impl_uv_tcp_request(ip: &str, port: int, req_str: &str, fn impl_uv_tcp_request(ip: &str, port: int, req_str: &str,
client_chan: *oldcomm::Chan<~str>) { client_chan: SharedChan<~str>) {
unsafe { unsafe {
let test_loop = loop_new(); let test_loop = loop_new();
let tcp_handle = tcp_t(); let tcp_handle = tcp_t();
@ -1283,9 +1283,11 @@ pub mod test {
log(debug, fmt!("tcp req: tcp stream: %d write_handle: %d", log(debug, fmt!("tcp req: tcp stream: %d write_handle: %d",
tcp_handle_ptr as int, tcp_handle_ptr as int,
write_handle_ptr as int)); write_handle_ptr as int));
let client_data = { writer_handle: write_handle_ptr, let client_data = request_wrapper {
write_req: write_handle_ptr,
req_buf: ptr::addr_of(&req_msg), req_buf: ptr::addr_of(&req_msg),
read_chan: client_chan }; read_chan: client_chan
};
let tcp_init_result = tcp_init( let tcp_init_result = tcp_init(
test_loop as *libc::c_void, tcp_handle_ptr); test_loop as *libc::c_void, tcp_handle_ptr);
@ -1388,8 +1390,8 @@ pub mod test {
log(debug, ~"SERVER: client req contains kill_msg!"); log(debug, ~"SERVER: client req contains kill_msg!");
log(debug, ~"SERVER: sending response to client"); log(debug, ~"SERVER: sending response to client");
read_stop(client_stream_ptr); read_stop(client_stream_ptr);
let server_chan = *((*client_data).server_chan); let server_chan = (*client_data).server_chan.clone();
oldcomm::send(server_chan, request_str); server_chan.send(request_str);
let write_result = write( let write_result = write(
write_req, write_req,
client_stream_ptr as *libc::c_void, client_stream_ptr as *libc::c_void,
@ -1484,12 +1486,12 @@ pub mod test {
server: *uv_tcp_t, server: *uv_tcp_t,
server_kill_msg: ~str, server_kill_msg: ~str,
server_resp_buf: *~[uv_buf_t], server_resp_buf: *~[uv_buf_t],
server_chan: *oldcomm::Chan<~str>, server_chan: SharedChan<~str>,
server_write_req: *uv_write_t, server_write_req: *uv_write_t,
} }
struct async_handle_data { struct async_handle_data {
continue_chan: *oldcomm::Chan<bool>, continue_chan: SharedChan<bool>,
} }
extern fn async_close_cb(handle: *libc::c_void) { extern fn async_close_cb(handle: *libc::c_void) {
@ -1506,9 +1508,9 @@ pub mod test {
// do its thang // do its thang
let data = get_data_for_uv_handle( let data = get_data_for_uv_handle(
async_handle as *libc::c_void) as *async_handle_data; async_handle as *libc::c_void) as *async_handle_data;
let continue_chan = *((*data).continue_chan); let continue_chan = (*data).continue_chan.clone();
let should_continue = status == 0i32; let should_continue = status == 0i32;
oldcomm::send(continue_chan, should_continue); continue_chan.send(should_continue);
close(async_handle as *libc::c_void, async_close_cb); close(async_handle as *libc::c_void, async_close_cb);
} }
} }
@ -1517,8 +1519,8 @@ pub mod test {
server_port: int, server_port: int,
+kill_server_msg: ~str, +kill_server_msg: ~str,
+server_resp_msg: ~str, +server_resp_msg: ~str,
server_chan: *oldcomm::Chan<~str>, server_chan: SharedChan<~str>,
continue_chan: *oldcomm::Chan<bool>) { continue_chan: SharedChan<bool>) {
unsafe { unsafe {
let test_loop = loop_new(); let test_loop = loop_new();
let tcp_server = tcp_t(); let tcp_server = tcp_t();
@ -1626,36 +1628,35 @@ pub mod test {
let port = 8886; let port = 8886;
let kill_server_msg = ~"does a dog have buddha nature?"; let kill_server_msg = ~"does a dog have buddha nature?";
let server_resp_msg = ~"mu!"; let server_resp_msg = ~"mu!";
let client_port = oldcomm::Port::<~str>(); let (client_port, client_chan) = stream::<~str>();
let client_chan = oldcomm::Chan::<~str>(&client_port); let client_chan = SharedChan(client_chan);
let server_port = oldcomm::Port::<~str>(); let (server_port, server_chan) = stream::<~str>();
let server_chan = oldcomm::Chan::<~str>(&server_port); let server_chan = SharedChan(server_chan);
let continue_port = oldcomm::Port::<bool>(); let (continue_port, continue_chan) = stream::<bool>();
let continue_chan = oldcomm::Chan::<bool>(&continue_port); let continue_chan = SharedChan(continue_chan);
let continue_chan_ptr = ptr::addr_of(&continue_chan);
do task::spawn_sched(task::ManualThreads(1)) { do task::spawn_sched(task::ManualThreads(1)) {
impl_uv_tcp_server(bind_ip, port, impl_uv_tcp_server(bind_ip, port,
kill_server_msg, kill_server_msg,
server_resp_msg, server_resp_msg,
ptr::addr_of(&server_chan), server_chan.clone(),
continue_chan_ptr); continue_chan.clone());
}; };
// block until the server up is.. possibly a race? // block until the server up is.. possibly a race?
log(debug, ~"before receiving on server continue_port"); log(debug, ~"before receiving on server continue_port");
oldcomm::recv(continue_port); continue_port.recv();
log(debug, ~"received on continue port, set up tcp client"); log(debug, ~"received on continue port, set up tcp client");
do task::spawn_sched(task::ManualThreads(1u)) { do task::spawn_sched(task::ManualThreads(1u)) {
impl_uv_tcp_request(request_ip, port, impl_uv_tcp_request(request_ip, port,
kill_server_msg, kill_server_msg,
ptr::addr_of(&client_chan)); client_chan.clone());
}; };
let msg_from_client = oldcomm::recv(server_port); let msg_from_client = server_port.recv();
let msg_from_server = oldcomm::recv(client_port); let msg_from_server = client_port.recv();
assert str::contains(msg_from_client, kill_server_msg); assert str::contains(msg_from_client, kill_server_msg);
assert str::contains(msg_from_server, server_resp_msg); assert str::contains(msg_from_server, server_resp_msg);