auto merge of #8655 : olsonjeffery/rust/newrt_file_io, r=pcwalton,brson
This PR includes the addition of the essential CRUD functionality exposed as a part of the `uv_fs_*` api. There's a lot more to be done, but the essential abstractions are in place and can be easily expanded. A summary: * `rt::io::file::FileStream` is fleshed out and behaves as a *non-positional* file stream (that is, it has a cursor that can be viewed/changed via `tell` and `seek` * The underlying abstraction in `RtioFileStream` exposes pairs of `read(), write()` and `pread(), pwrite()`. The latter two take explicit `offset` params and don't respect the current cursor location in a file afaik. They both use the same underlying libuv impl * Because libuv explicitly does *not* support `seek`/`tell` operations, these are impl'd in `UvFileStream` by using `lseek(2)` on the raw file descriptor. * I did my best to flesh out and adhere to the stubbing that was already present in `rt::io::file` and the tests should back that up. There may be things missing. * All of the work to test `seek`/`tell` is done in `rt::io::file`, even though the actual impl is down in `rt::uv::uvio`. * We have the ability to spin up an `~RtioFileStream` from a raw file descriptor. This would be useful for interacting with stdin and stdout via newrt. * The lowest level abstractions (in `rt::uv::file`) support fully synchronous/blocking interactions with the uv API and there is a CRUD test using it. This may also be useful for blocking printf, if desired (the default would be non-blocking and uses libuv's io threadpool) There are a few polish things I need to do still (the foremost that I know of is undefined behavior when seek'ing beyond the file's boundary). After this lands, I want to move on to mapping more of the `uv_fs_*` API (especially `uv_fs_stat`). Also a few people have mentioned interest in `uv_pipe_t` support. I'm open to suggestions.
This commit is contained in:
commit
95c542e7fe
@ -11,69 +11,332 @@
|
||||
use prelude::*;
|
||||
use super::support::PathLike;
|
||||
use super::{Reader, Writer, Seek};
|
||||
use super::SeekStyle;
|
||||
use super::{SeekSet, SeekCur, SeekEnd, SeekStyle};
|
||||
use rt::rtio::{RtioFileStream, IoFactory, IoFactoryObject};
|
||||
use rt::io::{io_error, read_error, EndOfFile,
|
||||
FileMode, FileAccess, Open, Read, Create, ReadWrite};
|
||||
use rt::local::Local;
|
||||
use rt::test::*;
|
||||
|
||||
/// # FIXME #7785
|
||||
/// * Ugh, this is ridiculous. What is the best way to represent these options?
|
||||
enum FileMode {
|
||||
/// Opens an existing file. IoError if file does not exist.
|
||||
Open,
|
||||
/// Creates a file. IoError if file exists.
|
||||
Create,
|
||||
/// Opens an existing file or creates a new one.
|
||||
OpenOrCreate,
|
||||
/// Opens an existing file or creates a new one, positioned at EOF.
|
||||
Append,
|
||||
/// Opens an existing file, truncating it to 0 bytes.
|
||||
Truncate,
|
||||
/// Opens an existing file or creates a new one, truncating it to 0 bytes.
|
||||
CreateOrTruncate,
|
||||
/// Open a file for reading/writing, as indicated by `path`.
|
||||
pub fn open<P: PathLike>(path: &P,
|
||||
mode: FileMode,
|
||||
access: FileAccess
|
||||
) -> Option<FileStream> {
|
||||
let open_result = unsafe {
|
||||
let io = Local::unsafe_borrow::<IoFactoryObject>();
|
||||
(*io).fs_open(path, mode, access)
|
||||
};
|
||||
match open_result {
|
||||
Ok(fd) => Some(FileStream {
|
||||
fd: fd,
|
||||
last_nread: -1
|
||||
}),
|
||||
Err(ioerr) => {
|
||||
io_error::cond.raise(ioerr);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum FileAccess {
|
||||
Read,
|
||||
Write,
|
||||
ReadWrite
|
||||
/// Unlink (remove) a file from the filesystem, as indicated
|
||||
/// by `path`.
|
||||
pub fn unlink<P: PathLike>(path: &P) {
|
||||
let unlink_result = unsafe {
|
||||
let io = Local::unsafe_borrow::<IoFactoryObject>();
|
||||
(*io).fs_unlink(path)
|
||||
};
|
||||
match unlink_result {
|
||||
Ok(_) => (),
|
||||
Err(ioerr) => {
|
||||
io_error::cond.raise(ioerr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FileStream;
|
||||
/// Abstraction representing *positional* access to a file. In this case,
|
||||
/// *positional* refers to it keeping an encounter *cursor* of where in the
|
||||
/// file a subsequent `read` or `write` will begin from. Users of a `FileStream`
|
||||
/// can `seek` to move the cursor to a given location *within the bounds of the
|
||||
/// file* and can ask to have the `FileStream` `tell` them the location, in
|
||||
/// bytes, of the cursor.
|
||||
///
|
||||
/// This abstraction is roughly modeled on the access workflow as represented
|
||||
/// by `open(2)`, `read(2)`, `write(2)` and friends.
|
||||
///
|
||||
/// The `open` and `unlink` static methods are provided to manage creation/removal
|
||||
/// of files. All other methods operatin on an instance of `FileStream`.
|
||||
pub struct FileStream {
|
||||
fd: ~RtioFileStream,
|
||||
last_nread: int,
|
||||
}
|
||||
|
||||
impl FileStream {
|
||||
pub fn open<P: PathLike>(_path: &P,
|
||||
_mode: FileMode,
|
||||
_access: FileAccess
|
||||
) -> Option<FileStream> {
|
||||
fail!()
|
||||
}
|
||||
}
|
||||
|
||||
impl Reader for FileStream {
|
||||
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> {
|
||||
fail!()
|
||||
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
|
||||
match self.fd.read(buf) {
|
||||
Ok(read) => {
|
||||
self.last_nread = read;
|
||||
match read {
|
||||
0 => None,
|
||||
_ => Some(read as uint)
|
||||
}
|
||||
},
|
||||
Err(ioerr) => {
|
||||
// EOF is indicated by returning None
|
||||
if ioerr.kind != EndOfFile {
|
||||
read_error::cond.raise(ioerr);
|
||||
}
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn eof(&mut self) -> bool {
|
||||
fail!()
|
||||
self.last_nread == 0
|
||||
}
|
||||
}
|
||||
|
||||
impl Writer for FileStream {
|
||||
fn write(&mut self, _v: &[u8]) { fail!() }
|
||||
fn write(&mut self, buf: &[u8]) {
|
||||
match self.fd.write(buf) {
|
||||
Ok(_) => (),
|
||||
Err(ioerr) => {
|
||||
io_error::cond.raise(ioerr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) { fail!() }
|
||||
fn flush(&mut self) {
|
||||
match self.fd.flush() {
|
||||
Ok(_) => (),
|
||||
Err(ioerr) => {
|
||||
read_error::cond.raise(ioerr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Seek for FileStream {
|
||||
fn tell(&self) -> u64 { fail!() }
|
||||
fn tell(&self) -> u64 {
|
||||
let res = self.fd.tell();
|
||||
match res {
|
||||
Ok(cursor) => cursor,
|
||||
Err(ioerr) => {
|
||||
read_error::cond.raise(ioerr);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
|
||||
fn seek(&mut self, pos: i64, style: SeekStyle) {
|
||||
match self.fd.seek(pos, style) {
|
||||
Ok(_) => {
|
||||
// successful seek resets EOF indicator
|
||||
self.last_nread = -1;
|
||||
()
|
||||
},
|
||||
Err(ioerr) => {
|
||||
read_error::cond.raise(ioerr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn file_test_smoke_test_impl() {
|
||||
do run_in_newsched_task {
|
||||
let message = "it's alright. have a good time";
|
||||
let filename = &Path("./tmp/file_rt_io_file_test.txt");
|
||||
{
|
||||
let mut write_stream = open(filename, Create, ReadWrite).unwrap();
|
||||
write_stream.write(message.as_bytes());
|
||||
}
|
||||
{
|
||||
use str;
|
||||
let mut read_stream = open(filename, Open, Read).unwrap();
|
||||
let mut read_buf = [0, .. 1028];
|
||||
let read_str = match read_stream.read(read_buf).unwrap() {
|
||||
-1|0 => fail!("shouldn't happen"),
|
||||
n => str::from_bytes(read_buf.slice_to(n))
|
||||
};
|
||||
assert!(read_str == message.to_owned());
|
||||
}
|
||||
unlink(filename);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() {
|
||||
let message = "it's alright. have a good time";
|
||||
let filename = &Path("test.txt");
|
||||
let mut outstream = FileStream::open(filename, Create, Read).unwrap();
|
||||
outstream.write(message.as_bytes());
|
||||
fn file_test_io_smoke_test() {
|
||||
file_test_smoke_test_impl();
|
||||
}
|
||||
|
||||
fn file_test_invalid_path_opened_without_create_should_raise_condition_impl() {
|
||||
do run_in_newsched_task {
|
||||
let filename = &Path("./tmp/file_that_does_not_exist.txt");
|
||||
let mut called = false;
|
||||
do io_error::cond.trap(|_| {
|
||||
called = true;
|
||||
}).inside {
|
||||
let result = open(filename, Open, Read);
|
||||
assert!(result.is_none());
|
||||
}
|
||||
assert!(called);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn file_test_io_invalid_path_opened_without_create_should_raise_condition() {
|
||||
file_test_invalid_path_opened_without_create_should_raise_condition_impl();
|
||||
}
|
||||
|
||||
fn file_test_unlinking_invalid_path_should_raise_condition_impl() {
|
||||
do run_in_newsched_task {
|
||||
let filename = &Path("./tmp/file_another_file_that_does_not_exist.txt");
|
||||
let mut called = false;
|
||||
do io_error::cond.trap(|_| {
|
||||
called = true;
|
||||
}).inside {
|
||||
unlink(filename);
|
||||
}
|
||||
assert!(called);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn file_test_iounlinking_invalid_path_should_raise_condition() {
|
||||
file_test_unlinking_invalid_path_should_raise_condition_impl();
|
||||
}
|
||||
|
||||
fn file_test_io_non_positional_read_impl() {
|
||||
do run_in_newsched_task {
|
||||
use str;
|
||||
let message = "ten-four";
|
||||
let mut read_mem = [0, .. 8];
|
||||
let filename = &Path("./tmp/file_rt_io_file_test_positional.txt");
|
||||
{
|
||||
let mut rw_stream = open(filename, Create, ReadWrite).unwrap();
|
||||
rw_stream.write(message.as_bytes());
|
||||
}
|
||||
{
|
||||
let mut read_stream = open(filename, Open, Read).unwrap();
|
||||
{
|
||||
let read_buf = read_mem.mut_slice(0, 4);
|
||||
read_stream.read(read_buf);
|
||||
}
|
||||
{
|
||||
let read_buf = read_mem.mut_slice(4, 8);
|
||||
read_stream.read(read_buf);
|
||||
}
|
||||
}
|
||||
unlink(filename);
|
||||
let read_str = str::from_bytes(read_mem);
|
||||
assert!(read_str == message.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_test_io_non_positional_read() {
|
||||
file_test_io_non_positional_read_impl();
|
||||
}
|
||||
|
||||
fn file_test_io_seeking_impl() {
|
||||
do run_in_newsched_task {
|
||||
use str;
|
||||
let message = "ten-four";
|
||||
let mut read_mem = [0, .. 4];
|
||||
let set_cursor = 4 as u64;
|
||||
let mut tell_pos_pre_read;
|
||||
let mut tell_pos_post_read;
|
||||
let filename = &Path("./tmp/file_rt_io_file_test_seeking.txt");
|
||||
{
|
||||
let mut rw_stream = open(filename, Create, ReadWrite).unwrap();
|
||||
rw_stream.write(message.as_bytes());
|
||||
}
|
||||
{
|
||||
let mut read_stream = open(filename, Open, Read).unwrap();
|
||||
read_stream.seek(set_cursor as i64, SeekSet);
|
||||
tell_pos_pre_read = read_stream.tell();
|
||||
read_stream.read(read_mem);
|
||||
tell_pos_post_read = read_stream.tell();
|
||||
}
|
||||
unlink(filename);
|
||||
let read_str = str::from_bytes(read_mem);
|
||||
assert!(read_str == message.slice(4, 8).to_owned());
|
||||
assert!(tell_pos_pre_read == set_cursor);
|
||||
assert!(tell_pos_post_read == message.len() as u64);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn file_test_io_seek_and_tell_smoke_test() {
|
||||
file_test_io_seeking_impl();
|
||||
}
|
||||
|
||||
fn file_test_io_seek_and_write_impl() {
|
||||
use io;
|
||||
do run_in_newsched_task {
|
||||
use str;
|
||||
let initial_msg = "food-is-yummy";
|
||||
let overwrite_msg = "-the-bar!!";
|
||||
let final_msg = "foo-the-bar!!";
|
||||
let seek_idx = 3;
|
||||
let mut read_mem = [0, .. 13];
|
||||
let filename = &Path("./tmp/file_rt_io_file_test_seek_and_write.txt");
|
||||
{
|
||||
let mut rw_stream = open(filename, Create, ReadWrite).unwrap();
|
||||
rw_stream.write(initial_msg.as_bytes());
|
||||
rw_stream.seek(seek_idx as i64, SeekSet);
|
||||
rw_stream.write(overwrite_msg.as_bytes());
|
||||
}
|
||||
{
|
||||
let mut read_stream = open(filename, Open, Read).unwrap();
|
||||
read_stream.read(read_mem);
|
||||
}
|
||||
unlink(filename);
|
||||
let read_str = str::from_bytes(read_mem);
|
||||
io::println(fmt!("read_str: '%?' final_msg: '%?'", read_str, final_msg));
|
||||
assert!(read_str == final_msg.to_owned());
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn file_test_io_seek_and_write() {
|
||||
file_test_io_seek_and_write_impl();
|
||||
}
|
||||
|
||||
fn file_test_io_seek_shakedown_impl() {
|
||||
do run_in_newsched_task {
|
||||
use str; // 01234567890123
|
||||
let initial_msg = "qwer-asdf-zxcv";
|
||||
let chunk_one = "qwer";
|
||||
let chunk_two = "asdf";
|
||||
let chunk_three = "zxcv";
|
||||
let mut read_mem = [0, .. 4];
|
||||
let filename = &Path("./tmp/file_rt_io_file_test_seek_shakedown.txt");
|
||||
{
|
||||
let mut rw_stream = open(filename, Create, ReadWrite).unwrap();
|
||||
rw_stream.write(initial_msg.as_bytes());
|
||||
}
|
||||
{
|
||||
let mut read_stream = open(filename, Open, Read).unwrap();
|
||||
|
||||
read_stream.seek(-4, SeekEnd);
|
||||
read_stream.read(read_mem);
|
||||
let read_str = str::from_bytes(read_mem);
|
||||
assert!(read_str == chunk_three.to_owned());
|
||||
|
||||
read_stream.seek(-9, SeekCur);
|
||||
read_stream.read(read_mem);
|
||||
let read_str = str::from_bytes(read_mem);
|
||||
assert!(read_str == chunk_two.to_owned());
|
||||
|
||||
read_stream.seek(0, SeekSet);
|
||||
read_stream.read(read_mem);
|
||||
let read_str = str::from_bytes(read_mem);
|
||||
assert!(read_str == chunk_one.to_owned());
|
||||
}
|
||||
unlink(filename);
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn file_test_io_seek_shakedown() {
|
||||
file_test_io_seek_shakedown_impl();
|
||||
}
|
||||
|
@ -461,6 +461,7 @@ pub enum SeekStyle {
|
||||
/// # XXX
|
||||
/// * Are `u64` and `i64` the right choices?
|
||||
pub trait Seek {
|
||||
/// Return position of file cursor in the stream
|
||||
fn tell(&self) -> u64;
|
||||
|
||||
/// Seek to an offset in a stream
|
||||
@ -539,3 +540,27 @@ pub fn placeholder_error() -> IoError {
|
||||
detail: None
|
||||
}
|
||||
}
|
||||
|
||||
/// Instructions on how to open a file and return a `FileStream`.
|
||||
pub enum FileMode {
|
||||
/// Opens an existing file. IoError if file does not exist.
|
||||
Open,
|
||||
/// Creates a file. IoError if file exists.
|
||||
Create,
|
||||
/// Opens an existing file or creates a new one.
|
||||
OpenOrCreate,
|
||||
/// Opens an existing file or creates a new one, positioned at EOF.
|
||||
Append,
|
||||
/// Opens an existing file, truncating it to 0 bytes.
|
||||
Truncate,
|
||||
/// Opens an existing file or creates a new one, truncating it to 0 bytes.
|
||||
CreateOrTruncate,
|
||||
}
|
||||
|
||||
/// Access permissions with which the file should be opened.
|
||||
/// `FileStream`s opened with `Read` will raise an `io_error` condition if written to.
|
||||
pub enum FileAccess {
|
||||
Read,
|
||||
Write,
|
||||
ReadWrite
|
||||
}
|
||||
|
@ -10,10 +10,15 @@
|
||||
|
||||
use option::*;
|
||||
use result::*;
|
||||
use libc::c_int;
|
||||
|
||||
use rt::io::IoError;
|
||||
use super::io::net::ip::{IpAddr, SocketAddr};
|
||||
use rt::uv::uvio;
|
||||
use path::Path;
|
||||
use super::io::support::PathLike;
|
||||
use super::io::{SeekStyle};
|
||||
use super::io::{FileMode, FileAccess};
|
||||
|
||||
// XXX: ~object doesn't work currently so these are some placeholder
|
||||
// types to use instead
|
||||
@ -46,11 +51,27 @@ pub trait RemoteCallback {
|
||||
fn fire(&mut self);
|
||||
}
|
||||
|
||||
/// Data needed to make a successful open(2) call
|
||||
/// Using unix flag conventions for now, which happens to also be what's supported
|
||||
/// libuv (it does translation to windows under the hood).
|
||||
pub struct FileOpenConfig {
|
||||
/// Path to file to be opened
|
||||
path: Path,
|
||||
/// Flags for file access mode (as per open(2))
|
||||
flags: int,
|
||||
/// File creation mode, ignored unless O_CREAT is passed as part of flags
|
||||
mode: int
|
||||
}
|
||||
|
||||
pub trait IoFactory {
|
||||
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError>;
|
||||
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError>;
|
||||
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError>;
|
||||
fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError>;
|
||||
fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream;
|
||||
fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
|
||||
-> Result<~RtioFileStream, IoError>;
|
||||
fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>;
|
||||
}
|
||||
|
||||
pub trait RtioTcpListener : RtioSocket {
|
||||
@ -93,3 +114,13 @@ pub trait RtioUdpSocket : RtioSocket {
|
||||
pub trait RtioTimer {
|
||||
fn sleep(&mut self, msecs: u64);
|
||||
}
|
||||
|
||||
pub trait RtioFileStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError>;
|
||||
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
|
||||
fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError>;
|
||||
fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError>;
|
||||
fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError>;
|
||||
fn tell(&self) -> Result<u64, IoError>;
|
||||
fn flush(&mut self) -> Result<(), IoError>;
|
||||
}
|
||||
|
@ -11,30 +11,123 @@
|
||||
use prelude::*;
|
||||
use ptr::null;
|
||||
use libc::c_void;
|
||||
use rt::uv::{Request, NativeHandle, Loop, FsCallback};
|
||||
use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf,
|
||||
status_to_maybe_uv_error_with_loop, UvError};
|
||||
use rt::uv::uvll;
|
||||
use rt::uv::uvll::*;
|
||||
use super::super::io::support::PathLike;
|
||||
use cast::transmute;
|
||||
use libc::{c_int};
|
||||
use option::{None, Some, Option};
|
||||
|
||||
pub struct FsRequest(*uvll::uv_fs_t);
|
||||
impl Request for FsRequest;
|
||||
|
||||
pub struct RequestData {
|
||||
complete_cb: Option<FsCallback>,
|
||||
raw_fd: Option<c_int>
|
||||
}
|
||||
|
||||
impl FsRequest {
|
||||
fn new() -> FsRequest {
|
||||
pub fn new(cb: Option<FsCallback>) -> FsRequest {
|
||||
let fs_req = unsafe { malloc_req(UV_FS) };
|
||||
assert!(fs_req.is_not_null());
|
||||
let fs_req = fs_req as *uvll::uv_write_t;
|
||||
unsafe { uvll::set_data_for_req(fs_req, null::<()>()); }
|
||||
NativeHandle::from_native_handle(fs_req)
|
||||
let fs_req: FsRequest = NativeHandle::from_native_handle(fs_req);
|
||||
fs_req.install_req_data(cb);
|
||||
fs_req
|
||||
}
|
||||
|
||||
fn delete(self) {
|
||||
unsafe { free_req(self.native_handle() as *c_void) }
|
||||
fn open_common<P: PathLike>(loop_: &Loop, path: &P, flags: int, mode: int,
|
||||
cb: Option<FsCallback>) -> int {
|
||||
let complete_cb_ptr = match cb {
|
||||
Some(_) => compl_cb as *u8,
|
||||
None => 0 as *u8
|
||||
};
|
||||
let is_sync = cb.is_none();
|
||||
let req = FsRequest::new(cb);
|
||||
let result = path.path_as_str(|p| {
|
||||
p.to_c_str().with_ref(|p| unsafe {
|
||||
uvll::fs_open(loop_.native_handle(),
|
||||
req.native_handle(), p, flags, mode, complete_cb_ptr) as int
|
||||
})
|
||||
});
|
||||
if is_sync { req.cleanup_and_delete(); }
|
||||
result
|
||||
}
|
||||
pub fn open<P: PathLike>(loop_: &Loop, path: &P, flags: int, mode: int,
|
||||
cb: FsCallback) {
|
||||
FsRequest::open_common(loop_, path, flags, mode, Some(cb));
|
||||
}
|
||||
|
||||
fn open(&mut self, _loop_: &Loop, _cb: FsCallback) {
|
||||
pub fn open_sync<P: PathLike>(loop_: &Loop, path: &P, flags: int, mode: int)
|
||||
-> Result<int, UvError> {
|
||||
let result = FsRequest::open_common(loop_, path, flags, mode, None);
|
||||
sync_cleanup(loop_, result)
|
||||
}
|
||||
|
||||
fn close(&mut self, _loop_: &Loop, _cb: FsCallback) {
|
||||
fn unlink_common<P: PathLike>(loop_: &Loop, path: &P, cb: Option<FsCallback>) -> int {
|
||||
let complete_cb_ptr = match cb {
|
||||
Some(_) => compl_cb as *u8,
|
||||
None => 0 as *u8
|
||||
};
|
||||
let is_sync = cb.is_none();
|
||||
let req = FsRequest::new(cb);
|
||||
let result = path.path_as_str(|p| {
|
||||
p.to_c_str().with_ref(|p| unsafe {
|
||||
uvll::fs_unlink(loop_.native_handle(),
|
||||
req.native_handle(), p, complete_cb_ptr) as int
|
||||
})
|
||||
});
|
||||
if is_sync { req.cleanup_and_delete(); }
|
||||
result
|
||||
}
|
||||
pub fn unlink<P: PathLike>(loop_: &Loop, path: &P, cb: FsCallback) {
|
||||
let result = FsRequest::unlink_common(loop_, path, Some(cb));
|
||||
sync_cleanup(loop_, result);
|
||||
}
|
||||
pub fn unlink_sync<P: PathLike>(loop_: &Loop, path: &P) -> Result<int, UvError> {
|
||||
let result = FsRequest::unlink_common(loop_, path, None);
|
||||
sync_cleanup(loop_, result)
|
||||
}
|
||||
|
||||
pub fn install_req_data(&self, cb: Option<FsCallback>) {
|
||||
let fs_req = (self.native_handle()) as *uvll::uv_write_t;
|
||||
let data = ~RequestData {
|
||||
complete_cb: cb,
|
||||
raw_fd: None
|
||||
};
|
||||
unsafe {
|
||||
let data = transmute::<~RequestData, *c_void>(data);
|
||||
uvll::set_data_for_req(fs_req, data);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_req_data<'r>(&'r mut self) -> &'r mut RequestData {
|
||||
unsafe {
|
||||
let data = uvll::get_data_for_req((self.native_handle()));
|
||||
let data = transmute::<&*c_void, &mut ~RequestData>(&data);
|
||||
return &mut **data;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_result(&mut self) -> c_int {
|
||||
unsafe {
|
||||
uvll::get_result_from_fs_req(self.native_handle())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_loop(&self) -> Loop {
|
||||
unsafe { Loop{handle:uvll::get_loop_from_fs_req(self.native_handle())} }
|
||||
}
|
||||
|
||||
fn cleanup_and_delete(self) {
|
||||
unsafe {
|
||||
let data = uvll::get_data_for_req(self.native_handle());
|
||||
let _data = transmute::<*c_void, ~RequestData>(data);
|
||||
uvll::set_data_for_req(self.native_handle(), null::<()>());
|
||||
uvll::fs_req_cleanup(self.native_handle());
|
||||
free_req(self.native_handle() as *c_void)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,4 +138,299 @@ fn from_native_handle(handle: *uvll:: uv_fs_t) -> FsRequest {
|
||||
fn native_handle(&self) -> *uvll::uv_fs_t {
|
||||
match self { &FsRequest(ptr) => ptr }
|
||||
}
|
||||
}
|
||||
fn sync_cleanup(loop_: &Loop, result: int)
|
||||
-> Result<int, UvError> {
|
||||
match status_to_maybe_uv_error_with_loop(loop_.native_handle(), result as i32) {
|
||||
Some(err) => Err(err),
|
||||
None => Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FileDescriptor(c_int);
|
||||
impl FileDescriptor {
|
||||
fn new(fd: c_int) -> FileDescriptor {
|
||||
FileDescriptor(fd)
|
||||
}
|
||||
|
||||
|
||||
pub fn from_open_req(req: &mut FsRequest) -> FileDescriptor {
|
||||
FileDescriptor::new(req.get_result())
|
||||
}
|
||||
|
||||
// as per bnoordhuis in #libuv: offset >= 0 uses prwrite instead of write
|
||||
fn write_common(&mut self, loop_: &Loop, buf: Buf, offset: i64, cb: Option<FsCallback>)
|
||||
-> int {
|
||||
let complete_cb_ptr = match cb {
|
||||
Some(_) => compl_cb as *u8,
|
||||
None => 0 as *u8
|
||||
};
|
||||
let is_sync = cb.is_none();
|
||||
let mut req = FsRequest::new(cb);
|
||||
let base_ptr = buf.base as *c_void;
|
||||
let len = buf.len as uint;
|
||||
req.get_req_data().raw_fd = Some(self.native_handle());
|
||||
let result = unsafe {
|
||||
uvll::fs_write(loop_.native_handle(), req.native_handle(),
|
||||
self.native_handle(), base_ptr,
|
||||
len, offset, complete_cb_ptr) as int
|
||||
};
|
||||
if is_sync { req.cleanup_and_delete(); }
|
||||
result
|
||||
}
|
||||
pub fn write(&mut self, loop_: &Loop, buf: Buf, offset: i64, cb: FsCallback) {
|
||||
self.write_common(loop_, buf, offset, Some(cb));
|
||||
}
|
||||
pub fn write_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64)
|
||||
-> Result<int, UvError> {
|
||||
let result = self.write_common(loop_, buf, offset, None);
|
||||
sync_cleanup(loop_, result)
|
||||
}
|
||||
|
||||
fn read_common(&mut self, loop_: &Loop, buf: Buf,
|
||||
offset: i64, cb: Option<FsCallback>)
|
||||
-> int {
|
||||
let complete_cb_ptr = match cb {
|
||||
Some(_) => compl_cb as *u8,
|
||||
None => 0 as *u8
|
||||
};
|
||||
let is_sync = cb.is_none();
|
||||
let mut req = FsRequest::new(cb);
|
||||
req.get_req_data().raw_fd = Some(self.native_handle());
|
||||
let buf_ptr = buf.base as *c_void;
|
||||
let result = unsafe {
|
||||
uvll::fs_read(loop_.native_handle(), req.native_handle(),
|
||||
self.native_handle(), buf_ptr,
|
||||
buf.len as uint, offset, complete_cb_ptr) as int
|
||||
};
|
||||
if is_sync { req.cleanup_and_delete(); }
|
||||
result
|
||||
}
|
||||
pub fn read(&mut self, loop_: &Loop, buf: Buf, offset: i64, cb: FsCallback) {
|
||||
self.read_common(loop_, buf, offset, Some(cb));
|
||||
}
|
||||
pub fn read_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64)
|
||||
-> Result<int, UvError> {
|
||||
let result = self.read_common(loop_, buf, offset, None);
|
||||
sync_cleanup(loop_, result)
|
||||
}
|
||||
|
||||
fn close_common(self, loop_: &Loop, cb: Option<FsCallback>) -> int {
|
||||
let complete_cb_ptr = match cb {
|
||||
Some(_) => compl_cb as *u8,
|
||||
None => 0 as *u8
|
||||
};
|
||||
let is_sync = cb.is_none();
|
||||
let req = FsRequest::new(cb);
|
||||
let result = unsafe {
|
||||
uvll::fs_close(loop_.native_handle(), req.native_handle(),
|
||||
self.native_handle(), complete_cb_ptr) as int
|
||||
};
|
||||
if is_sync { req.cleanup_and_delete(); }
|
||||
result
|
||||
}
|
||||
pub fn close(self, loop_: &Loop, cb: FsCallback) {
|
||||
self.close_common(loop_, Some(cb));
|
||||
}
|
||||
pub fn close_sync(self, loop_: &Loop) -> Result<int, UvError> {
|
||||
let result = self.close_common(loop_, None);
|
||||
sync_cleanup(loop_, result)
|
||||
}
|
||||
}
|
||||
extern fn compl_cb(req: *uv_fs_t) {
|
||||
let mut req: FsRequest = NativeHandle::from_native_handle(req);
|
||||
let loop_ = req.get_loop();
|
||||
// pull the user cb out of the req data
|
||||
let cb = {
|
||||
let data = req.get_req_data();
|
||||
assert!(data.complete_cb.is_some());
|
||||
// option dance, option dance. oooooh yeah.
|
||||
data.complete_cb.take_unwrap()
|
||||
};
|
||||
// in uv_fs_open calls, the result will be the fd in the
|
||||
// case of success, otherwise it's -1 indicating an error
|
||||
let result = req.get_result();
|
||||
let status = status_to_maybe_uv_error_with_loop(
|
||||
loop_.native_handle(), result);
|
||||
// we have a req and status, call the user cb..
|
||||
// only giving the user a ref to the FsRequest, as we
|
||||
// have to clean it up, afterwards (and they aren't really
|
||||
// reusable, anyways
|
||||
cb(&mut req, status);
|
||||
// clean up the req (and its data!) after calling the user cb
|
||||
req.cleanup_and_delete();
|
||||
}
|
||||
|
||||
impl NativeHandle<c_int> for FileDescriptor {
|
||||
fn from_native_handle(handle: c_int) -> FileDescriptor {
|
||||
FileDescriptor(handle)
|
||||
}
|
||||
fn native_handle(&self) -> c_int {
|
||||
match self { &FileDescriptor(ptr) => ptr }
|
||||
}
|
||||
}
|
||||
|
||||
mod test {
|
||||
use super::*;
|
||||
//use rt::test::*;
|
||||
use libc::{STDOUT_FILENO};
|
||||
use vec;
|
||||
use str;
|
||||
use unstable::run_in_bare_thread;
|
||||
use path::Path;
|
||||
use rt::uv::{Loop, Buf, slice_to_uv_buf};
|
||||
use libc::{O_CREAT, O_RDWR, O_RDONLY,
|
||||
S_IWUSR, S_IRUSR}; //NOTE: need defs for S_**GRP|S_**OTH in libc:: ...
|
||||
//S_IRGRP, S_IROTH};
|
||||
|
||||
fn file_test_full_simple_impl() {
|
||||
do run_in_bare_thread {
|
||||
let mut loop_ = Loop::new();
|
||||
let create_flags = O_RDWR | O_CREAT;
|
||||
let read_flags = O_RDONLY;
|
||||
// 0644 BZZT! WRONG! 0600! See below.
|
||||
let mode = S_IWUSR |S_IRUSR;
|
||||
// these aren't defined in std::libc :(
|
||||
//map_mode(S_IRGRP) |
|
||||
//map_mode(S_IROTH);
|
||||
let path_str = "./tmp/file_full_simple.txt";
|
||||
let write_val = "hello".as_bytes().to_owned();
|
||||
let write_buf = slice_to_uv_buf(write_val);
|
||||
let write_buf_ptr: *Buf = &write_buf;
|
||||
let read_buf_len = 1028;
|
||||
let read_mem = vec::from_elem(read_buf_len, 0u8);
|
||||
let read_buf = slice_to_uv_buf(read_mem);
|
||||
let read_buf_ptr: *Buf = &read_buf;
|
||||
let p = Path(path_str);
|
||||
do FsRequest::open(&loop_, &p, create_flags as int, mode as int)
|
||||
|req, uverr| {
|
||||
assert!(uverr.is_none());
|
||||
let mut fd = FileDescriptor::from_open_req(req);
|
||||
let raw_fd = fd.native_handle();
|
||||
let buf = unsafe { *write_buf_ptr };
|
||||
do fd.write(&req.get_loop(), buf, -1) |req, uverr| {
|
||||
let fd = FileDescriptor(raw_fd);
|
||||
do fd.close(&req.get_loop()) |req, _| {
|
||||
let loop_ = req.get_loop();
|
||||
assert!(uverr.is_none());
|
||||
do FsRequest::open(&loop_, &Path(path_str), read_flags as int,0)
|
||||
|req, uverr| {
|
||||
assert!(uverr.is_none());
|
||||
let loop_ = req.get_loop();
|
||||
let mut fd = FileDescriptor::from_open_req(req);
|
||||
let raw_fd = fd.native_handle();
|
||||
let read_buf = unsafe { *read_buf_ptr };
|
||||
do fd.read(&loop_, read_buf, 0) |req, uverr| {
|
||||
assert!(uverr.is_none());
|
||||
let loop_ = req.get_loop();
|
||||
// we know nread >=0 because uverr is none..
|
||||
let nread = req.get_result() as uint;
|
||||
// nread == 0 would be EOF
|
||||
if nread > 0 {
|
||||
let read_str = unsafe {
|
||||
let read_buf = *read_buf_ptr;
|
||||
str::from_bytes(
|
||||
vec::from_buf(
|
||||
read_buf.base, nread))
|
||||
};
|
||||
assert!(read_str == ~"hello");
|
||||
do FileDescriptor(raw_fd).close(&loop_) |req,uverr| {
|
||||
assert!(uverr.is_none());
|
||||
let loop_ = &req.get_loop();
|
||||
do FsRequest::unlink(loop_, &Path(path_str))
|
||||
|_,uverr| {
|
||||
assert!(uverr.is_none());
|
||||
};
|
||||
};
|
||||
}
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
loop_.run();
|
||||
loop_.close();
|
||||
}
|
||||
}
|
||||
fn file_test_full_simple_impl_sync() {
|
||||
do run_in_bare_thread {
|
||||
// setup
|
||||
let mut loop_ = Loop::new();
|
||||
let create_flags = O_RDWR |
|
||||
O_CREAT;
|
||||
let read_flags = O_RDONLY;
|
||||
// 0644
|
||||
let mode = S_IWUSR |
|
||||
S_IRUSR;
|
||||
//S_IRGRP |
|
||||
//S_IROTH;
|
||||
let path_str = "./tmp/file_full_simple_sync.txt";
|
||||
let write_val = "hello".as_bytes().to_owned();
|
||||
let write_buf = slice_to_uv_buf(write_val);
|
||||
// open/create
|
||||
let result = FsRequest::open_sync(&loop_, &Path(path_str),
|
||||
create_flags as int, mode as int);
|
||||
assert!(result.is_ok());
|
||||
let mut fd = FileDescriptor(result.unwrap() as i32);
|
||||
// write
|
||||
let result = fd.write_sync(&loop_, write_buf, -1);
|
||||
assert!(result.is_ok());
|
||||
// close
|
||||
let result = fd.close_sync(&loop_);
|
||||
assert!(result.is_ok());
|
||||
// re-open
|
||||
let result = FsRequest::open_sync(&loop_, &Path(path_str),
|
||||
read_flags as int,0);
|
||||
assert!(result.is_ok());
|
||||
let len = 1028;
|
||||
let mut fd = FileDescriptor(result.unwrap() as i32);
|
||||
// read
|
||||
let read_mem: ~[u8] = vec::from_elem(len, 0u8);
|
||||
let buf = slice_to_uv_buf(read_mem);
|
||||
let result = fd.read_sync(&loop_, buf, 0);
|
||||
assert!(result.is_ok());
|
||||
let nread = result.unwrap();
|
||||
// nread == 0 would be EOF.. we know it's >= zero because otherwise
|
||||
// the above assert would fail
|
||||
if nread > 0 {
|
||||
let read_str = str::from_bytes(
|
||||
read_mem.slice(0, nread as uint));
|
||||
assert!(read_str == ~"hello");
|
||||
// close
|
||||
let result = fd.close_sync(&loop_);
|
||||
assert!(result.is_ok());
|
||||
// unlink
|
||||
let result = FsRequest::unlink_sync(&loop_, &Path(path_str));
|
||||
assert!(result.is_ok());
|
||||
} else { fail!("nread was 0.. wudn't expectin' that."); }
|
||||
loop_.close();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_test_full_simple() {
|
||||
file_test_full_simple_impl();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_test_full_simple_sync() {
|
||||
file_test_full_simple_impl_sync();
|
||||
}
|
||||
|
||||
fn naive_print(loop_: &Loop, input: &str) {
|
||||
let mut stdout = FileDescriptor(STDOUT_FILENO);
|
||||
let write_val = input.as_bytes();
|
||||
let write_buf = slice_to_uv_buf(write_val);
|
||||
stdout.write_sync(loop_, write_buf, -1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_test_write_to_stdout() {
|
||||
do run_in_bare_thread {
|
||||
let mut loop_ = Loop::new();
|
||||
naive_print(&loop_, "zanzibar!\n");
|
||||
loop_.run();
|
||||
loop_.close();
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,7 @@
|
||||
|
||||
//#[cfg(test)] use unstable::run_in_bare_thread;
|
||||
|
||||
pub use self::file::FsRequest;
|
||||
pub use self::file::{FsRequest};
|
||||
pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
|
||||
pub use self::idle::IdleWatcher;
|
||||
pub use self::timer::TimerWatcher;
|
||||
@ -125,7 +125,7 @@ fn native_handle(&self) -> *uvll::uv_loop_t {
|
||||
pub type NullCallback = ~fn();
|
||||
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
|
||||
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
|
||||
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
|
||||
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
|
||||
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
|
||||
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
|
||||
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
|
||||
@ -281,6 +281,20 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a uv handle, convert a callback status to a UvError
|
||||
pub fn status_to_maybe_uv_error_with_loop(
|
||||
loop_: *uvll::uv_loop_t,
|
||||
status: c_int) -> Option<UvError> {
|
||||
if status != -1 {
|
||||
None
|
||||
} else {
|
||||
unsafe {
|
||||
rtdebug!("loop: %x", loop_ as uint);
|
||||
let err = uvll::last_error(loop_);
|
||||
Some(UvError(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Given a uv handle, convert a callback status to a UvError
|
||||
pub fn status_to_maybe_uv_error<T, U: Watcher + NativeHandle<*T>>(handle: U,
|
||||
status: c_int) -> Option<UvError> {
|
||||
@ -290,9 +304,7 @@ pub fn status_to_maybe_uv_error<T, U: Watcher + NativeHandle<*T>>(handle: U,
|
||||
unsafe {
|
||||
rtdebug!("handle: %x", handle.native_handle() as uint);
|
||||
let loop_ = uvll::get_loop_for_uv_handle(handle.native_handle());
|
||||
rtdebug!("loop: %x", loop_ as uint);
|
||||
let err = uvll::last_error(loop_);
|
||||
Some(UvError(err))
|
||||
status_to_maybe_uv_error_with_loop(loop_, status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,10 +17,11 @@
|
||||
use ops::Drop;
|
||||
use option::*;
|
||||
use ptr;
|
||||
use str;
|
||||
use result::*;
|
||||
use rt::io::IoError;
|
||||
use rt::io::net::ip::{SocketAddr, IpAddr};
|
||||
use rt::io::{standard_error, OtherIoError};
|
||||
use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
|
||||
use rt::local::Local;
|
||||
use rt::rtio::*;
|
||||
use rt::sched::{Scheduler, SchedHandle};
|
||||
@ -29,6 +30,11 @@
|
||||
use rt::uv::idle::IdleWatcher;
|
||||
use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr};
|
||||
use unstable::sync::Exclusive;
|
||||
use super::super::io::support::PathLike;
|
||||
use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY,
|
||||
S_IRUSR, S_IWUSR};
|
||||
use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
|
||||
CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
|
||||
|
||||
#[cfg(test)] use container::Container;
|
||||
#[cfg(test)] use unstable::run_in_bare_thread;
|
||||
@ -455,6 +461,87 @@ fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
|
||||
let home = get_handle_to_current_scheduler!();
|
||||
Ok(~UvTimer::new(watcher, home))
|
||||
}
|
||||
|
||||
fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
|
||||
let loop_ = Loop {handle: self.uv_loop().native_handle()};
|
||||
let fd = file::FileDescriptor(fd);
|
||||
let home = get_handle_to_current_scheduler!();
|
||||
~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream
|
||||
}
|
||||
|
||||
fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
|
||||
-> Result<~RtioFileStream, IoError> {
|
||||
let mut flags = match fm {
|
||||
Open => 0,
|
||||
Create => O_CREAT,
|
||||
OpenOrCreate => O_CREAT,
|
||||
Append => O_APPEND,
|
||||
Truncate => O_TRUNC,
|
||||
CreateOrTruncate => O_TRUNC | O_CREAT
|
||||
};
|
||||
flags = match fa {
|
||||
Read => flags | O_RDONLY,
|
||||
Write => flags | O_WRONLY,
|
||||
ReadWrite => flags | O_RDWR
|
||||
};
|
||||
let create_mode = match fm {
|
||||
Create|OpenOrCreate|CreateOrTruncate =>
|
||||
S_IRUSR | S_IWUSR,
|
||||
_ => 0
|
||||
};
|
||||
let result_cell = Cell::new_empty();
|
||||
let result_cell_ptr: *Cell<Result<~RtioFileStream,
|
||||
IoError>> = &result_cell;
|
||||
let path_cell = Cell::new(path);
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
let task_cell = Cell::new(task);
|
||||
let path = path_cell.take();
|
||||
do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
|
||||
|req,err| {
|
||||
if err.is_none() {
|
||||
let loop_ = Loop {handle: req.get_loop().native_handle()};
|
||||
let home = get_handle_to_current_scheduler!();
|
||||
let fd = file::FileDescriptor(req.get_result());
|
||||
let fs = ~UvFileStream::new(
|
||||
loop_, fd, true, home) as ~RtioFileStream;
|
||||
let res = Ok(fs);
|
||||
unsafe { (*result_cell_ptr).put_back(res); }
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||
} else {
|
||||
let res = Err(uv_error_to_io_error(err.unwrap()));
|
||||
unsafe { (*result_cell_ptr).put_back(res); }
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||
}
|
||||
};
|
||||
};
|
||||
assert!(!result_cell.is_empty());
|
||||
return result_cell.take();
|
||||
}
|
||||
|
||||
fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
|
||||
let result_cell = Cell::new_empty();
|
||||
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
||||
let path_cell = Cell::new(path);
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
let task_cell = Cell::new(task);
|
||||
let path = path_cell.take();
|
||||
do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
|
||||
let res = match err {
|
||||
None => Ok(()),
|
||||
Some(err) => Err(uv_error_to_io_error(err))
|
||||
};
|
||||
unsafe { (*result_cell_ptr).put_back(res); }
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||
};
|
||||
};
|
||||
assert!(!result_cell.is_empty());
|
||||
return result_cell.take();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UvTcpListener {
|
||||
@ -992,6 +1079,140 @@ fn sleep(&mut self, msecs: u64) {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UvFileStream {
|
||||
loop_: Loop,
|
||||
fd: file::FileDescriptor,
|
||||
close_on_drop: bool,
|
||||
home: SchedHandle
|
||||
}
|
||||
|
||||
impl HomingIO for UvFileStream {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvFileStream {
|
||||
fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool,
|
||||
home: SchedHandle) -> UvFileStream {
|
||||
UvFileStream {
|
||||
loop_: loop_,
|
||||
fd: fd,
|
||||
close_on_drop: close_on_drop,
|
||||
home: home
|
||||
}
|
||||
}
|
||||
fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
|
||||
let result_cell = Cell::new_empty();
|
||||
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
|
||||
let buf_ptr: *&mut [u8] = &buf;
|
||||
do self.home_for_io |self_| {
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
|
||||
let task_cell = Cell::new(task);
|
||||
do self_.fd.read(&self_.loop_, buf, offset) |req, uverr| {
|
||||
let res = match uverr {
|
||||
None => Ok(req.get_result() as int),
|
||||
Some(err) => Err(uv_error_to_io_error(err))
|
||||
};
|
||||
unsafe { (*result_cell_ptr).put_back(res); }
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||
};
|
||||
};
|
||||
};
|
||||
result_cell.take()
|
||||
}
|
||||
fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
|
||||
let result_cell = Cell::new_empty();
|
||||
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
||||
let buf_ptr: *&[u8] = &buf;
|
||||
do self.home_for_io |self_| {
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
|
||||
let task_cell = Cell::new(task);
|
||||
do self_.fd.write(&self_.loop_, buf, offset) |_, uverr| {
|
||||
let res = match uverr {
|
||||
None => Ok(()),
|
||||
Some(err) => Err(uv_error_to_io_error(err))
|
||||
};
|
||||
unsafe { (*result_cell_ptr).put_back(res); }
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||
};
|
||||
};
|
||||
};
|
||||
result_cell.take()
|
||||
}
|
||||
fn seek_common(&mut self, pos: i64, whence: c_int) ->
|
||||
Result<u64, IoError>{
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
unsafe {
|
||||
match lseek((*self.fd), pos as off_t, whence) {
|
||||
-1 => {
|
||||
Err(IoError {
|
||||
kind: OtherIoError,
|
||||
desc: "Failed to lseek.",
|
||||
detail: None
|
||||
})
|
||||
},
|
||||
n => Ok(n as u64)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UvFileStream {
|
||||
fn drop(&self) {
|
||||
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
|
||||
if self.close_on_drop {
|
||||
do self_.home_for_io |self_| {
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
let task_cell = Cell::new(task);
|
||||
do self_.fd.close(&self.loop_) |_,_| {
|
||||
let scheduler = Local::take::<Scheduler>();
|
||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||
};
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RtioFileStream for UvFileStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<int, IoError> {
|
||||
self.base_read(buf, -1)
|
||||
}
|
||||
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
|
||||
self.base_write(buf, -1)
|
||||
}
|
||||
fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
|
||||
self.base_read(buf, offset as i64)
|
||||
}
|
||||
fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
|
||||
self.base_write(buf, offset as i64)
|
||||
}
|
||||
fn seek(&mut self, pos: i64, whence: SeekStyle) -> Result<u64, IoError> {
|
||||
use libc::{SEEK_SET, SEEK_CUR, SEEK_END};
|
||||
let whence = match whence {
|
||||
SeekSet => SEEK_SET,
|
||||
SeekCur => SEEK_CUR,
|
||||
SeekEnd => SEEK_END
|
||||
};
|
||||
self.seek_common(pos, whence)
|
||||
}
|
||||
fn tell(&self) -> Result<u64, IoError> {
|
||||
use libc::SEEK_CUR;
|
||||
// this is temporary
|
||||
let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
|
||||
self_.seek_common(0, SEEK_CUR)
|
||||
}
|
||||
fn flush(&mut self) -> Result<(), IoError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_io_no_connect() {
|
||||
do run_in_newsched_task {
|
||||
@ -1498,3 +1719,60 @@ fn test_timer_sleep_simple() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn file_test_uvio_full_simple_impl() {
|
||||
use str::StrSlice; // why does this have to be explicitly imported to work?
|
||||
// compiler was complaining about no trait for str that
|
||||
// does .as_bytes() ..
|
||||
use path::Path;
|
||||
use rt::io::{Open, Create, ReadWrite, Read};
|
||||
unsafe {
|
||||
let io = Local::unsafe_borrow::<IoFactoryObject>();
|
||||
let write_val = "hello uvio!";
|
||||
let path = "./tmp/file_test_uvio_full.txt";
|
||||
{
|
||||
let create_fm = Create;
|
||||
let create_fa = ReadWrite;
|
||||
let mut fd = (*io).fs_open(&Path(path), create_fm, create_fa).unwrap();
|
||||
let write_buf = write_val.as_bytes();
|
||||
fd.write(write_buf);
|
||||
}
|
||||
{
|
||||
let ro_fm = Open;
|
||||
let ro_fa = Read;
|
||||
let mut fd = (*io).fs_open(&Path(path), ro_fm, ro_fa).unwrap();
|
||||
let mut read_vec = [0, .. 1028];
|
||||
let nread = fd.read(read_vec).unwrap();
|
||||
let read_val = str::from_bytes(read_vec.slice(0, nread as uint));
|
||||
assert!(read_val == write_val.to_owned());
|
||||
}
|
||||
(*io).fs_unlink(&Path(path));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_test_uvio_full_simple() {
|
||||
do run_in_newsched_task {
|
||||
file_test_uvio_full_simple_impl();
|
||||
}
|
||||
}
|
||||
|
||||
fn uvio_naive_print(input: &str) {
|
||||
use str::StrSlice;
|
||||
unsafe {
|
||||
use libc::{STDOUT_FILENO};
|
||||
let io = Local::unsafe_borrow::<IoFactoryObject>();
|
||||
{
|
||||
let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false);
|
||||
let write_buf = input.as_bytes();
|
||||
fd.write(write_buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn file_test_uvio_write_to_stdout() {
|
||||
do run_in_newsched_task {
|
||||
uvio_naive_print("jubilation\n");
|
||||
}
|
||||
}
|
||||
|
@ -617,7 +617,54 @@ pub unsafe fn ip6_port(addr: *sockaddr_in6) -> c_uint {
|
||||
return rust_uv_ip6_port(addr);
|
||||
}
|
||||
|
||||
pub unsafe fn fs_open(loop_ptr: *uv_loop_t, req: *uv_fs_t, path: *c_char, flags: int, mode: int,
|
||||
cb: *u8) -> c_int {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_fs_open(loop_ptr, req, path, flags as c_int, mode as c_int, cb)
|
||||
}
|
||||
|
||||
pub unsafe fn fs_unlink(loop_ptr: *uv_loop_t, req: *uv_fs_t, path: *c_char,
|
||||
cb: *u8) -> c_int {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_fs_unlink(loop_ptr, req, path, cb)
|
||||
}
|
||||
pub unsafe fn fs_write(loop_ptr: *uv_loop_t, req: *uv_fs_t, fd: c_int, buf: *c_void,
|
||||
len: uint, offset: i64, cb: *u8) -> c_int {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_fs_write(loop_ptr, req, fd, buf, len as c_uint, offset, cb)
|
||||
}
|
||||
pub unsafe fn fs_read(loop_ptr: *uv_loop_t, req: *uv_fs_t, fd: c_int, buf: *c_void,
|
||||
len: uint, offset: i64, cb: *u8) -> c_int {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_fs_read(loop_ptr, req, fd, buf, len as c_uint, offset, cb)
|
||||
}
|
||||
pub unsafe fn fs_close(loop_ptr: *uv_loop_t, req: *uv_fs_t, fd: c_int,
|
||||
cb: *u8) -> c_int {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_fs_close(loop_ptr, req, fd, cb)
|
||||
}
|
||||
pub unsafe fn fs_req_cleanup(req: *uv_fs_t) {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_fs_req_cleanup(req);
|
||||
}
|
||||
|
||||
// data access helpers
|
||||
pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_get_result_from_fs_req(req)
|
||||
}
|
||||
pub unsafe fn get_loop_from_fs_req(req: *uv_fs_t) -> *uv_loop_t {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
rust_uv_get_loop_from_fs_req(req)
|
||||
}
|
||||
pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void {
|
||||
#[fixed_stack_segment]; #[inline(never)];
|
||||
|
||||
@ -784,6 +831,19 @@ fn rust_uv_read_start(stream: *c_void,
|
||||
fn rust_uv_timer_start(timer_handle: *uv_timer_t, cb: uv_timer_cb, timeout: libc::uint64_t,
|
||||
repeat: libc::uint64_t) -> c_int;
|
||||
fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
|
||||
fn rust_uv_fs_open(loop_ptr: *c_void, req: *uv_fs_t, path: *c_char,
|
||||
flags: c_int, mode: c_int, cb: *u8) -> c_int;
|
||||
fn rust_uv_fs_unlink(loop_ptr: *c_void, req: *uv_fs_t, path: *c_char,
|
||||
cb: *u8) -> c_int;
|
||||
fn rust_uv_fs_write(loop_ptr: *c_void, req: *uv_fs_t, fd: c_int,
|
||||
buf: *c_void, len: c_uint, offset: i64, cb: *u8) -> c_int;
|
||||
fn rust_uv_fs_read(loop_ptr: *c_void, req: *uv_fs_t, fd: c_int,
|
||||
buf: *c_void, len: c_uint, offset: i64, cb: *u8) -> c_int;
|
||||
fn rust_uv_fs_close(loop_ptr: *c_void, req: *uv_fs_t, fd: c_int,
|
||||
cb: *u8) -> c_int;
|
||||
fn rust_uv_fs_req_cleanup(req: *uv_fs_t);
|
||||
fn rust_uv_get_result_from_fs_req(req: *uv_fs_t) -> c_int;
|
||||
fn rust_uv_get_loop_from_fs_req(req: *uv_fs_t) -> *uv_loop_t;
|
||||
|
||||
fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t;
|
||||
fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t;
|
||||
|
@ -517,3 +517,39 @@ extern "C" uintptr_t
|
||||
rust_uv_req_type_max() {
|
||||
return UV_REQ_TYPE_MAX;
|
||||
}
|
||||
|
||||
extern "C" int
|
||||
rust_uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags,
|
||||
int mode, uv_fs_cb cb) {
|
||||
return uv_fs_open(loop, req, path, flags, mode, cb);
|
||||
}
|
||||
extern "C" int
|
||||
rust_uv_fs_unlink(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
|
||||
return uv_fs_unlink(loop, req, path, cb);
|
||||
}
|
||||
extern "C" int
|
||||
rust_uv_fs_write(uv_loop_t* loop, uv_fs_t* req, uv_file fd, void* buf,
|
||||
size_t len, int64_t offset, uv_fs_cb cb) {
|
||||
return uv_fs_write(loop, req, fd, buf, len, offset, cb);
|
||||
}
|
||||
extern "C" int
|
||||
rust_uv_fs_read(uv_loop_t* loop, uv_fs_t* req, uv_file fd, void* buf,
|
||||
size_t len, int64_t offset, uv_fs_cb cb) {
|
||||
return uv_fs_read(loop, req, fd, buf, len, offset, cb);
|
||||
}
|
||||
extern "C" int
|
||||
rust_uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file fd, uv_fs_cb cb) {
|
||||
return uv_fs_close(loop, req, fd, cb);
|
||||
}
|
||||
extern "C" void
|
||||
rust_uv_fs_req_cleanup(uv_fs_t* req) {
|
||||
uv_fs_req_cleanup(req);
|
||||
}
|
||||
extern "C" int
|
||||
rust_uv_get_result_from_fs_req(uv_fs_t* req) {
|
||||
return req->result;
|
||||
}
|
||||
extern "C" uv_loop_t*
|
||||
rust_uv_get_loop_from_fs_req(uv_fs_t* req) {
|
||||
return req->loop;
|
||||
}
|
||||
|
@ -108,6 +108,14 @@ rust_uv_idle_delete
|
||||
rust_uv_idle_init
|
||||
rust_uv_idle_start
|
||||
rust_uv_idle_stop
|
||||
rust_uv_fs_open
|
||||
rust_uv_fs_unlink
|
||||
rust_uv_fs_write
|
||||
rust_uv_fs_read
|
||||
rust_uv_fs_close
|
||||
rust_uv_get_result_from_fs_req
|
||||
rust_uv_get_loop_from_fs_req
|
||||
rust_uv_fs_req_cleanup
|
||||
rust_dbg_lock_create
|
||||
rust_dbg_lock_destroy
|
||||
rust_dbg_lock_lock
|
||||
|
Loading…
Reference in New Issue
Block a user