Split out starting a listener from accepting incoming connections.

The Listener trait takes two type parameters, the type of connection and the type of Acceptor,
and specifies only one method, listen, which consumes the listener and produces an Acceptor.

The Acceptor trait takes one type parameter, the type of connection, and defines two methods.
The accept() method waits for an incoming connection attempt and returns the result.
The incoming() method creates an iterator over incoming connections and is a default method.

Example:

let listener = TcpListener.bind(addr); // Bind to a socket
let acceptor = listener.listen(); // Start the listener
for stream in acceptor.incoming() {
    // Process incoming connections forever (or until you break out of the loop)
}
This commit is contained in:
Eric Reed 2013-08-27 10:01:17 -07:00
parent efb8924f88
commit 58b2ff9f56
6 changed files with 182 additions and 113 deletions

View File

@ -474,17 +474,43 @@ pub trait Seek {
fn seek(&mut self, pos: i64, style: SeekStyle);
}
/// A listener is a value that listens for connections
pub trait Listener<S> {
/// Wait for and accept an incoming connection
///
/// Returns `None` on timeout.
/// A listener is a value that can consume itself to start listening for connections.
/// Doing so produces some sort of Acceptor.
pub trait Listener<T, A: Acceptor<T>> {
/// Spin up the listener and start queueing incoming connections
///
/// # Failure
///
/// Raises `io_error` condition. If the condition is handled,
/// then `listen` returns `None`.
fn listen(self) -> Option<A>;
}
/// An acceptor is a value that presents incoming connections
pub trait Acceptor<T> {
/// Wait for and accept an incoming connection
///
/// # Failure
/// Raise `io_error` condition. If the condition is handled,
/// then `accept` returns `None`.
fn accept(&mut self) -> Option<S>;
fn accept(&mut self) -> Option<T>;
/// Create an iterator over incoming connections
fn incoming<'r>(&'r mut self) -> IncomingIterator<'r, Self> {
IncomingIterator { inc: self }
}
}
/// An infinite iterator over incoming connection attempts.
/// Calling `next` will block the task until a connection is attempted.
struct IncomingIterator<'self, A> {
priv inc: &'self mut A,
}
impl<'self, T, A: Acceptor<T>> Iterator<T> for IncomingIterator<'self, A> {
fn next(&mut self) -> Option<T> {
self.inc.accept()
}
}
/// Common trait for decorator types.

View File

