diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 2e463df9e90..e020d5c3e9f 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -6,7 +6,7 @@ import option::unwrap; import arc::methods; // Things used by code generated by the pipe compiler. -export entangle; +export entangle, get_buffer, drop_buffer; // User-level things export send_packet, recv_packet, send, recv, try_recv, peek; @@ -22,6 +22,59 @@ macro_rules! move { // places. Once there is unary move, it can be removed. fn move(-x: T) -> T { x } +/** + +Some thoughts about fixed buffers. + +The idea is if a protocol is bounded, we will synthesize a record that +has a field for each state. Each of these states contains a packet for +the messages that are legal to be sent in that state. Then, instead of +allocating, the send code just finds a pointer to the right field and +uses that instead. + +Unfortunately, this makes things kind of tricky. We need to be able to +find the buffer, which means we need to pass it around. This could +either be associated with the (send|recv)_packet classes, or with the +packet itself. We will also need some form of reference counting so we +can track who has the responsibility of freeing the buffer. + +We want to preserve the ability to do things like optimistic buffer +re-use, and skipping over to a new buffer when necessary. What I mean +is, suppose we had the typical stream protocol. It'd make sense to +amortize allocation costs by allocating a buffer with say 16 +messages. When the sender gets to the end of the buffer, it could +check if the receiver is done with the packet in slot 0. If so, it can +just reuse that one, checking if the receiver is done with the next +one in each case. If it is ever not done, it just allocates a new +buffer and skips over to that. + +Also, since protocols are in libcore, we have to do this in a way that +maintains backwards compatibility. + +buffer header and buffer. Cast as c_void when necessary. 
+ +=== + +Okay, here are some new ideas. + +It'd be nice to keep the bounded/unbounded case as uniform as +possible. It leads to less code duplication, and less things that can +go subtly wrong. For the bounded case, we could either have a struct +with a bunch of unique pointers to pre-allocated packets, or we could +lay them out inline. Inline layout is better, if for no other reason +than that we don't have to allocate each packet +individually. Currently we pass unique packets around as unsafe +pointers, but they are actually unique pointers. We should instead use +real unsafe pointers. This makes freeing data and running destructors +trickier though. Thus, we should allocate all packets as part of a +higher level buffer structure. Packets can maintain a pointer to their +buffer, and this is the part that gets freed. + +It might be helpful to have some idea of a semi-unique pointer (like +being partially pregnant, also like an ARC). + +*/ + enum state { empty, full, @@ -29,32 +82,86 @@ enum state { terminated } -type packet_header_ = { - mut state: state, - mut blocked_task: option<*rust_task>, -}; +class buffer_header { + // Tracks whether this buffer needs to be freed. We can probably + // get away with restricting it to 0 or 1, if we're careful. + let mut ref_count: int; -enum packet_header { - packet_header_(packet_header_) + new() { self.ref_count = 1; } + + // We may want a drop, and to be careful about stringing this + // thing along. } -type packet_ = { +// This is for protocols to associate extra data to thread around. +type buffer = { + header: buffer_header, + data: T, +}; + +class packet_header { + let mut state: state; + let mut blocked_task: option<*rust_task>; + + // This is a reinterpret_cast of a ~buffer, that can also be cast + // to a buffer_header if need be. + let mut buffer: *libc::c_void; + + new() { + self.state = empty; + self.blocked_task = none; + self.buffer = ptr::null(); + } + + // Returns the old state. 
+ unsafe fn mark_blocked(this: *rust_task) -> state { + self.blocked_task = some(this); + swap_state_acq(self.state, blocked) + } + + unsafe fn unblock() { + alt swap_state_acq(self.state, empty) { + empty | blocked { } + terminated { self.state = terminated; } + full { self.state = full; } + } + } + + // unsafe because this can do weird things to the space/time + // continuum. It ends up making multiple unique pointers to the same + // thing. You'll probably want to forget them when you're done. + unsafe fn buf_header() -> ~buffer_header { + assert self.buffer.is_not_null(); + reinterpret_cast(self.buffer) + } +} + +type packet = { header: packet_header, - mut payload: option + mut payload: option, }; -enum packet { - packet_(packet_) +fn unibuffer() -> ~buffer> { + let b = ~{ + header: buffer_header(), + data: { + header: packet_header(), + mut payload: none, + } + }; + + unsafe { + b.data.header.buffer = reinterpret_cast(b); + } + + b } -fn packet() -> *packet unsafe { - let p: *packet = unsafe::transmute(~{ - header: { - mut state: empty, - mut blocked_task: none::, - }, - mut payload: none:: - }); +fn packet() -> *packet { + let b = unibuffer(); + let p = ptr::addr_of(b.data); + // We'll take over memory management from here. + unsafe { forget(b) } p } @@ -65,6 +172,10 @@ extern mod rusti { fn atomic_xchng_rel(&dst: int, src: int) -> int; } +fn atomic_xchng_rel(&dst: int, src: int) -> int { + rusti::atomic_xchng_rel(dst, src) +} + type rust_task = libc::c_void; extern mod rustrt { @@ -75,13 +186,7 @@ extern mod rustrt { fn task_clear_event_reject(task: *rust_task); fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool; - fn task_signal_event(target: *rust_task, event: *libc::c_void); -} - -// We should consider moving this to core::unsafe, although I -// suspect graydon would want us to use void pointers instead. 
-unsafe fn uniquify(x: *T) -> ~T { - unsafe { unsafe::reinterpret_cast(x) } + pure fn task_signal_event(target: *rust_task, event: *libc::c_void); } fn wait_event(this: *rust_task) -> *libc::c_void { @@ -110,18 +215,42 @@ fn swap_state_rel(&dst: state, src: state) -> state { } } +unsafe fn get_buffer(p: *packet_header) -> ~buffer { + transmute((*p).buf_header()) +} + +class buffer_resource { + let buffer: ~buffer; + new(+b: ~buffer) { + self.buffer = b; + } + + drop unsafe { + let b = move!{self.buffer}; + let old_count = atomic_xchng_rel(b.header.ref_count, 0); + if old_count == 0 { + // go go gadget drop glue + } + else { + forget(b) + } + } +} + fn send(-p: send_packet, -payload: T) { + let header = p.header(); let p_ = p.unwrap(); - let p = unsafe { uniquify(p_) }; - assert (*p).payload == none; - (*p).payload <- some(payload); + let p = unsafe { &*p_ }; + assert ptr::addr_of(p.header) == header; + assert p.payload == none; + p.payload <- some(payload); let old_state = swap_state_rel(p.header.state, full); alt old_state { empty { // Yay, fastpath. // The receiver will eventually clean this up. - unsafe { forget(p); } + //unsafe { forget(p); } } full { fail ~"duplicate send" } blocked { @@ -135,7 +264,7 @@ fn send(-p: send_packet, -payload: T) { } // The receiver will eventually clean this up. - unsafe { forget(p); } + //unsafe { forget(p); } } terminated { // The receiver will never receive this. 
Rely on drop_glue @@ -150,7 +279,7 @@ fn recv(-p: recv_packet) -> T { fn try_recv(-p: recv_packet) -> option { let p_ = p.unwrap(); - let p = unsafe { uniquify(p_) }; + let p = unsafe { &*p_ }; let this = rustrt::rust_get_task(); rustrt::task_clear_event_reject(this); p.header.blocked_task = some(this); @@ -163,7 +292,7 @@ fn try_recv(-p: recv_packet) -> option { empty { #debug("no data available on %?, going to sleep.", p_); wait_event(this); - #debug("woke up, p.state = %?", p.header.state); + #debug("woke up, p.state = %?", copy p.header.state); } blocked { if first { @@ -172,7 +301,7 @@ fn try_recv(-p: recv_packet) -> option { } full { let mut payload = none; - payload <-> (*p).payload; + payload <-> p.payload; p.header.state = terminated; ret some(option::unwrap(payload)) } @@ -195,11 +324,11 @@ pure fn peek(p: recv_packet) -> bool { } fn sender_terminate(p: *packet) { - let p = unsafe { uniquify(p) }; + let p = unsafe { &*p }; alt swap_state_rel(p.header.state, terminated) { empty { // The receiver will eventually clean up. - unsafe { forget(p) } + //unsafe { forget(p) } } blocked { // wake up the target @@ -208,7 +337,7 @@ fn sender_terminate(p: *packet) { ptr::addr_of(p.header) as *libc::c_void); // The receiver will eventually clean up. - unsafe { forget(p) } + //unsafe { forget(p) } } full { // This is impossible @@ -221,11 +350,11 @@ fn sender_terminate(p: *packet) { } fn receiver_terminate(p: *packet) { - let p = unsafe { uniquify(p) }; + let p = unsafe { &*p }; alt swap_state_rel(p.header.state, terminated) { empty { // the sender will clean up - unsafe { forget(p) } + //unsafe { forget(p) } } blocked { // this shouldn't happen. @@ -237,24 +366,6 @@ fn receiver_terminate(p: *packet) { } } -impl private_methods for *packet_header { - // Returns the old state. 
- unsafe fn mark_blocked(this: *rust_task) -> state { - let self = &*self; - self.blocked_task = some(this); - swap_state_acq(self.state, blocked) - } - - unsafe fn unblock() { - let self = &*self; - alt swap_state_acq(self.state, empty) { - empty | blocked { } - terminated { self.state = terminated; } - full { self.state = full; } - } - } -} - #[doc = "Returns when one of the packet headers reports data is available."] fn wait_many(pkts: &[*packet_header]) -> uint { @@ -264,6 +375,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint { let mut data_avail = false; let mut ready_packet = pkts.len(); for pkts.eachi |i, p| unsafe { + let p = unsafe { &*p }; let old = p.mark_blocked(this); alt old { full | terminated { @@ -295,7 +407,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint { #debug("%?", pkts[ready_packet]); - for pkts.each |p| { unsafe{p.unblock()} } + for pkts.each |p| { unsafe{ (*p).unblock()} } #debug("%?, %?", ready_packet, pkts[ready_packet]); @@ -359,11 +471,23 @@ fn select(+endpoints: ~[recv_packet]) (ready, result, remaining) } -class send_packet { +type send_packet = send_packet_buffered>; + +fn send_packet(p: *packet) -> send_packet { + send_packet_buffered(p) +} + +class send_packet_buffered { let mut p: option<*packet>; + let mut buffer: option>; new(p: *packet) { //#debug("take send %?", p); self.p = some(p); + unsafe { + self.buffer = some( + buffer_resource( + get_buffer(ptr::addr_of((*p).header)))); + }; } drop { //if self.p != none { @@ -380,13 +504,39 @@ class send_packet { p <-> self.p; option::unwrap(p) } + + pure fn header() -> *packet_header { + alt self.p { + some(packet) { + unsafe { + let packet = &*packet; + let header = ptr::addr_of(packet.header); + //forget(packet); + header + } + } + none { fail ~"packet already consumed" } + } + } } -class recv_packet { +type recv_packet = recv_packet_buffered>; + +fn recv_packet(p: *packet) -> recv_packet { + recv_packet_buffered(p) +} + +class recv_packet_buffered : selectable { let mut p: 
option<*packet>; + let mut buffer: option>; new(p: *packet) { //#debug("take recv %?", p); self.p = some(p); + unsafe { + self.buffer = some( + buffer_resource( + get_buffer(ptr::addr_of((*p).header)))); + }; } drop { //if self.p != none { @@ -408,9 +558,9 @@ class recv_packet { alt self.p { some(packet) { unsafe { - let packet = uniquify(packet); + let packet = &*packet; let header = ptr::addr_of(packet.header); - forget(packet); + //forget(packet); header } } diff --git a/src/libsyntax/ext/pipes/pipec.rs b/src/libsyntax/ext/pipes/pipec.rs index df5df748a74..70423f5437c 100644 --- a/src/libsyntax/ext/pipes/pipec.rs +++ b/src/libsyntax/ext/pipes/pipec.rs @@ -51,10 +51,12 @@ impl compile of gen_send for message { |n, t| cx.arg_mode(n, t, ast::by_copy) ); + let pipe_ty = cx.ty_path_ast_builder( + path(this.data_name()) + .add_tys(cx.ty_vars(this.ty_params))); let args_ast = vec::append( ~[cx.arg_mode(@~"pipe", - cx.ty_path_ast_builder(path(this.data_name()) - .add_tys(cx.ty_vars(this.ty_params))), + pipe_ty, ast::by_copy)], args_ast); @@ -73,6 +75,7 @@ impl compile of gen_send for message { .map(|x| *x), ~", ")); body += #fmt("pipes::send(pipe, message);\n"); + // return the new channel body += ~"c }"; let body = cx.parse_expr(body);