Move ports out into their own file, add data_message and make communication system use it (and proxies) instead of existing token scheme.

This commit is contained in:
Michael Bebenita 2010-07-28 16:46:13 -07:00 committed by Graydon Hoare
parent 4ff8e15128
commit 4246d567b7
14 changed files with 257 additions and 179 deletions

View File

@ -257,6 +257,7 @@ RUNTIME_CS := rt/sync/spin_lock.cpp \
rt/rust_dom.cpp \
rt/rust_task.cpp \
rt/rust_chan.cpp \
rt/rust_port.cpp \
rt/rust_upcall.cpp \
rt/rust_log.cpp \
rt/rust_message.cpp \
@ -270,6 +271,7 @@ RUNTIME_HDR := rt/globals.h \
rt/rust_internal.h \
rt/rust_util.h \
rt/rust_chan.h \
rt/rust_port.h \
rt/rust_dom.h \
rt/rust_task.h \
rt/rust_proxy.h \

View File

@ -1,12 +1,14 @@
#include "rust_internal.h"
#include "rust_chan.h"
rust_chan::rust_chan(rust_task *task, rust_port *port) :
task(task), port(port), buffer(task->dom, port->unit_sz), token(this) {
/**
* Create a new rust channel and associate it with the specified port.
*/
rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) {
if (port) {
port->chans.push(this);
ref();
associate(port);
}
task->log(rust_log::MEM | rust_log::COMM,
@ -16,49 +18,68 @@ rust_chan::rust_chan(rust_task *task, rust_port *port) :
}
rust_chan::~rust_chan() {
if (port) {
if (token.pending())
token.withdraw();
port->chans.swap_delete(this);
if (port && !port->is_proxy()) {
port->delegate()->chans.swap_delete(this);
}
}
/**
* Link this channel with the specified port.
*/
void rust_chan::associate(maybe_proxy<rust_port> *port) {
this->port = port;
if (!port->is_proxy()) {
this->port->delegate()->chans.push(this);
}
}
bool rust_chan::is_associated() {
return port != NULL;
}
/**
* Unlink this channel from its associated port.
*/
void rust_chan::disassociate() {
I(task->dom, port);
A(task->dom, is_associated(), "Channel must be associated with a port.");
if (token.pending())
token.withdraw();
// Delete reference to the port/
// Delete reference to the port.
port = NULL;
deref();
}
/**
* Attempt to transmit channel data to the associated port.
*/
int rust_chan::transmit() {
void rust_chan::transmit() {
rust_dom *dom = task->dom;
// TODO: Figure out how and why the port would become null.
if (port == NULL) {
dom->log(rust_log::COMM, "invalid port, transmission incomplete");
return ERROR;
if (!is_associated()) {
W(dom, is_associated(),
"rust_chan::transmit with no associated port.");
return;
}
if (buffer.is_empty()) {
dom->log(rust_log::COMM, "buffer is empty, transmission incomplete");
return ERROR;
A(dom, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (port->is_proxy()) {
// TODO: Cache port task locally.
rust_proxy<rust_task> *port_task =
dom->get_task_proxy(port->delegate()->task);
data_message::send(buffer.peek(), buffer.unit_sz,
"send data", task, port_task, port->as_proxy());
buffer.dequeue(NULL);
} else {
rust_port *target_port = port->delegate();
if (target_port->task->blocked_on(target_port)) {
dom->log(rust_log::COMM, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr);
target_port->task->rendezvous_ptr = 0;
target_port->task->wakeup(target_port);
return;
}
}
if(port->task->blocked_on(port)) {
buffer.dequeue(port->task->rendezvous_ptr);
port->task->wakeup(port);
}
return 0;
return;
}
//

View File

@ -1,24 +1,23 @@
#ifndef RUST_CHAN_H
#define RUST_CHAN_H
class rust_chan : public rc_base<rust_chan>, public task_owned<rust_chan> {
class rust_chan : public rc_base<rust_chan>,
public task_owned<rust_chan>,
public rust_cond {
public:
rust_chan(rust_task *task, rust_port *port);
rust_chan(rust_task *task, maybe_proxy<rust_port> *port);
~rust_chan();
rust_task *task;
rust_port *port;
maybe_proxy<rust_port> *port;
size_t idx;
circular_buffer buffer;
size_t idx; // Index into port->chans.
// Token belonging to this chan, it will be placed into a port's
// writers vector if we have something to send to the port.
rust_token token;
void associate(maybe_proxy<rust_port> *port);
void disassociate();
bool is_associated();
int transmit();
void transmit();
};
//

View File

@ -1,87 +1,13 @@
#include "rust_internal.h"
template class ptr_vec<rust_token>;
template class ptr_vec<rust_alarm>;
template class ptr_vec<rust_chan>;
rust_alarm::rust_alarm(rust_task *receiver) :
receiver(receiver)
{
}
// Ports.
rust_port::rust_port(rust_task *task, size_t unit_sz) :
task(task),
unit_sz(unit_sz),
writers(task->dom),
chans(task->dom)
{
task->log(rust_log::MEM|rust_log::COMM,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
}
rust_port::~rust_port()
{
task->log(rust_log::COMM|rust_log::MEM,
"~rust_port 0x%" PRIxPTR,
(uintptr_t)this);
while (chans.length() > 0)
chans.pop()->disassociate();
}
// Tokens.
rust_token::rust_token(rust_chan *chan) :
chan(chan),
idx(0),
submitted(false)
{
}
rust_token::~rust_token()
{
}
bool
rust_token::pending() const
{
return submitted;
}
void
rust_token::submit()
{
rust_port *port = chan->port;
rust_dom *dom = chan->task->dom;
I(dom, port);
I(dom, !submitted);
port->writers.push(this);
submitted = true;
}
void
rust_token::withdraw()
{
rust_task *task = chan->task;
rust_port *port = chan->port;
rust_dom *dom = task->dom;
I(dom, port);
I(dom, submitted);
if (task->blocked())
task->wakeup(this); // must be blocked on us (or dead)
port->writers.swap_delete(this);
submitted = false;
}
//
// Local Variables:
// mode: C++

View File

@ -46,6 +46,14 @@ rust_dom::delete_proxies() {
" in dom %" PRIxPTR, task_proxy, task_proxy->dom);
delete task_proxy;
}
rust_port *port;
rust_proxy<rust_port> *port_proxy;
while (_port_proxies.pop(&port, &port_proxy)) {
log(rust_log::TASK, "deleting proxy %" PRIxPTR
" in dom %" PRIxPTR, port_proxy, port_proxy->dom);
delete port_proxy;
}
}
rust_dom::~rust_dom() {
@ -217,7 +225,6 @@ rust_dom::reap_dead_tasks() {
for (size_t i = 0; i < dead_tasks.length(); ) {
rust_task *task = dead_tasks[i];
if (task->ref_count == 0) {
I(this, !task->waiting_tasks.length());
I(this, task->tasks_waiting_to_join.is_empty());
dead_tasks.swap_delete(task);
@ -270,6 +277,28 @@ rust_dom::get_task_proxy(rust_task *task) {
_task_proxies.put(task, proxy);
return proxy;
}
/**
* Gets a proxy for this port.
*
* TODO: This method needs to be synchronized since it's usually called
* during upcall_clone_chan in a different thread. However, for now
* since this usually happens before the thread actually starts,
* we may get lucky without synchronizing.
*
*/
rust_proxy<rust_port> *
rust_dom::get_port_proxy_synchronized(rust_port *port) {
rust_proxy<rust_port> *proxy = NULL;
if (_port_proxies.get(port, &proxy)) {
return proxy;
}
log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, port);
proxy = new (this) rust_proxy<rust_port> (this, port, false);
_port_proxies.put(port, proxy);
return proxy;
}
/**
* Schedules a running task for execution. Only running tasks can be
* activated. Blocked tasks have to be unblocked before they can be

View File

@ -37,6 +37,7 @@ struct rust_dom
condition_variable _progress;
hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies;
hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
// Incoming messages from other domains.
condition_variable _incoming_message_pending;
@ -66,6 +67,7 @@ struct rust_dom
void drain_incoming_message_queue();
rust_proxy<rust_task> *get_task_proxy(rust_task *task);
void delete_proxies();
rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port);
#ifdef __WIN32__
void win32_require(LPCTSTR fn, BOOL ok);

View File

@ -580,37 +580,11 @@ struct gc_alloc {
}
};
#include "circular_buffer.h"
#include "rust_proxy.h"
#include "rust_task.h"
struct rust_port : public rc_base<rust_port>,
public task_owned<rust_port>,
public rust_cond {
rust_task *task;
size_t unit_sz;
ptr_vec<rust_token> writers;
ptr_vec<rust_chan> chans;
rust_port(rust_task *task, size_t unit_sz);
~rust_port();
};
struct rust_token : public rust_cond {
rust_chan *chan; // Link back to the channel this token belongs to
size_t idx; // Index into port->writers.
bool submitted; // Whether token is in a port->writers.
rust_token(rust_chan *chan);
~rust_token();
bool pending() const;
void submit();
void withdraw();
};
#include "circular_buffer.h"
#include "rust_chan.h"
#include "rust_port.h"
//
// Local Variables:

View File

@ -27,6 +27,19 @@ notify_message(notification_type type, const char* label,
rust_message(label, source, target), type(type) {
}
data_message::
data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_task *source, rust_task *target, rust_port *port) :
rust_message(label, source, target),
_buffer_sz(buffer_sz), _port(port) {
_buffer = (uint8_t *)malloc(buffer_sz);
memcpy(_buffer, buffer, buffer_sz);
}
data_message::~data_message() {
free (_buffer);
}
/**
* Sends a message to the target task via a proxy. The message is allocated
* in the target task domain along with a proxy which points back to the
@ -63,6 +76,25 @@ void notify_message::process() {
}
}
void data_message::
send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
rust_proxy<rust_task> *target, rust_proxy<rust_port> *port) {
rust_task *target_task = target->delegate();
rust_port *target_port = port->delegate();
rust_dom *target_domain = target_task->dom;
data_message *message = new (target_domain)
data_message(buffer, buffer_sz, label, source,
target_task, target_port);
target_domain->send_message(message);
}
void data_message::process() {
_port->remote_channel->buffer.enqueue(_buffer);
_port->remote_channel->transmit();
_target->log(rust_log::COMM, "<=== received data via message ===");
}
//
// Local Variables:
// mode: C++

View File

@ -58,6 +58,30 @@ public:
rust_proxy<rust_task> *target);
};
/**
* Data messages carry a buffer.
*/
class data_message : public rust_message {
private:
uint8_t *_buffer;
size_t _buffer_sz;
rust_port *_port;
public:
data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_task *source, rust_task *target, rust_port *port);
~data_message();
void process();
/**
* This code executes in the sending domain's thread.
*/
static void
send(uint8_t *buffer, size_t buffer_sz, const char* label,
rust_task *source, rust_proxy<rust_task> *target,
rust_proxy<rust_port> *port);
};
//
// Local Variables:
// mode: C++

39
src/rt/rust_port.cpp Normal file
View File

@ -0,0 +1,39 @@
#include "rust_internal.h"
#include "rust_port.h"
rust_port::rust_port(rust_task *task, size_t unit_sz) :
maybe_proxy<rust_port>(this), task(task), unit_sz(unit_sz),
writers(task->dom), chans(task->dom) {
task->log(rust_log::MEM | rust_log::COMM,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
// Allocate a remote channel, for remote channel data.
remote_channel = new (task->dom) rust_chan(task, this);
}
rust_port::~rust_port() {
task->log(rust_log::COMM | rust_log::MEM,
"~rust_port 0x%" PRIxPTR, (uintptr_t) this);
// Disassociate channels from this port.
while (chans.is_empty() == false) {
chans.pop()->disassociate();
}
// We're the only ones holding a reference to the remote channel, so
// clean it up.
delete remote_channel;
}
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//

31
src/rt/rust_port.h Normal file
View File

@ -0,0 +1,31 @@
#ifndef RUST_PORT_H
#define RUST_PORT_H
class rust_port : public maybe_proxy<rust_port>,
public task_owned<rust_port> {
public:
rust_task *task;
size_t unit_sz;
ptr_vec<rust_token> writers;
ptr_vec<rust_chan> chans;
// Data sent to this port from remote tasks is buffered in this channel.
rust_chan *remote_channel;
rust_port(rust_task *task, size_t unit_sz);
~rust_port();
};
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
#endif /* RUST_PORT_H */

View File

@ -64,7 +64,6 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) :
cond(NULL),
supervisor(spawner),
idx(0),
waiting_tasks(dom),
rendezvous_ptr(0),
alarm(this)
{
@ -372,19 +371,6 @@ rust_task::unsupervise()
supervisor = NULL;
}
void
rust_task::notify_waiting_tasks()
{
while (waiting_tasks.length() > 0) {
log(rust_log::ALL, "notify_waiting_tasks: %d",
waiting_tasks.length());
rust_task *waiting_task = waiting_tasks.pop()->receiver;
if (!waiting_task->dead()) {
waiting_task->wakeup(this);
}
}
}
void
rust_task::notify_tasks_waiting_to_join() {
while (tasks_waiting_to_join.is_empty() == false) {

View File

@ -28,9 +28,6 @@ rust_task : public maybe_proxy<rust_task>,
size_t gc_alloc_thresh;
size_t gc_alloc_accum;
// Wait queue for tasks waiting for this task.
rust_wait_queue waiting_tasks;
// Rendezvous pointer for receiving data when blocked on a port. If we're
// trying to read data and no data is available on any incoming channel,
// we block on the port, and yield control to the scheduler. Since, we
@ -101,7 +98,6 @@ rust_task : public maybe_proxy<rust_task>,
void unsupervise();
// Notify tasks waiting for us that we are about to die.
void notify_waiting_tasks();
void notify_tasks_waiting_to_join();
uintptr_t get_fp();

View File

@ -21,11 +21,6 @@
extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s);
inline bool
requires_message_passing(rust_task *sender, rust_task *receiver) {
return sender->dom != receiver->dom;
}
extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
LOG_UPCALL_ENTRY(task);
task->grow(n_frame_bytes);
@ -96,6 +91,18 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
I(dom, !chan->ref_count);
if (!chan->buffer.is_empty() && chan->is_associated()) {
A(dom, !chan->port->is_proxy(),
"Channels to remote ports should be flushed automatically.");
// A target port may still be reading from this channel.
// Block on this channel until it has been completely drained
// by the port.
task->block(chan);
task->yield(2);
return;
}
delete chan;
}
@ -105,14 +112,19 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
*/
extern "C" CDECL rust_chan *
upcall_clone_chan(rust_task *task,
maybe_proxy<rust_task> *spawnee_proxy,
maybe_proxy<rust_task> *target,
rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
rust_task *spawnee = spawnee_proxy->delegate();
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
(uintptr_t) spawnee, (uintptr_t) chan);
return new (spawnee->dom) rust_chan(spawnee, chan->port);
task->log(rust_log::UPCALL | rust_log::COMM,
"target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
target, chan);
rust_task *target_task = target->delegate();
maybe_proxy<rust_port> *port = chan->port;
if (target->is_proxy()) {
port = target_task->dom->get_port_proxy_synchronized(
chan->port->as_delegate());
}
return new (target_task->dom) rust_chan(target_task, port);
}
extern "C" CDECL void upcall_yield(rust_task *task) {
@ -145,20 +157,20 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
}
/**
* Sends an chunk of data along the specified channel.
* Buffers a chunk of data in the specified channel.
*
* sptr: pointer to a chunk of data to send
* sptr: pointer to a chunk of data to buffer
*/
extern "C" CDECL void
upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::COMM,
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
(uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz);
(uintptr_t) chan, (uintptr_t) sptr,
chan->port->delegate()->unit_sz);
chan->buffer.enqueue(sptr);
chan->transmit();
task->log(rust_log::COMM, "=== WROTE DATA ===>");
task->log(rust_log::COMM, "=== sent data ===>");
}
extern "C" CDECL void
@ -174,7 +186,11 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
rust_chan *chan = port->chans[i];
if (chan->buffer.is_empty() == false) {
chan->buffer.dequeue(dptr);
task->log(rust_log::COMM, "<=== READ DATA ===");
if (chan->buffer.is_empty() && chan->task->blocked()) {
chan->task->wakeup(chan);
delete chan;
}
task->log(rust_log::COMM, "<=== read data ===");
return;
}
}
@ -183,6 +199,7 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
// on the port. Remember the rendezvous location so that any sender
// task can write to it before waking up this task.
task->log(rust_log::COMM, "<=== waiting for rendezvous data ===");
task->rendezvous_ptr = dptr;
task->block(port);
task->yield(3);