Implement timers.

This commit is contained in:
Donovan Preston 2012-01-27 14:04:13 -08:00
parent e48bf6f3f4
commit 3d76922f97
3 changed files with 83 additions and 9 deletions

View File

@ -27,6 +27,11 @@ native mod rustrt {
thread: thread,
req_id: u32,
chan: comm::chan<iomsg>);
fn rust_uvtmp_timer(
thread: thread,
timeout: u32,
req_id: u32,
chan: comm::chan<iomsg>);
fn rust_uvtmp_delete_buf(buf: *u8);
fn rust_uvtmp_get_req_id(cd: connect_data) -> u32;
}
@ -39,7 +44,9 @@ enum iomsg {
whatever,
connected(connect_data),
wrote(connect_data),
read(connect_data, *u8, ctypes::ssize_t)
read(connect_data, *u8, ctypes::ssize_t),
timer(u32),
exit
}
fn create_thread() -> thread {
@ -58,8 +65,7 @@ fn delete_thread(thread: thread) {
rustrt::rust_uvtmp_delete_thread(thread)
}
fn connect(thread: thread, req_id: u32,
ip: str, ch: comm::chan<iomsg>) -> connect_data {
fn connect(thread: thread, req_id: u32, ip: str, ch: comm::chan<iomsg>) -> connect_data {
str::as_buf(ip) {|ipbuf|
rustrt::rust_uvtmp_connect(thread, req_id, ipbuf, ch)
}
@ -80,6 +86,11 @@ fn read_start(thread: thread, req_id: u32,
rustrt::rust_uvtmp_read_start(thread, req_id, chan);
}
fn timer_start(thread: thread, timeout: u32, req_id: u32,
chan: comm::chan<iomsg>) {
rustrt::rust_uvtmp_timer(thread, timeout, req_id, chan);
}
fn delete_buf(buf: *u8) {
rustrt::rust_uvtmp_delete_buf(buf);
}
@ -106,7 +117,7 @@ fn test_connect() {
connect(thread, 0u32, "74.125.224.146", chan);
alt comm::recv(port) {
connected(cd) {
close_connection(thread, 0u32);
close_connection(thread, cd);
}
}
join_thread(thread);
@ -123,10 +134,10 @@ fn test_http() {
connect(thread, 0u32, "74.125.224.146", chan);
alt comm::recv(port) {
connected(cd) {
write(thread, 0u32, str::bytes("GET / HTTP/1.0\n\n"), chan);
write(thread, cd, str::bytes("GET / HTTP/1.0\n\n"), chan);
alt comm::recv(port) {
wrote(cd) {
read_start(thread, 0u32, chan);
read_start(thread, cd, chan);
let keep_going = true;
while keep_going {
alt comm::recv(port) {
@ -146,7 +157,7 @@ fn test_http() {
}
}
}
close_connection(thread, 0u32);
close_connection(thread, cd);
}
}
}

View File

@ -15,9 +15,12 @@ struct connect_data {
chan_handle chan;
};
const intptr_t whatever_tag = 0;
const intptr_t connected_tag = 1;
const intptr_t wrote_tag = 2;
const intptr_t read_tag = 3;
const intptr_t timer_tag = 4;
const intptr_t exit_tag = 5;
struct iomsg {
intptr_t tag;
@ -29,6 +32,7 @@ struct iomsg {
uint8_t *buf;
ssize_t nread;
} read_val;
uint32_t timer_req_id;
} val;
};
@ -44,6 +48,13 @@ struct read_start_data {
chan_handle chan;
};
struct timer_start_data {
rust_uvtmp_thread *thread;
uint32_t timeout;
uint32_t req_id;
chan_handle chan;
};
// FIXME: Copied from rust_builtins.cpp. Could bitrot easily
static void
send(rust_task *task, chan_handle chan, void *data) {
@ -72,7 +83,7 @@ private:
std::queue<connect_data*> close_connection_queue;
std::queue<write_data*> write_queue;
std::queue<read_start_data*> read_start_queue;
std::queue<timer_start_data*> timer_start_queue;
public:
rust_uvtmp_thread() {
@ -139,6 +150,17 @@ public:
read_start_queue.push(rd);
}
void
timer(uint32_t timeout, uint32_t req_id, chan_handle chan) {
scoped_lock with(lock);
timer_start_data *td = new timer_start_data();
td->timeout = timeout;
td->req_id = req_id;
td->chan = chan;
timer_start_queue.push(td);
}
private:
virtual void
@ -159,6 +181,7 @@ private:
close_connections();
write_buffers();
start_reads();
start_timers();
close_idle_if_stop();
}
@ -246,7 +269,7 @@ private:
void
on_write(uv_write_t *handle, write_data *wd) {
iomsg msg;
msg.tag = wrote_tag;
msg.tag = timer_tag;
msg.val.wrote_val = wd->cd;
send(task, wd->chan, &msg);
@ -299,6 +322,40 @@ private:
}
}
void
start_timers() {
assert (lock.lock_held_by_current_thread());
while (!timer_start_queue.empty()) {
timer_start_data *td = timer_start_queue.front();
timer_start_queue.pop();
td->thread = this;
uv_timer_t *timer = (uv_timer_t *)malloc(sizeof(uv_timer_t));
timer->data = td;
int result = uv_timer_init(loop, timer);
result = uv_timer_start(timer, timer_cb, td->timeout, 0);
}
}
static void
timer_cb(uv_timer_t *handle, int what) {
timer_start_data *td = (timer_start_data*)handle->data;
rust_uvtmp_thread *self = td->thread;
self->on_timer(td);
free(handle);
}
void
on_timer(timer_start_data *rd) {
iomsg msg;
msg.tag = timer_tag;
msg.val.timer_req_id = rd->req_id;
send(task, rd->chan, &msg);
delete rd;
}
void
close_idle_if_stop() {
assert(lock.lock_held_by_current_thread());
@ -353,6 +410,11 @@ rust_uvtmp_read_start(rust_uvtmp_thread *thread, uint32_t req_id,
thread->read_start(req_id, *chan);
}
extern "C" void
rust_uvtmp_timer(rust_uvtmp_thread *thread, uint32_t timeout, uint32_t req_id, chan_handle *chan) {
thread->timer(timeout, req_id, *chan);
}
extern "C" void
rust_uvtmp_delete_buf(uint8_t *buf) {
delete [] buf;

View File

@ -96,6 +96,7 @@ rust_uvtmp_connect
rust_uvtmp_close_connection
rust_uvtmp_write
rust_uvtmp_read_start
rust_uvtmp_timer
rust_uvtmp_delete_buf
rust_uvtmp_get_req_id