rust/src/rt/rust_port_selector.cpp

94 lines
2.6 KiB
C++
Raw Normal View History

2012-02-15 00:23:16 -06:00
#include "rust_port.h"
#include "rust_port_selector.h"
#include "rust_task.h"
2012-02-15 00:23:16 -06:00
// A freshly constructed selector is idle: it is not watching any ports.
// select() asserts exactly this state (NULL / 0) on entry.
rust_port_selector::rust_port_selector() : ports(NULL), n_ports(0) { }
void
rust_port_selector::select(rust_task *task, rust_port **dptr,
2012-02-15 21:29:18 -06:00
rust_port **ports,
size_t n_ports, uintptr_t *yield) {
2012-02-15 00:23:16 -06:00
assert(this->ports == NULL);
assert(this->n_ports == 0);
assert(dptr != NULL);
assert(ports != NULL);
assert(n_ports != 0);
assert(yield != NULL);
2012-02-15 00:23:16 -06:00
*yield = false;
size_t locks_taken = 0;
bool found_msg = false;
// Take each port's lock as we iterate through them because
// if none of them contain a usable message then we need to
// block the task before any of them can try to send another
// message.
2012-02-15 00:57:27 -06:00
// Start looking for ports from a different index each time.
size_t j = isaac_rand(&task->sched_loop->rctx);
2012-02-15 00:23:16 -06:00
for (size_t i = 0; i < n_ports; i++) {
2012-02-15 21:29:18 -06:00
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
assert(port != NULL);
2012-02-15 00:23:16 -06:00
2012-02-15 21:29:18 -06:00
port->lock.lock();
locks_taken++;
2012-02-15 00:23:16 -06:00
2012-02-15 21:29:18 -06:00
if (port->buffer.size() > 0) {
*dptr = port;
found_msg = true;
break;
}
2012-02-15 00:23:16 -06:00
}
if (!found_msg) {
2012-02-15 21:29:18 -06:00
this->ports = ports;
this->n_ports = n_ports;
assert(task->rendezvous_ptr == NULL);
2012-02-15 21:29:18 -06:00
task->rendezvous_ptr = (uintptr_t*)dptr;
task->block(this, "waiting for select rendezvous");
2012-04-01 01:12:06 -05:00
// Blocking the task might fail if the task has already been
// killed, but in the event of both failure and success the
// task needs to yield. On success, it yields and waits to be
// unblocked. On failure it yields and is then fails the task.
*yield = true;
2012-02-15 00:23:16 -06:00
}
for (size_t i = 0; i < locks_taken; i++) {
2012-02-15 21:29:18 -06:00
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
port->lock.unlock();
2012-02-15 00:23:16 -06:00
}
}
// Notification that a message was sent on `port`. If the port's task is
// currently blocked on this selector and `port` is one of the ports being
// selected over, hand the port to the task through its rendezvous pointer
// and wake it up. Otherwise do nothing.
void
rust_port_selector::msg_sent_on(rust_port *port) {
    rust_task *task = port->task;

    port->lock.must_not_have_lock();

    // Hold the task's lifecycle lock for the rest of the function so
    // that two ports cannot try to wake up the task simultaneously.
    scoped_lock with(task->lifecycle_lock);

    if (!task->blocked_on(this)) {
        return;
    }

    // Is this one of the ports we were waiting on?
    bool is_selected = false;
    for (size_t idx = 0; idx < n_ports && !is_selected; idx++) {
        is_selected = (ports[idx] == port);
    }
    if (!is_selected) {
        return;
    }

    // Reset the selector to idle, publish the ready port to the
    // waiting task, then wake the task.
    ports = NULL;
    n_ports = 0;
    *task->rendezvous_ptr = (uintptr_t) port;
    task->rendezvous_ptr = NULL;
    task->wakeup_inner(this);
}