From e0a80ee332dc347d588bfcea66c11896c04263bb Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Sat, 17 Aug 2013 00:34:24 -0700 Subject: [PATCH] std: support async/threadpool & sync paths in uv_fs_* calls + add sync test --- src/libstd/rt/uv/file.rs | 204 ++++++++++++++++++++++++++++++++------- 1 file changed, 169 insertions(+), 35 deletions(-) diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs index ef8c131688b..35e425ce659 100644 --- a/src/libstd/rt/uv/file.rs +++ b/src/libstd/rt/uv/file.rs @@ -147,49 +147,92 @@ impl FileDescriptor { FileDescriptor::new(req.get_result()) } + fn open_common(loop_: Loop, path: Path, flags: int, mode: int, + cb: Option) -> int { + let complete_cb_ptr = match cb { + Some(_) => compl_cb, + None => 0 as *u8 + }; + let is_sync = cb.is_none(); + let req = FsRequest::new(cb); + let result = path.to_str().to_c_str().with_ref(|p| unsafe { + uvll::fs_open(loop_.native_handle(), + req.native_handle(), p, flags, mode, complete_cb_ptr) as int + }); + if is_sync { req.cleanup_and_delete(); } + result + } pub fn open(loop_: Loop, path: Path, flags: int, mode: int, cb: FsCallback) -> int { - let req = FsRequest::new(Some(cb)); - path.to_str().to_c_str().with_ref(|p| unsafe { - uvll::fs_open(loop_.native_handle(), - req.native_handle(), p, flags, mode, complete_cb) as int - }) + FileDescriptor::open_common(loop_, path, flags, mode, Some(cb)) + } + pub fn open_sync(loop_: Loop, path: Path, flags: int, mode: int) -> int { + FileDescriptor::open_common(loop_, path, flags, mode, None) } - pub fn unlink(loop_: Loop, path: Path, cb: FsCallback) -> int { - let req = FsRequest::new(Some(cb)); - path.to_str().to_c_str().with_ref(|p| unsafe { + fn unlink_common(loop_: Loop, path: Path, cb: Option) -> int { + let complete_cb_ptr = match cb { + Some(_) => compl_cb, + None => 0 as *u8 + }; + let is_sync = cb.is_none(); + let req = FsRequest::new(cb); + let result = path.to_str().to_c_str().with_ref(|p| unsafe { uvll::fs_unlink(loop_.native_handle(), - req.native_handle(), p, complete_cb) as int - }) + req.native_handle(), p, complete_cb_ptr) as int + }); + if is_sync { req.cleanup_and_delete(); } + result + } + pub fn unlink(loop_: Loop, path: Path, cb: FsCallback) -> int { + FileDescriptor::unlink_common(loop_, path, Some(cb)) + } + pub fn unlink_sync(loop_: Loop, path: Path) -> int { + FileDescriptor::unlink_common(loop_, path, None) } // as per bnoordhuis in #libuv: offset >= 0 uses prwrite instead of write - pub fn write(&self, loop_: Loop, buf: ~[u8], offset: i64, cb: FsCallback) + fn write_common(&self, loop_: Loop, buf: ~[u8], offset: i64, cb: Option) -> int { - let mut req = FsRequest::new(Some(cb)); + let complete_cb_ptr = match cb { + Some(_) => compl_cb, + None => 0 as *u8 + }; + let is_sync = cb.is_none(); + let mut req = FsRequest::new(cb); let len = buf.len(); let buf = vec_to_uv_buf(buf); let base_ptr = buf.base as *c_void; req.get_req_data().buf = Some(buf); req.get_req_data().raw_fd = Some(self.native_handle()); - unsafe { + let result = unsafe { uvll::fs_write(loop_.native_handle(), req.native_handle(), self.native_handle(), base_ptr, - len, offset, complete_cb) as int - } + len, offset, complete_cb_ptr) as int + }; + if is_sync { req.cleanup_and_delete(); } + result + } + pub fn write(&self, loop_: Loop, buf: ~[u8], offset: i64, cb: FsCallback) + -> int { + self.write_common(loop_, buf, offset, Some(cb)) + } + pub fn write_sync(&self, loop_: Loop, buf: ~[u8], offset: i64) + -> int { + self.write_common(loop_, buf, offset, None) } - // really contemplated having this just take a read_len param and have - // the buf live in the scope of this request.. but decided that exposing - // an unsafe mechanism that takes a buf_ptr and len would be much more - // flexible, but the caller is now in the position of managing that - // buf (with all of the sadface that this entails) - pub fn read(&self, loop_: Loop, buf_ptr: Option<*c_void>, len: uint, offset: i64, cb: FsCallback) + fn read_common(&self, loop_: Loop, buf_ptr: Option<*c_void>, + len: uint, offset: i64, cb: Option) -> int { - let mut req = FsRequest::new(Some(cb)); + let complete_cb_ptr = match cb { + Some(_) => compl_cb, + None => 0 as *u8 + }; + let is_sync = cb.is_none(); + let mut req = FsRequest::new(cb); req.get_req_data().raw_fd = Some(self.native_handle()); - unsafe { + let result = unsafe { let buf_ptr = match buf_ptr { Some(ptr) => ptr, None => { @@ -201,19 +244,43 @@ impl FileDescriptor { }; uvll::fs_read(loop_.native_handle(), req.native_handle(), self.native_handle(), buf_ptr, - len, offset, complete_cb) as int - } + len, offset, complete_cb_ptr) as int + }; + if is_sync { req.cleanup_and_delete(); } + result + } + pub fn read(&self, loop_: Loop, buf_ptr: Option<*c_void>, + len: uint, offset: i64, cb: FsCallback) + -> int { + self.read_common(loop_, buf_ptr, len, offset, Some(cb)) + } + pub fn read_sync(&self, loop_: Loop, buf_ptr: Option<*c_void>, len: uint, offset: i64) + -> int { + self.read_common(loop_, buf_ptr, len, offset, None) } - pub fn close(self, loop_: Loop, cb: FsCallback) -> int { - let req = FsRequest::new(Some(cb)); - unsafe { + fn close_common(self, loop_: Loop, cb: Option) -> int { + let complete_cb_ptr = match cb { + Some(_) => compl_cb, + None => 0 as *u8 + }; + let is_sync = cb.is_none(); + let req = FsRequest::new(cb); + let result = unsafe { uvll::fs_close(loop_.native_handle(), req.native_handle(), - self.native_handle(), complete_cb) as int - } + self.native_handle(), complete_cb_ptr) as int + }; + if is_sync { req.cleanup_and_delete(); } + result + } + pub fn close(self, loop_: Loop, cb: FsCallback) -> int { + self.close_common(loop_, Some(cb)) + } + pub fn close_sync(self, loop_: Loop) -> int { + self.close_common(loop_, None) } } -extern fn complete_cb(req: *uv_fs_t) { +extern fn compl_cb(req: *uv_fs_t) { let mut req: FsRequest = NativeHandle::from_native_handle(req); let loop_ = req.get_loop(); // pull the user cb out of the req data @@ -249,17 +316,18 @@ impl NativeHandle for FileDescriptor { mod test { use super::*; //use rt::test::*; - use libc::{STDOUT_FILENO}; + use option::{Some}; + use libc::{STDOUT_FILENO, c_void}; + use vec; use str; use unstable::run_in_bare_thread; use path::Path; - use rt::uv::{Loop, vec_from_uv_buf};//, slice_to_uv_buf}; + use rt::uv::{Loop, vec_to_uv_buf, vec_from_uv_buf, + status_to_maybe_uv_error_with_loop}; use option::{None}; fn file_test_full_simple_impl() { - debug!("hello?") do run_in_bare_thread { - debug!("In bare thread") let mut loop_ = Loop::new(); let create_flags = map_flag(O_RDWR) | map_flag(O_CREAT); @@ -321,12 +389,78 @@ mod test { loop_.close(); } } + fn file_test_full_simple_impl_sync() { + do run_in_bare_thread { + // setup + let mut loop_ = Loop::new(); + let create_flags = map_flag(O_RDWR) | + map_flag(O_CREAT); + let read_flags = map_flag(O_RDONLY); + // 0644 + let mode = map_mode(S_IWUSR) | + map_mode(S_IRUSR) | + map_mode(S_IRGRP) | + map_mode(S_IROTH); + let path_str = "./file_full_simple_sync.txt"; + let write_val = "hello"; + // open/create + let result = FileDescriptor::open_sync(loop_, Path(path_str), create_flags, mode); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + let fd = FileDescriptor(result as i32); + let msg: ~[u8] = write_val.as_bytes().to_owned(); + // write + let result = fd.write_sync(loop_, msg, -1); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + // close + let result = fd.close_sync(loop_); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + // re-open + let result = FileDescriptor::open_sync(loop_, Path(path_str), read_flags,0); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + let len = 1028; + let fd = FileDescriptor(result as i32); + // read + let buf: ~[u8] = vec::from_elem(len, 0u8); + let buf = vec_to_uv_buf(buf); + let buf_ptr = buf.base as *c_void; + let result = fd.read_sync(loop_, Some(buf_ptr), len, 0); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + let nread = result; + // nread == 0 would be EOF.. we know it's >= zero because otherwise + // the above assert would fail + if nread > 0 { + let buf = vec_from_uv_buf(buf).take_unwrap(); + let read_str = str::from_bytes( + buf.slice(0, nread as uint)); + assert!(read_str == ~"hello"); + // close + let result = fd.close_sync(loop_); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + // unlink + let result = FileDescriptor::unlink_sync(loop_, Path(path_str)); + assert!(status_to_maybe_uv_error_with_loop( + loop_.native_handle(), result as i32).is_none()); + } else { fail!("nread was 0.. wudn't expectin' that."); } + loop_.close(); + } + } #[test] fn file_test_full_simple() { file_test_full_simple_impl(); } + #[test] + fn file_test_full_simple_sync() { + file_test_full_simple_impl_sync(); + } + fn naive_print(loop_: Loop, input: ~str) { let stdout = FileDescriptor(STDOUT_FILENO); let msg = input.as_bytes().to_owned();