@ -11,12 +11,13 @@
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::net::ip::SocketAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::{Reader, Writer, Listener, Acceptor};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory, IoFactoryObject,
RtioSocket, RtioTcpListener,
RtioTcpListenerObject, RtioTcpStream,
RtioTcpStreamObject};
RtioSocket,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpAcceptor, RtioTcpAcceptorObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;
pub struct TcpStream(~RtioTcpStreamObject);
@ -124,13 +125,27 @@ impl TcpListener {
}
}
impl Listener<TcpStream> for TcpListener {
impl Listener<TcpStream, TcpAcceptor> for TcpListener {
fn listen(self) -> Option<TcpAcceptor> {
match (**self).listen() {
Ok(acceptor) => Some(TcpAcceptor(acceptor)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
}
pub struct TcpAcceptor(~RtioTcpAcceptorObject);
impl Acceptor<TcpStream> for TcpAcceptor {
fn accept(&mut self) -> Option<TcpStream> {
match (**self).accept() {
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
None
}
}
}
@ -184,8 +199,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
@ -204,8 +219,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
@ -224,8 +239,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -244,8 +259,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -265,8 +280,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -288,8 +303,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
@ -311,8 +326,8 @@ mod test {
let addr = next_test_ip4();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let buf = [0];
loop {
let mut stop = false;
@ -341,8 +356,8 @@ mod test {
let addr = next_test_ip6();
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
let mut stream = acceptor.accept();
let buf = [0];
loop {
let mut stop = false;
@ -371,9 +386,8 @@ mod test {
let max = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
do max.times {
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
for ref mut stream in acceptor.incoming().take(max) {
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
@ -396,9 +410,8 @@ mod test {
let max = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
do max.times {
let mut stream = listener.accept();
let mut acceptor = TcpListener::bind(addr).listen();
for ref mut stream in acceptor.incoming().take(max) {
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
@ -421,10 +434,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for i in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask {
let mut stream = stream.take();
@ -460,10 +472,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for i in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask {
let mut stream = stream.take();
@ -499,10 +510,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for _ in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for stream in acceptor.incoming().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
@ -537,10 +547,9 @@ mod test {
static MAX: int = 10;
do spawntask {
let mut listener = TcpListener::bind(addr);
for _ in range(0, MAX) {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
let mut acceptor = TcpListener::bind(addr).listen();
for stream in acceptor.incoming().take(MAX as uint) {
let stream = Cell::new(stream);
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
@ -573,10 +582,7 @@ mod test {
fn socket_name(addr: SocketAddr) {
do run_in_newsched_task {
do spawntask {
let listener = TcpListener::bind(addr);
assert!(listener.is_some());
let mut listener = listener.unwrap();
let mut listener = TcpListener::bind(addr).unwrap();
// Make sure socket_name gives
// us the socket we binded to.
@ -592,9 +598,9 @@ mod test {
fn peer_name(addr: SocketAddr) {
do run_in_newsched_task {
do spawntask {
let mut listener = TcpListener::bind(addr);
let mut acceptor = TcpListener::bind(addr).listen();
listener.accept();
acceptor.accept();
}
do spawntask {

View File

@ -40,6 +40,12 @@ impl UnixListener {
}
}
impl Listener<UnixStream> for UnixListener {
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
fn listen(self) -> Option<UnixAcceptor> { fail!() }
}
pub struct UnixAcceptor;
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> Option<UnixStream> { fail!() }
}

View File

@ -17,7 +17,7 @@
//! # XXX Seek and Close
use option::*;
use super::{Reader, Writer, Listener};
use super::{Reader, Writer, Listener, Acceptor};
use super::{standard_error, PreviousIoError, io_error, read_error, IoError};
fn prev_io_error() -> IoError {
@ -62,10 +62,22 @@ impl<R: Reader> Reader for Option<R> {
}
}
impl<L: Listener<S>, S> Listener<S> for Option<L> {
fn accept(&mut self) -> Option<S> {
impl<T, A: Acceptor<T>, L: Listener<T, A>> Listener<T, A> for Option<L> {
fn listen(self) -> Option<A> {
match self {
Some(listener) => listener.listen(),
None => {
io_error::cond.raise(prev_io_error());
None
}
}
}
}
impl<T, A: Acceptor<T>> Acceptor<T> for Option<A> {
fn accept(&mut self) -> Option<T> {
match *self {
Some(ref mut listener) => listener.accept(),
Some(ref mut acceptor) => acceptor.accept(),
None => {
io_error::cond.raise(prev_io_error());
None

View File

@ -26,6 +26,7 @@ pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
@ -75,6 +76,10 @@ pub trait IoFactory {
}
pub trait RtioTcpListener : RtioSocket {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>;
}
pub trait RtioTcpAcceptor : RtioSocket {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;

View File

@ -599,9 +599,7 @@ impl IoFactory for UvIoFactory {
}
pub struct UvTcpListener {
watcher: TcpWatcher,
listening: bool,
incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>,
watcher : TcpWatcher,
home: SchedHandle,
}
@ -611,15 +609,8 @@ impl HomingIO for UvTcpListener {
impl UvTcpListener {
fn new(watcher: TcpWatcher, home: SchedHandle) -> UvTcpListener {
UvTcpListener {
watcher: watcher,
listening: false,
incoming_streams: Tube::new(),
home: home,
}
UvTcpListener { watcher: watcher, home: home }
}
fn watcher(&self) -> TcpWatcher { self.watcher }
}
impl Drop for UvTcpListener {
@ -628,10 +619,10 @@ impl Drop for UvTcpListener {
let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
do self_.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher().as_stream().close {
let task = Cell::new(task);
do self_.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
scheduler.resume_blocked_task_immediately(task.take());
}
}
}
@ -641,50 +632,71 @@ impl Drop for UvTcpListener {
impl RtioSocket for UvTcpListener {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.watcher)
socket_name(Tcp, self_.watcher)
}
}
}
impl RtioTcpListener for UvTcpListener {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
do self.home_for_io |self_| {
if !self_.listening {
self_.listening = true;
let incoming_streams_cell = Cell::new(self_.incoming_streams.clone());
do self_.watcher().listen |mut server, status| {
let stream = match status {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
do self.home_for_io_consume |self_| {
let mut acceptor = ~UvTcpAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
do acceptor.listener.watcher.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(_) => Err(standard_error(OtherIoError)),
None => {
let client = TcpWatcher::new(&server.event_loop());
// XXX: needs to be surfaced in interface
server.accept(client.as_stream());
let inc = TcpWatcher::new(&server.event_loop());
// first accept call in the callback guarenteed to succeed
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpStream { watcher: client, home: home })
Ok(~UvTcpStream { watcher: inc, home: home })
}
};
let mut incoming_streams = incoming_streams_cell.take();
incoming_streams.send(stream);
incoming_streams_cell.put_back(incoming_streams);
incoming.send(inc);
}
}
self_.incoming_streams.recv()
};
Ok(acceptor)
}
}
}
pub struct UvTcpAcceptor {
listener: UvTcpListener,
incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
}
impl HomingIO for UvTcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
}
impl UvTcpAcceptor {
fn new(listener: UvTcpListener) -> UvTcpAcceptor {
UvTcpAcceptor { listener: listener, incoming: Tube::new() }
}
}
impl RtioSocket for UvTcpAcceptor {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.listener.watcher)
}
}
}
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int)
};
match status_to_maybe_uv_error(self_.watcher(), r) {
match status_to_maybe_uv_error(self_.listener.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -694,10 +706,10 @@ impl RtioTcpListener for UvTcpListener {
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int)
};
match status_to_maybe_uv_error(self_.watcher(), r) {
match status_to_maybe_uv_error(self_.listener.watcher, r) {
Some(err) => Err(uv_error_to_io_error(err)),
None => Ok(())
}
@ -1440,8 +1452,9 @@ fn test_simple_tcp_server_and_client() {
do spawntask {
unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let listener = (*io).tcp_bind(addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
@ -1498,11 +1511,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() {
};
let server_fn: ~fn() = || {
let io: *mut IoFactoryObject = unsafe {
Local::unsafe_borrow()
};
let mut listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
let mut stream = listener.accept().unwrap();
let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() };
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
@ -1583,8 +1595,9 @@ fn test_read_and_block() {
do spawntask {
let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() };
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
let listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
@ -1639,8 +1652,9 @@ fn test_read_read_read() {
do spawntask {
unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let listener = (*io).tcp_bind(addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {