diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index d075ff08bb7..56c301cd1c7 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -12,6 +12,7 @@ Message passing */ +use cast::transmute; use cast; use either::{Either, Left, Right}; use kinds::Owned; @@ -192,9 +193,9 @@ impl Peekable for Port { fn peek(&self) -> bool { let mut endp = None; endp <-> self.endp; - let peek = match &endp { - &Some(ref endp) => peek(endp), - &None => fail!(~"peeking empty stream") + let peek = match endp { + Some(ref mut endp) => peek(endp), + None => fail!(~"peeking empty stream") }; self.endp <-> endp; peek @@ -202,10 +203,10 @@ impl Peekable for Port { } impl Selectable for Port { - fn header(&self) -> *PacketHeader { + fn header(&mut self) -> *mut PacketHeader { unsafe { match self.endp { - Some(ref endp) => endp.header(), + Some(ref mut endp) => endp.header(), None => fail!(~"peeking empty stream") } } @@ -327,23 +328,20 @@ impl ::clone::Clone for SharedChan { #[allow(non_camel_case_types)] pub mod oneshot { priv use core::kinds::Owned; - use ptr::to_unsafe_ptr; + use ptr::to_mut_unsafe_ptr; pub fn init() -> (client::Oneshot, server::Oneshot) { pub use core::pipes::HasBuffer; - let buffer = - ~::core::pipes::Buffer{ + let mut buffer = ~::core::pipes::Buffer { header: ::core::pipes::BufferHeader(), - data: __Buffer{ + data: __Buffer { Oneshot: ::core::pipes::mk_packet::>() }, }; do ::core::pipes::entangle_buffer(buffer) |buffer, data| { - { - data.Oneshot.set_buffer(buffer); - to_unsafe_ptr(&data.Oneshot) - } + data.Oneshot.set_buffer(buffer); + to_mut_unsafe_ptr(&mut data.Oneshot) } } #[allow(non_camel_case_types)] @@ -497,48 +495,66 @@ pub fn try_send_one(chan: ChanOne, data: T) -> bool { /// Returns the index of an endpoint that is ready to receive. -pub fn selecti(endpoints: &[T]) -> uint { +pub fn selecti(endpoints: &mut [T]) -> uint { wait_many(endpoints) } /// Returns 0 or 1 depending on which endpoint is ready to receive -pub fn select2i(a: &A, b: &B) -> - Either<(), ()> { - match wait_many([a.header(), b.header()]) { - 0 => Left(()), - 1 => Right(()), - _ => fail!(~"wait returned unexpected index") +pub fn select2i(a: &mut A, b: &mut B) + -> Either<(), ()> { + let mut endpoints = [ a.header(), b.header() ]; + match wait_many(endpoints) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index"), } } /// Receive a message from one of two endpoints. pub trait Select2 { /// Receive a message or return `None` if a connection closes. - fn try_select(&self) -> Either, Option>; + fn try_select(&mut self) -> Either, Option>; /// Receive a message or fail if a connection closes. - fn select(&self) -> Either; + fn select(&mut self) -> Either; } -impl, - Right: Selectable + GenericPort> - Select2 for (Left, Right) { - - fn select(&self) -> Either { - match *self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.recv()), - Right(()) => Right(rp.recv()) - } +impl, + Right:Selectable + GenericPort> + Select2 + for (Left, Right) { + fn select(&mut self) -> Either { + // XXX: Bad borrow check workaround. + unsafe { + let this: &(Left, Right) = transmute(self); + match *this { + (ref lp, ref rp) => { + let lp: &mut Left = transmute(lp); + let rp: &mut Right = transmute(rp); + match select2i(lp, rp) { + Left(()) => Left(lp.recv()), + Right(()) => Right(rp.recv()), + } + } + } } } - fn try_select(&self) -> Either, Option> { - match *self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.try_recv()), - Right(()) => Right(rp.try_recv()) - } + fn try_select(&mut self) -> Either, Option> { + // XXX: Bad borrow check workaround. + unsafe { + let this: &(Left, Right) = transmute(self); + match *this { + (ref lp, ref rp) => { + let lp: &mut Left = transmute(lp); + let rp: &mut Right = transmute(rp); + match select2i(lp, rp) { + Left(()) => Left (lp.try_recv()), + Right(()) => Right(rp.try_recv()), + } + } + } } } } diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 19674900f90..1fda5a97a37 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -111,7 +111,7 @@ enum State { pub struct BufferHeader { // Tracks whether this buffer needs to be freed. We can probably // get away with restricting it to 0 or 1, if we're careful. - mut ref_count: int, + ref_count: int, // We may want a drop, and to be careful about stringing this // thing along. @@ -130,12 +130,12 @@ pub struct Buffer { } pub struct PacketHeader { - mut state: State, - mut blocked_task: *rust_task, + state: State, + blocked_task: *rust_task, // This is a transmute_copy of a ~buffer, that can also be cast // to a buffer_header if need be. - mut buffer: *libc::c_void, + buffer: *libc::c_void, } pub fn PacketHeader() -> PacketHeader { @@ -148,14 +148,14 @@ pub fn PacketHeader() -> PacketHeader { pub impl PacketHeader { // Returns the old state. - unsafe fn mark_blocked(&self, this: *rust_task) -> State { + unsafe fn mark_blocked(&mut self, this: *rust_task) -> State { rustrt::rust_task_ref(this); let old_task = swap_task(&mut self.blocked_task, this); assert!(old_task.is_null()); swap_state_acq(&mut self.state, Blocked) } - unsafe fn unblock(&self) { + unsafe fn unblock(&mut self) { let old_task = swap_task(&mut self.blocked_task, ptr::null()); if !old_task.is_null() { rustrt::rust_task_deref(old_task) @@ -169,13 +169,13 @@ pub impl PacketHeader { // unsafe because this can do weird things to the space/time // continuum. It ends making multiple unique pointers to the same - // thing. You'll proobably want to forget them when you're done. - unsafe fn buf_header(&self) -> ~BufferHeader { + // thing. You'll probably want to forget them when you're done. + unsafe fn buf_header(&mut self) -> ~BufferHeader { assert!(self.buffer.is_not_null()); transmute_copy(&self.buffer) } - fn set_buffer(&self, b: ~Buffer) { + fn set_buffer(&mut self, b: ~Buffer) { unsafe { self.buffer = transmute_copy(&b); } @@ -184,15 +184,15 @@ pub impl PacketHeader { pub struct Packet { header: PacketHeader, - mut payload: Option, + payload: Option, } pub trait HasBuffer { - fn set_buffer(&self, b: *libc::c_void); + fn set_buffer(&mut self, b: *libc::c_void); } impl HasBuffer for Packet { - fn set_buffer(&self, b: *libc::c_void) { + fn set_buffer(&mut self, b: *libc::c_void) { self.header.buffer = b; } } @@ -204,7 +204,7 @@ pub fn mk_packet() -> Packet { } } fn unibuffer() -> ~Buffer> { - let b = ~Buffer { + let mut b = ~Buffer { header: BufferHeader(), data: Packet { header: PacketHeader(), @@ -218,22 +218,25 @@ fn unibuffer() -> ~Buffer> { b } -pub fn packet() -> *Packet { - let b = unibuffer(); - let p = ptr::to_unsafe_ptr(&(b.data)); +pub fn packet() -> *mut Packet { + let mut b = unibuffer(); + let p = ptr::to_mut_unsafe_ptr(&mut b.data); // We'll take over memory management from here. - unsafe { forget(b) } + unsafe { + forget(b); + } p } pub fn entangle_buffer( - buffer: ~Buffer, - init: &fn(*libc::c_void, x: &T) -> *Packet) - -> (SendPacketBuffered, RecvPacketBuffered) -{ - let p = init(unsafe { transmute_copy(&buffer) }, &buffer.data); - unsafe { forget(buffer) } - (SendPacketBuffered(p), RecvPacketBuffered(p)) + mut buffer: ~Buffer, + init: &fn(*libc::c_void, x: &mut T) -> *mut Packet) + -> (SendPacketBuffered, RecvPacketBuffered) { + unsafe { + let p = init(transmute_copy(&buffer), &mut buffer.data); + forget(buffer); + (SendPacketBuffered(p), RecvPacketBuffered(p)) + } } pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task { @@ -292,7 +295,7 @@ fn swap_state_rel(dst: &mut State, src: State) -> State { } } -pub unsafe fn get_buffer(p: *PacketHeader) -> ~Buffer { +pub unsafe fn get_buffer(p: *mut PacketHeader) -> ~Buffer { transmute((*p).buf_header()) } @@ -306,10 +309,14 @@ struct BufferResource { impl Drop for BufferResource { fn finalize(&self) { unsafe { - let b = move_it!(self.buffer); + let this: &mut BufferResource = transmute(self); + + let mut b = move_it!(this.buffer); //let p = ptr::to_unsafe_ptr(*b); //error!("drop %?", p); - let old_count = intrinsics::atomic_xsub_rel(&mut b.header.ref_count, 1); + let old_count = intrinsics::atomic_xsub_rel( + &mut b.header.ref_count, + 1); //let old_count = atomic_xchng_rel(b.header.ref_count, 0); if old_count == 1 { // The new count is 0. @@ -323,10 +330,12 @@ impl Drop for BufferResource { } } -fn BufferResource(b: ~Buffer) -> BufferResource { +fn BufferResource(mut b: ~Buffer) -> BufferResource { //let p = ptr::to_unsafe_ptr(*b); //error!("take %?", p); - unsafe { intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1) }; + unsafe { + intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1); + } BufferResource { // tjc: ???? @@ -334,10 +343,12 @@ fn BufferResource(b: ~Buffer) -> BufferResource { } } -pub fn send(p: SendPacketBuffered, payload: T) -> bool { +pub fn send(mut p: SendPacketBuffered, + payload: T) + -> bool { let header = p.header(); - let p_ = p.unwrap(); - let p = unsafe { &*p_ }; + let mut p_ = p.unwrap(); + let p = unsafe { &mut *p_ }; assert!(ptr::to_unsafe_ptr(&(p.header)) == header); assert!(p.payload.is_none()); p.payload = Some(payload); @@ -391,11 +402,12 @@ Returns `None` if the sender has closed the connection without sending a message, or `Some(T)` if a message was received. */ -pub fn try_recv(p: RecvPacketBuffered) - -> Option -{ - let p_ = p.unwrap(); - let p = unsafe { &*p_ }; +pub fn try_recv(mut p: RecvPacketBuffered) + -> Option { + let mut p_ = p.unwrap(); + let mut p = unsafe { + &mut *p_ + }; do (|| { try_recv_(p) @@ -412,7 +424,7 @@ pub fn try_recv(p: RecvPacketBuffered) } } -fn try_recv_(p: &Packet) -> Option { +fn try_recv_(p: &mut Packet) -> Option { // optimistic path match p.header.state { Full => { @@ -498,16 +510,20 @@ fn try_recv_(p: &Packet) -> Option { } /// Returns true if messages are available. -pub fn peek(p: &RecvPacketBuffered) -> bool { - match unsafe {(*p.header()).state} { - Empty | Terminated => false, - Blocked => fail!(~"peeking on blocked packet"), - Full => true +pub fn peek(p: &mut RecvPacketBuffered) -> bool { + unsafe { + match (*p.header()).state { + Empty | Terminated => false, + Blocked => fail!(~"peeking on blocked packet"), + Full => true + } } } -fn sender_terminate(p: *Packet) { - let p = unsafe { &*p }; +fn sender_terminate(p: *mut Packet) { + let p = unsafe { + &mut *p + }; match swap_state_rel(&mut p.header.state, Terminated) { Empty => { // The receiver will eventually clean up. @@ -536,8 +552,10 @@ fn sender_terminate(p: *Packet) { } } -fn receiver_terminate(p: *Packet) { - let p = unsafe { &*p }; +fn receiver_terminate(p: *mut Packet) { + let p = unsafe { + &mut *p + }; match swap_state_rel(&mut p.header.state, Terminated) { Empty => { assert!(p.header.blocked_task.is_null()); @@ -569,8 +587,10 @@ that vector. The index points to an endpoint that has either been closed by the sender or has a message waiting to be received. */ -pub fn wait_many(pkts: &[T]) -> uint { - let this = unsafe { rustrt::rust_get_task() }; +pub fn wait_many(pkts: &mut [T]) -> uint { + let this = unsafe { + rustrt::rust_get_task() + }; unsafe { rustrt::task_clear_event_reject(this); @@ -578,19 +598,19 @@ pub fn wait_many(pkts: &[T]) -> uint { let mut data_avail = false; let mut ready_packet = pkts.len(); - for pkts.eachi |i, p| { + for vec::eachi_mut(pkts) |i, p| { unsafe { - let p = &*p.header(); + let p = &mut *p.header(); let old = p.mark_blocked(this); match old { - Full | Terminated => { - data_avail = true; - ready_packet = i; - (*p).state = old; - break; - } - Blocked => fail!(~"blocking on blocked packet"), - Empty => () + Full | Terminated => { + data_avail = true; + ready_packet = i; + (*p).state = old; + break; + } + Blocked => fail!(~"blocking on blocked packet"), + Empty => () } } } @@ -598,7 +618,14 @@ pub fn wait_many(pkts: &[T]) -> uint { while !data_avail { debug!("sleeping on %? packets", pkts.len()); let event = wait_event(this) as *PacketHeader; - let pos = vec::position(pkts, |p| p.header() == event); + + let mut pos = None; + for vec::eachi_mut(pkts) |i, p| { + if p.header() == event { + pos = Some(i); + break; + } + }; match pos { Some(i) => { @@ -609,11 +636,15 @@ pub fn wait_many(pkts: &[T]) -> uint { } } - debug!("%?", pkts[ready_packet]); + debug!("%?", &mut pkts[ready_packet]); - for pkts.each |p| { unsafe{ (*p.header()).unblock()} } + for vec::each_mut(pkts) |p| { + unsafe { + (*p.header()).unblock() + } + } - debug!("%?, %?", ready_packet, pkts[ready_packet]); + debug!("%?, %?", ready_packet, &mut pkts[ready_packet]); unsafe { assert!((*pkts[ready_packet].header()).state == Full @@ -629,65 +660,58 @@ message. */ pub type SendPacket = SendPacketBuffered>; -pub fn SendPacket(p: *Packet) -> SendPacket { +pub fn SendPacket(p: *mut Packet) -> SendPacket { SendPacketBuffered(p) } pub struct SendPacketBuffered { - mut p: Option<*Packet>, - mut buffer: Option>, + p: Option<*mut Packet>, + buffer: Option>, } #[unsafe_destructor] impl Drop for SendPacketBuffered { fn finalize(&self) { - //if self.p != none { - // debug!("drop send %?", option::get(self.p)); - //} - if self.p != None { - let mut p = None; - p <-> self.p; - sender_terminate(p.unwrap()) + unsafe { + let this: &mut SendPacketBuffered = transmute(self); + if this.p != None { + let mut p = None; + p <-> this.p; + sender_terminate(p.unwrap()) + } } - //unsafe { error!("send_drop: %?", - // if self.buffer == none { - // "none" - // } else { "some" }); } } } -pub fn SendPacketBuffered(p: *Packet) - -> SendPacketBuffered { - //debug!("take send %?", p); +pub fn SendPacketBuffered(p: *mut Packet) + -> SendPacketBuffered { SendPacketBuffered { p: Some(p), buffer: unsafe { - Some(BufferResource( - get_buffer(ptr::to_unsafe_ptr(&((*p).header))))) + Some(BufferResource(get_buffer(&mut (*p).header))) } } } pub impl SendPacketBuffered { - fn unwrap(&self) -> *Packet { + fn unwrap(&mut self) -> *mut Packet { let mut p = None; p <-> self.p; p.unwrap() } - fn header(&self) -> *PacketHeader { + fn header(&mut self) -> *mut PacketHeader { match self.p { - Some(packet) => unsafe { - let packet = &*packet; - let header = ptr::to_unsafe_ptr(&(packet.header)); - //forget(packet); - header - }, - None => fail!(~"packet already consumed") + Some(packet) => unsafe { + let packet = &mut *packet; + let header = ptr::to_mut_unsafe_ptr(&mut packet.header); + header + }, + None => fail!(~"packet already consumed") } } - fn reuse_buffer(&self) -> BufferResource { + fn reuse_buffer(&mut self) -> BufferResource { //error!("send reuse_buffer"); let mut tmp = None; tmp <-> self.buffer; @@ -699,41 +723,37 @@ pub impl SendPacketBuffered { /// message. pub type RecvPacket = RecvPacketBuffered>; -pub fn RecvPacket(p: *Packet) -> RecvPacket { +pub fn RecvPacket(p: *mut Packet) -> RecvPacket { RecvPacketBuffered(p) } + pub struct RecvPacketBuffered { - mut p: Option<*Packet>, - mut buffer: Option>, + p: Option<*mut Packet>, + buffer: Option>, } #[unsafe_destructor] impl Drop for RecvPacketBuffered { fn finalize(&self) { - //if self.p != none { - // debug!("drop recv %?", option::get(self.p)); - //} - if self.p != None { - let mut p = None; - p <-> self.p; - receiver_terminate(p.unwrap()) + unsafe { + let this: &mut RecvPacketBuffered = transmute(self); + if this.p != None { + let mut p = None; + p <-> this.p; + receiver_terminate(p.unwrap()) + } } - //unsafe { error!("recv_drop: %?", - // if self.buffer == none { - // "none" - // } else { "some" }); } } } pub impl RecvPacketBuffered { - fn unwrap(&self) -> *Packet { + fn unwrap(&mut self) -> *mut Packet { let mut p = None; p <-> self.p; p.unwrap() } - fn reuse_buffer(&self) -> BufferResource { - //error!("recv reuse_buffer"); + fn reuse_buffer(&mut self) -> BufferResource { let mut tmp = None; tmp <-> self.buffer; tmp.unwrap() @@ -741,27 +761,24 @@ pub impl RecvPacketBuffered { } impl Selectable for RecvPacketBuffered { - fn header(&self) -> *PacketHeader { + fn header(&mut self) -> *mut PacketHeader { match self.p { - Some(packet) => unsafe { - let packet = &*packet; - let header = ptr::to_unsafe_ptr(&(packet.header)); - //forget(packet); - header - }, - None => fail!(~"packet already consumed") + Some(packet) => unsafe { + let packet = &mut *packet; + let header = ptr::to_mut_unsafe_ptr(&mut packet.header); + header + }, + None => fail!(~"packet already consumed") } } } -pub fn RecvPacketBuffered(p: *Packet) - -> RecvPacketBuffered { - //debug!("take recv %?", p); +pub fn RecvPacketBuffered(p: *mut Packet) + -> RecvPacketBuffered { RecvPacketBuffered { p: Some(p), buffer: unsafe { - Some(BufferResource( - get_buffer(ptr::to_unsafe_ptr(&((*p).header))))) + Some(BufferResource(get_buffer(&mut (*p).header))) } } } @@ -800,51 +817,55 @@ this case, `select2` may return either `left` or `right`. */ pub fn select2( - a: RecvPacketBuffered, - b: RecvPacketBuffered) + mut a: RecvPacketBuffered, + mut b: RecvPacketBuffered) -> Either<(Option, RecvPacketBuffered), - (RecvPacketBuffered, Option)> -{ - let i = wait_many([a.header(), b.header()]); - + (RecvPacketBuffered, Option)> { + let mut endpoints = [ a.header(), b.header() ]; + let i = wait_many(endpoints); match i { - 0 => Left((try_recv(a), b)), - 1 => Right((a, try_recv(b))), - _ => fail!(~"select2 return an invalid packet") + 0 => Left((try_recv(a), b)), + 1 => Right((a, try_recv(b))), + _ => fail!(~"select2 return an invalid packet") } } pub trait Selectable { - fn header(&self) -> *PacketHeader; + fn header(&mut self) -> *mut PacketHeader; } -impl Selectable for *PacketHeader { - fn header(&self) -> *PacketHeader { *self } +impl Selectable for *mut PacketHeader { + fn header(&mut self) -> *mut PacketHeader { *self } } /// Returns the index of an endpoint that is ready to receive. -pub fn selecti(endpoints: &[T]) -> uint { +pub fn selecti(endpoints: &mut [T]) -> uint { wait_many(endpoints) } /// Returns 0 or 1 depending on which endpoint is ready to receive -pub fn select2i(a: &A, b: &B) -> - Either<(), ()> { - match wait_many([a.header(), b.header()]) { - 0 => Left(()), - 1 => Right(()), - _ => fail!(~"wait returned unexpected index") +pub fn select2i(a: &mut A, b: &mut B) + -> Either<(), ()> { + let mut endpoints = [ a.header(), b.header() ]; + match wait_many(endpoints) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index") } } -/** Waits on a set of endpoints. Returns a message, its index, and a - list of the remaining endpoints. +/// Waits on a set of endpoints. Returns a message, its index, and a +/// list of the remaining endpoints. +pub fn select(mut endpoints: ~[RecvPacketBuffered]) + -> (uint, + Option, + ~[RecvPacketBuffered]) { + let mut endpoint_headers = ~[]; + for vec::each_mut(endpoints) |endpoint| { + endpoint_headers.push(endpoint.header()); + } -*/ -pub fn select(endpoints: ~[RecvPacketBuffered]) - -> (uint, Option, ~[RecvPacketBuffered]) -{ - let ready = wait_many(endpoints.map(|p| p.header())); + let ready = wait_many(endpoint_headers); let mut remaining = endpoints; let port = remaining.swap_remove(ready); let result = try_recv(port); diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index d866ee6cedb..20ab2d61ecc 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -72,7 +72,7 @@ impl Peekable for DuplexStream { } impl Selectable for DuplexStream { - fn header(&self) -> *pipes::PacketHeader { + fn header(&mut self) -> *mut pipes::PacketHeader { self.port.header() } } diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index b19b2f2889e..76aa4d615e1 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -14,10 +14,11 @@ use uv; use uv::iotask; use uv::iotask::IoTask; -use core::libc; -use core::libc::c_void; use core::cast::transmute; +use core::cast; use core::comm::{stream, Chan, SharedChan, Port, select2i}; +use core::libc::c_void; +use core::libc; /** * Wait for timeout period then send provided value over a channel @@ -120,22 +121,28 @@ pub fn sleep(iotask: &IoTask, msecs: uint) { pub fn recv_timeout(iotask: &IoTask, msecs: uint, wait_po: &Port) - -> Option { - let (timeout_po, timeout_ch) = stream::<()>(); + -> Option { + let mut (timeout_po, timeout_ch) = stream::<()>(); delayed_send(iotask, msecs, &timeout_ch, ()); - // FIXME: This could be written clearer (#2618) - either::either( - |_| { - None - }, |_| { - Some(wait_po.recv()) - }, &select2i(&timeout_po, wait_po) - ) + + // XXX: Workaround due to ports and channels not being &mut. They should + // be. + unsafe { + let wait_po = cast::transmute_mut(wait_po); + + // FIXME: This could be written clearer (#2618) + either::either( + |_| { + None + }, |_| { + Some(wait_po.recv()) + }, &select2i(&mut timeout_po, wait_po) + ) + } } // INTERNAL API -extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, - status: libc::c_int) { +extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, status: libc::c_int) { unsafe { debug!( "delayed_send_cb handle %? status %?", handle, status);