1
Fork 0

Refactoring pipes to allow implementing bounded protocols.

This commit is contained in:
Eric Holk 2012-07-20 19:06:32 -07:00
parent f65d6026ef
commit d74fb9875b
2 changed files with 217 additions and 64 deletions

View file

@ -6,7 +6,7 @@ import option::unwrap;
import arc::methods; import arc::methods;
// Things used by code generated by the pipe compiler. // Things used by code generated by the pipe compiler.
export entangle; export entangle, get_buffer, drop_buffer;
// User-level things // User-level things
export send_packet, recv_packet, send, recv, try_recv, peek; export send_packet, recv_packet, send, recv, try_recv, peek;
@ -22,6 +22,59 @@ macro_rules! move {
// places. Once there is unary move, it can be removed. // places. Once there is unary move, it can be removed.
fn move<T>(-x: T) -> T { x } fn move<T>(-x: T) -> T { x }
/**
Some thoughts about fixed buffers.
The idea is if a protocol is bounded, we will synthesize a record that
has a field for each state. Each of these states contains a packet for
the messages that are legal to be sent in that state. Then, instead of
allocating, the send code just finds a pointer to the right field and
uses that instead.
Unforunately, this makes things kind of tricky. We need to be able to
find the buffer, which means we need to pass it around. This could
either be associated with the (send|recv)_packet classes, or with the
packet itself. We will also need some form of reference counting so we
can track who has the responsibility of freeing the buffer.
We want to preserve the ability to do things like optimistic buffer
re-use, and skipping over to a new buffer when necessary. What I mean
is, suppose we had the typical stream protocol. It'd make sense to
amortize allocation costs by allocating a buffer with say 16
messages. When the sender gets to the end of the buffer, it could
check if the receiver is done with the packet in slot 0. If so, it can
just reuse that one, checking if the receiver is done with the next
one in each case. If it is ever not done, it just allocates a new
buffer and skips over to that.
Also, since protocols are in libcore, we have to do this in a way that
maintains backwards compatibility.
buffer header and buffer. Cast as c_void when necessary.
===
Okay, here are some new ideas.
It'd be nice to keep the bounded/unbounded case as uniform as
possible. It leads to less code duplication, and less things that can
go sublty wrong. For the bounded case, we could either have a struct
with a bunch of unique pointers to pre-allocated packets, or we could
lay them out inline. Inline layout is better, if for no other reason
than that we don't have to allocate each packet
individually. Currently we pass unique packets around as unsafe
pointers, but they are actually unique pointers. We should instead use
real unsafe pointers. This makes freeing data and running destructors
trickier though. Thus, we should allocate all packets in parter of a
higher level buffer structure. Packets can maintain a pointer to their
buffer, and this is the part that gets freed.
It might be helpful to have some idea of a semi-unique pointer (like
being partially pregnant, also like an ARC).
*/
enum state { enum state {
empty, empty,
full, full,
@ -29,32 +82,86 @@ enum state {
terminated terminated
} }
type packet_header_ = { class buffer_header {
mut state: state, // Tracks whether this buffer needs to be freed. We can probably
mut blocked_task: option<*rust_task>, // get away with restricting it to 0 or 1, if we're careful.
}; let mut ref_count: int;
enum packet_header { new() { self.ref_count = 1; }
packet_header_(packet_header_)
// We may want a drop, and to be careful about stringing this
// thing along.
} }
type packet_<T:send> = { // This is for protocols to associate extra data to thread around.
type buffer<T: send> = {
header: buffer_header,
data: T,
};
class packet_header {
let mut state: state;
let mut blocked_task: option<*rust_task>;
// This is a reinterpret_cast of a ~buffer, that can also be cast
// to a buffer_header if need be.
let mut buffer: *libc::c_void;
new() {
self.state = empty;
self.blocked_task = none;
self.buffer = ptr::null();
}
// Returns the old state.
unsafe fn mark_blocked(this: *rust_task) -> state {
self.blocked_task = some(this);
swap_state_acq(self.state, blocked)
}
unsafe fn unblock() {
alt swap_state_acq(self.state, empty) {
empty | blocked { }
terminated { self.state = terminated; }
full { self.state = full; }
}
}
// unsafe because this can do weird things to the space/time
// continuum. It ends making multiple unique pointers to the same
// thing. You'll proobably want to forget them when you're done.
unsafe fn buf_header() -> ~buffer_header {
assert self.buffer.is_not_null();
reinterpret_cast(self.buffer)
}
}
type packet<T: send> = {
header: packet_header, header: packet_header,
mut payload: option<T> mut payload: option<T>,
}; };
enum packet<T:send> { fn unibuffer<T: send>() -> ~buffer<packet<T>> {
packet_(packet_<T>) let b = ~{
header: buffer_header(),
data: {
header: packet_header(),
mut payload: none,
}
};
unsafe {
b.data.header.buffer = reinterpret_cast(b);
}
b
} }
fn packet<T: send>() -> *packet<T> unsafe { fn packet<T: send>() -> *packet<T> {
let p: *packet<T> = unsafe::transmute(~{ let b = unibuffer();
header: { let p = ptr::addr_of(b.data);
mut state: empty, // We'll take over memory management from here.
mut blocked_task: none::<task::task>, unsafe { forget(b) }
},
mut payload: none::<T>
});
p p
} }
@ -65,6 +172,10 @@ extern mod rusti {
fn atomic_xchng_rel(&dst: int, src: int) -> int; fn atomic_xchng_rel(&dst: int, src: int) -> int;
} }
fn atomic_xchng_rel(&dst: int, src: int) -> int {
rusti::atomic_xchng_rel(dst, src)
}
type rust_task = libc::c_void; type rust_task = libc::c_void;
extern mod rustrt { extern mod rustrt {
@ -75,13 +186,7 @@ extern mod rustrt {
fn task_clear_event_reject(task: *rust_task); fn task_clear_event_reject(task: *rust_task);
fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool; fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool;
fn task_signal_event(target: *rust_task, event: *libc::c_void); pure fn task_signal_event(target: *rust_task, event: *libc::c_void);
}
// We should consider moving this to core::unsafe, although I
// suspect graydon would want us to use void pointers instead.
unsafe fn uniquify<T>(x: *T) -> ~T {
unsafe { unsafe::reinterpret_cast(x) }
} }
fn wait_event(this: *rust_task) -> *libc::c_void { fn wait_event(this: *rust_task) -> *libc::c_void {
@ -110,18 +215,42 @@ fn swap_state_rel(&dst: state, src: state) -> state {
} }
} }
unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
transmute((*p).buf_header())
}
class buffer_resource<T: send> {
let buffer: ~buffer<T>;
new(+b: ~buffer<T>) {
self.buffer = b;
}
drop unsafe {
let b = move!{self.buffer};
let old_count = atomic_xchng_rel(b.header.ref_count, 0);
if old_count == 0 {
// go go gadget drop glue
}
else {
forget(b)
}
}
}
fn send<T: send>(-p: send_packet<T>, -payload: T) { fn send<T: send>(-p: send_packet<T>, -payload: T) {
let header = p.header();
let p_ = p.unwrap(); let p_ = p.unwrap();
let p = unsafe { uniquify(p_) }; let p = unsafe { &*p_ };
assert (*p).payload == none; assert ptr::addr_of(p.header) == header;
(*p).payload <- some(payload); assert p.payload == none;
p.payload <- some(payload);
let old_state = swap_state_rel(p.header.state, full); let old_state = swap_state_rel(p.header.state, full);
alt old_state { alt old_state {
empty { empty {
// Yay, fastpath. // Yay, fastpath.
// The receiver will eventually clean this up. // The receiver will eventually clean this up.
unsafe { forget(p); } //unsafe { forget(p); }
} }
full { fail ~"duplicate send" } full { fail ~"duplicate send" }
blocked { blocked {
@ -135,7 +264,7 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
} }
// The receiver will eventually clean this up. // The receiver will eventually clean this up.
unsafe { forget(p); } //unsafe { forget(p); }
} }
terminated { terminated {
// The receiver will never receive this. Rely on drop_glue // The receiver will never receive this. Rely on drop_glue
@ -150,7 +279,7 @@ fn recv<T: send>(-p: recv_packet<T>) -> T {
fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> { fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
let p_ = p.unwrap(); let p_ = p.unwrap();
let p = unsafe { uniquify(p_) }; let p = unsafe { &*p_ };
let this = rustrt::rust_get_task(); let this = rustrt::rust_get_task();
rustrt::task_clear_event_reject(this); rustrt::task_clear_event_reject(this);
p.header.blocked_task = some(this); p.header.blocked_task = some(this);
@ -163,7 +292,7 @@ fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
empty { empty {
#debug("no data available on %?, going to sleep.", p_); #debug("no data available on %?, going to sleep.", p_);
wait_event(this); wait_event(this);
#debug("woke up, p.state = %?", p.header.state); #debug("woke up, p.state = %?", copy p.header.state);
} }
blocked { blocked {
if first { if first {
@ -172,7 +301,7 @@ fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
} }
full { full {
let mut payload = none; let mut payload = none;
payload <-> (*p).payload; payload <-> p.payload;
p.header.state = terminated; p.header.state = terminated;
ret some(option::unwrap(payload)) ret some(option::unwrap(payload))
} }
@ -195,11 +324,11 @@ pure fn peek<T: send>(p: recv_packet<T>) -> bool {
} }
fn sender_terminate<T: send>(p: *packet<T>) { fn sender_terminate<T: send>(p: *packet<T>) {
let p = unsafe { uniquify(p) }; let p = unsafe { &*p };
alt swap_state_rel(p.header.state, terminated) { alt swap_state_rel(p.header.state, terminated) {
empty { empty {
// The receiver will eventually clean up. // The receiver will eventually clean up.
unsafe { forget(p) } //unsafe { forget(p) }
} }
blocked { blocked {
// wake up the target // wake up the target
@ -208,7 +337,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
ptr::addr_of(p.header) as *libc::c_void); ptr::addr_of(p.header) as *libc::c_void);
// The receiver will eventually clean up. // The receiver will eventually clean up.
unsafe { forget(p) } //unsafe { forget(p) }
} }
full { full {
// This is impossible // This is impossible
@ -221,11 +350,11 @@ fn sender_terminate<T: send>(p: *packet<T>) {
} }
fn receiver_terminate<T: send>(p: *packet<T>) { fn receiver_terminate<T: send>(p: *packet<T>) {
let p = unsafe { uniquify(p) }; let p = unsafe { &*p };
alt swap_state_rel(p.header.state, terminated) { alt swap_state_rel(p.header.state, terminated) {
empty { empty {
// the sender will clean up // the sender will clean up
unsafe { forget(p) } //unsafe { forget(p) }
} }
blocked { blocked {
// this shouldn't happen. // this shouldn't happen.
@ -237,24 +366,6 @@ fn receiver_terminate<T: send>(p: *packet<T>) {
} }
} }
impl private_methods for *packet_header {
// Returns the old state.
unsafe fn mark_blocked(this: *rust_task) -> state {
let self = &*self;
self.blocked_task = some(this);
swap_state_acq(self.state, blocked)
}
unsafe fn unblock() {
let self = &*self;
alt swap_state_acq(self.state, empty) {
empty | blocked { }
terminated { self.state = terminated; }
full { self.state = full; }
}
}
}
#[doc = "Returns when one of the packet headers reports data is #[doc = "Returns when one of the packet headers reports data is
available."] available."]
fn wait_many(pkts: &[*packet_header]) -> uint { fn wait_many(pkts: &[*packet_header]) -> uint {
@ -264,6 +375,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
let mut data_avail = false; let mut data_avail = false;
let mut ready_packet = pkts.len(); let mut ready_packet = pkts.len();
for pkts.eachi |i, p| unsafe { for pkts.eachi |i, p| unsafe {
let p = unsafe { &*p };
let old = p.mark_blocked(this); let old = p.mark_blocked(this);
alt old { alt old {
full | terminated { full | terminated {
@ -295,7 +407,7 @@ fn wait_many(pkts: &[*packet_header]) -> uint {
#debug("%?", pkts[ready_packet]); #debug("%?", pkts[ready_packet]);
for pkts.each |p| { unsafe{p.unblock()} } for pkts.each |p| { unsafe{ (*p).unblock()} }
#debug("%?, %?", ready_packet, pkts[ready_packet]); #debug("%?, %?", ready_packet, pkts[ready_packet]);
@ -359,11 +471,23 @@ fn select<T: send>(+endpoints: ~[recv_packet<T>])
(ready, result, remaining) (ready, result, remaining)
} }
class send_packet<T: send> { type send_packet<T: send> = send_packet_buffered<T, packet<T>>;
fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
send_packet_buffered(p)
}
class send_packet_buffered<T: send, Tbuffer: send> {
let mut p: option<*packet<T>>; let mut p: option<*packet<T>>;
let mut buffer: option<buffer_resource<Tbuffer>>;
new(p: *packet<T>) { new(p: *packet<T>) {
//#debug("take send %?", p); //#debug("take send %?", p);
self.p = some(p); self.p = some(p);
unsafe {
self.buffer = some(
buffer_resource(
get_buffer(ptr::addr_of((*p).header))));
};
} }
drop { drop {
//if self.p != none { //if self.p != none {
@ -380,13 +504,39 @@ class send_packet<T: send> {
p <-> self.p; p <-> self.p;
option::unwrap(p) option::unwrap(p)
} }
pure fn header() -> *packet_header {
alt self.p {
some(packet) {
unsafe {
let packet = &*packet;
let header = ptr::addr_of(packet.header);
//forget(packet);
header
}
}
none { fail ~"packet already consumed" }
}
}
} }
class recv_packet<T: send> { type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
recv_packet_buffered(p)
}
class recv_packet_buffered<T: send, Tbuffer: send> : selectable {
let mut p: option<*packet<T>>; let mut p: option<*packet<T>>;
let mut buffer: option<buffer_resource<Tbuffer>>;
new(p: *packet<T>) { new(p: *packet<T>) {
//#debug("take recv %?", p); //#debug("take recv %?", p);
self.p = some(p); self.p = some(p);
unsafe {
self.buffer = some(
buffer_resource(
get_buffer(ptr::addr_of((*p).header))));
};
} }
drop { drop {
//if self.p != none { //if self.p != none {
@ -408,9 +558,9 @@ class recv_packet<T: send> {
alt self.p { alt self.p {
some(packet) { some(packet) {
unsafe { unsafe {
let packet = uniquify(packet); let packet = &*packet;
let header = ptr::addr_of(packet.header); let header = ptr::addr_of(packet.header);
forget(packet); //forget(packet);
header header
} }
} }

View file

@ -51,10 +51,12 @@ impl compile of gen_send for message {
|n, t| cx.arg_mode(n, t, ast::by_copy) |n, t| cx.arg_mode(n, t, ast::by_copy)
); );
let pipe_ty = cx.ty_path_ast_builder(
path(this.data_name())
.add_tys(cx.ty_vars(this.ty_params)));
let args_ast = vec::append( let args_ast = vec::append(
~[cx.arg_mode(@~"pipe", ~[cx.arg_mode(@~"pipe",
cx.ty_path_ast_builder(path(this.data_name()) pipe_ty,
.add_tys(cx.ty_vars(this.ty_params))),
ast::by_copy)], ast::by_copy)],
args_ast); args_ast);
@ -73,6 +75,7 @@ impl compile of gen_send for message {
.map(|x| *x), .map(|x| *x),
~", ")); ~", "));
body += #fmt("pipes::send(pipe, message);\n"); body += #fmt("pipes::send(pipe, message);\n");
// return the new channel
body += ~"c }"; body += ~"c }";
let body = cx.parse_expr(body); let body = cx.parse_expr(body);