diff --git a/src/Makefile b/src/Makefile index d94e3e250f2..4449418e0f1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -244,7 +244,10 @@ BOOT_CMXS := $(BOOT_MLS:.ml=.cmx) BOOT_OBJS := $(BOOT_MLS:.ml=.o) BOOT_CMIS := $(BOOT_MLS:.ml=.cmi) -RUNTIME_CS := rt/rust.cpp \ +RUNTIME_CS := rt/sync/spin_lock.cpp \ + rt/sync/lock_free_queue.cpp \ + rt/sync/condition_variable.cpp \ + rt/rust.cpp \ rt/rust_builtin.cpp \ rt/rust_crate.cpp \ rt/rust_crate_cache.cpp \ @@ -256,12 +259,19 @@ RUNTIME_CS := rt/rust.cpp \ rt/rust_upcall.cpp \ rt/rust_log.cpp \ rt/rust_timer.cpp \ + rt/circular_buffer.cpp \ rt/isaac/randport.cpp -RUNTIME_HDR := rt/rust.h \ +RUNTIME_HDR := rt/globals.h \ + rt/rust.h \ rt/rust_dwarf.h \ rt/rust_internal.h \ - rt/rust_util.h + rt/rust_util.h \ + rt/rust_chan.h \ + rt/rust_dom.h \ + rt/rust_task.h \ + rt/rust_proxy.h \ + rt/circular_buffer.h RUNTIME_INCS := -Irt/isaac -Irt/uthash RUNTIME_OBJS := $(RUNTIME_CS:.cpp=$(CFG_OBJ_SUFFIX)) @@ -363,6 +373,8 @@ TEST_XFAILS_X86 := $(MUT_BOX_XFAILS) \ test/run-pass/task-comm.rs \ test/run-pass/vec-alloc-append.rs \ test/run-pass/vec-slice.rs \ + test/run-pass/task-comm-3.rs \ + test/run-pass/task-comm-4.rs \ test/compile-fail/bad-recv.rs \ test/compile-fail/bad-send.rs \ test/compile-fail/infinite-tag-type-recursion.rs \ @@ -452,6 +464,11 @@ TEST_XFAILS_LLVM := $(addprefix test/run-pass/, \ tail-cps.rs \ tail-direct.rs \ task-comm.rs \ + task-comm-0.rs \ + task-comm-1.rs \ + task-comm-2.rs \ + task-comm-3.rs \ + task-comm-4.rs \ threads.rs \ tup.rs \ type-sizes.rs \ diff --git a/src/boot/be/abi.ml b/src/boot/be/abi.ml index c3e722936c0..e4bb3c9cdea 100644 --- a/src/boot/be/abi.ml +++ b/src/boot/be/abi.ml @@ -13,7 +13,7 @@ let rc_base_field_refcnt = 0;; let task_field_refcnt = rc_base_field_refcnt;; -let task_field_stk = task_field_refcnt + 1;; +let task_field_stk = task_field_refcnt + 2;; let task_field_runtime_sp = task_field_stk + 1;; let task_field_rust_sp = task_field_runtime_sp + 1;; let task_field_gc_alloc_chain = task_field_rust_sp + 1;; diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp new file mode 100644 index 00000000000..0e1979c1371 --- /dev/null +++ b/src/rt/circular_buffer.cpp @@ -0,0 +1,118 @@ +/* + * A simple resizable circular buffer. + */ + +#include "rust_internal.h" + +circular_buffer::circular_buffer(rust_dom *dom, size_t unit_sz) : + dom(dom), + _buffer_sz(INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz), + unit_sz(unit_sz), + _next(0), + _unread(0), + _buffer((uint8_t *)dom->calloc(_buffer_sz)) { + + A(dom, unit_sz, "Unit size must be larger than zero."); + + dom->log(rust_log::MEM | rust_log::COMM, + "new circular_buffer(buffer_sz=%d, unread=%d)" + "-> circular_buffer=0x%" PRIxPTR, + _buffer_sz, _unread, this); + + A(dom, _buffer, "Failed to allocate buffer."); +} + +circular_buffer::~circular_buffer() { + dom->log(rust_log::MEM | rust_log::COMM, + "~circular_buffer 0x%" PRIxPTR, + this); + I(dom, _buffer); + // I(dom, unread == 0); + dom->free(_buffer); +} + +/** + * Copies the unread data from this buffer to the "dst" address. + */ +void +circular_buffer::transfer(void *dst) { + I(dom, dst); + uint8_t *ptr = (uint8_t *) dst; + for (size_t i = 0; i < _unread; i += unit_sz) { + memcpy(&ptr[i], &_buffer[_next + i % _buffer_sz], unit_sz); + } +} + +/** + * Copies the data at the "src" address into this buffer. The buffer is + * grown if it isn't large enough. + */ +void +circular_buffer::enqueue(void *src) { + I(dom, src); + I(dom, _unread <= _buffer_sz); + + // Grow if necessary. + if (_unread == _buffer_sz) { + I(dom, _buffer_sz <= MAX_CIRCULAR_BUFFFER_SIZE); + void *tmp = dom->malloc(_buffer_sz << 1); + transfer(tmp); + _buffer_sz <<= 1; + dom->free(_buffer); + _buffer = (uint8_t *)tmp; + } + + dom->log(rust_log::MEM | rust_log::COMM, + "circular_buffer enqueue " + "unread: %d, buffer_sz: %d, unit_sz: %d", + _unread, _buffer_sz, unit_sz); + + I(dom, _unread < _buffer_sz); + I(dom, _unread + unit_sz <= _buffer_sz); + + // Copy data + size_t i = (_next + _unread) % _buffer_sz; + memcpy(&_buffer[i], src, unit_sz); + _unread += unit_sz; + + dom->log(rust_log::MEM | rust_log::COMM, + "circular_buffer pushed data at index: %d", i); +} + +/** + * Copies data from this buffer to the "dst" address. The buffer is + * shrunk if possible. + */ +void +circular_buffer::dequeue(void *dst) { + I(dom, dst); + I(dom, unit_sz > 0); + I(dom, _unread >= unit_sz); + I(dom, _unread <= _buffer_sz); + I(dom, _buffer); + size_t i = _next; + memcpy(dst, &_buffer[i], unit_sz); + dom->log(rust_log::MEM | rust_log::COMM, + "shifted data from index %d", i); + _unread -= unit_sz; + _next += unit_sz; + I(dom, _next <= _buffer_sz); + if (_next == _buffer_sz) { + _next = 0; + } + + // Shrink if possible. + if (_buffer_sz >= INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz && + _unread <= _buffer_sz / 4) { + void *tmp = dom->malloc(_buffer_sz / 2); + transfer(tmp); + _buffer_sz >>= 1; + dom->free(_buffer); + _buffer = (uint8_t *)tmp; + } +} + +bool +circular_buffer::is_empty() { + return _unread == 0; +} diff --git a/src/rt/circular_buffer.h b/src/rt/circular_buffer.h new file mode 100644 index 00000000000..c0c0da5eb1a --- /dev/null +++ b/src/rt/circular_buffer.h @@ -0,0 +1,30 @@ +/* + * + */ + +#ifndef CIRCULAR_BUFFER_H +#define CIRCULAR_BUFFER_H + +class +circular_buffer : public dom_owned { + static const size_t INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS = 8; + static const size_t MAX_CIRCULAR_BUFFFER_SIZE = 1 << 24; + +public: + rust_dom *dom; + circular_buffer(rust_dom *dom, size_t unit_sz); + ~circular_buffer(); + void transfer(void *dst); + void enqueue(void *src); + void dequeue(void *dst); + bool is_empty(); + +private: + size_t _buffer_sz; + size_t unit_sz; + size_t _next; + size_t _unread; + uint8_t *_buffer; +}; + +#endif /* CIRCULAR_BUFFER_H */ diff --git a/src/rt/globals.h b/src/rt/globals.h new file mode 100644 index 00000000000..f8025a406b4 --- /dev/null +++ b/src/rt/globals.h @@ -0,0 +1,33 @@ +#ifndef GLOBALS_H +#define GLOBALS_H + +#define __STDC_LIMIT_MACROS 1 +#define __STDC_CONSTANT_MACROS 1 +#define __STDC_FORMAT_MACROS 1 + +#include +#include +#include + +#include +#include + +#if defined(__WIN32__) +extern "C" { +#include +#include +#include +} +#elif defined(__GNUC__) +#include +#include +#include +#include +#include +#include +#include +#else +#error "Platform not supported." +#endif + +#endif /* GLOBALS_H */ diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 235eb8d0d98..00e709c9748 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -1,7 +1,6 @@ #include "rust_internal.h" #include "util/array_list.h" - // #define TRACK_ALLOCATIONS // For debugging, keeps track of live allocations, so you can find out // exactly what leaked. @@ -100,52 +99,6 @@ rust_srv::clone() return new rust_srv(); } - -int -rust_main_loop(rust_dom *dom) -{ - // Make sure someone is watching, to pull us out of infinite loops. - rust_timer timer(*dom); - - int rval; - rust_task *task; - - dom->log(rust_log::DOM, - "running main-loop on domain 0x%" PRIxPTR, dom); - dom->logptr("exit-task glue", - dom->root_crate->get_exit_task_glue()); - - while ((task = dom->sched()) != NULL) { - I(dom, task->running()); - - dom->log(rust_log::TASK, - "activating task 0x%" PRIxPTR ", sp=0x%" PRIxPTR, - (uintptr_t)task, task->rust_sp); - - dom->interrupt_flag = 0; - - dom->activate(task); - - dom->log(rust_log::TASK, - "returned from task 0x%" PRIxPTR - " in state '%s', sp=0x%" PRIxPTR, - (uintptr_t)task, - dom->state_vec_name(task->state), - task->rust_sp); - - I(dom, task->rust_sp >= (uintptr_t) &task->stk->data[0]); - I(dom, task->rust_sp < task->stk->limit); - - dom->reap_dead_tasks(); - } - - dom->log(rust_log::DOM, "finished main-loop (dom.rval = %d)", dom->rval); - rval = dom->rval; - - return rval; -} - - struct command_line_args { @@ -243,7 +196,7 @@ rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv) (uintptr_t)&main_args, sizeof(main_args)); - ret = rust_main_loop(&dom); + ret = dom.start_main_loop(); } #if !defined(__WIN32__) diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 339452c5129..0b492de84a1 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -19,7 +19,7 @@ str_alloc(rust_task *task, size_t n_bytes) extern "C" CDECL rust_str* last_os_error(rust_task *task) { rust_dom *dom = task->dom; - dom->log(rust_log::TASK, "last_os_error()"); + task->log(rust_log::TASK, "last_os_error()"); #if defined(__WIN32__) LPTSTR buf; @@ -95,7 +95,7 @@ extern "C" CDECL rust_vec* vec_alloc(rust_task *task, type_desc *t, size_t n_elts) { rust_dom *dom = task->dom; - dom->log(rust_log::MEM, + task->log(rust_log::MEM, "vec_alloc %" PRIdPTR " elements of size %" PRIdPTR, n_elts, t->size); size_t fill = n_elts * t->size; diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 38f93a7da9f..6aa9121aa14 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -1,29 +1,29 @@ - #include "rust_internal.h" #include "rust_chan.h" rust_chan::rust_chan(rust_task *task, rust_port *port) : - task(task), - port(port), - buffer(task->dom, port->unit_sz), - token(this) -{ - if (port) + task(task), port(port), buffer(task->dom, port->unit_sz), token(this) { + + if (port) { port->chans.push(this); + ref(); + } + + task->log(rust_log::MEM | rust_log::COMM, + "new rust_chan(task=0x%" PRIxPTR + ", port=0x%" PRIxPTR ") -> chan=0x%" + PRIxPTR, (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); } -rust_chan::~rust_chan() -{ +rust_chan::~rust_chan() { if (port) { if (token.pending()) token.withdraw(); - port->chans.swapdel(this); + port->chans.swap_delete(this); } } -void -rust_chan::disassociate() -{ +void rust_chan::disassociate() { I(task->dom, port); if (token.pending()) @@ -31,4 +31,32 @@ rust_chan::disassociate() // Delete reference to the port/ port = NULL; + + deref(); +} + +/** + * Attempt to transmit channel data to the associated port. + */ +int rust_chan::transmit() { + rust_dom *dom = task->dom; + + // TODO: Figure out how and why the port would become null. + if (port == NULL) { + dom->log(rust_log::COMM, "invalid port, transmission incomplete"); + return ERROR; + } + + if (buffer.is_empty()) { + dom->log(rust_log::COMM, "buffer is empty, transmission incomplete"); + return ERROR; + } + + if(port->task->blocked_on(port)) { + buffer.dequeue(port->task->rendezvous_ptr); + port->task->wakeup(port); + } + + return 0; + } diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index a56ba0ca481..3e32d83837b 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -9,7 +9,7 @@ public: rust_task *task; rust_port *port; - circ_buf buffer; + circular_buffer buffer; size_t idx; // Index into port->chans. // Token belonging to this chan, it will be placed into a port's @@ -17,6 +17,8 @@ public: rust_token token; void disassociate(); + + int transmit(); }; #endif /* RUST_CHAN_H */ diff --git a/src/rt/rust_comm.cpp b/src/rt/rust_comm.cpp index 58b9ef4c424..37929b8b05a 100644 --- a/src/rt/rust_comm.cpp +++ b/src/rt/rust_comm.cpp @@ -10,109 +10,6 @@ rust_alarm::rust_alarm(rust_task *receiver) : { } - -// Circular buffers. - -circ_buf::circ_buf(rust_dom *dom, size_t unit_sz) : - dom(dom), - alloc(INIT_CIRC_BUF_UNITS * unit_sz), - unit_sz(unit_sz), - next(0), - unread(0), - data((uint8_t *)dom->calloc(alloc)) -{ - I(dom, unit_sz); - dom->log(rust_log::MEM|rust_log::COMM, - "new circ_buf(alloc=%d, unread=%d) -> circ_buf=0x%" PRIxPTR, - alloc, unread, this); - I(dom, data); -} - -circ_buf::~circ_buf() -{ - dom->log(rust_log::MEM|rust_log::COMM, - "~circ_buf 0x%" PRIxPTR, - this); - I(dom, data); - // I(dom, unread == 0); - dom->free(data); -} - -void -circ_buf::transfer(void *dst) -{ - size_t i; - uint8_t *d = (uint8_t *)dst; - I(dom, dst); - for (i = 0; i < unread; i += unit_sz) - memcpy(&d[i], &data[next + i % alloc], unit_sz); -} - -void -circ_buf::push(void *src) -{ - size_t i; - void *tmp; - - I(dom, src); - I(dom, unread <= alloc); - - /* Grow if necessary. */ - if (unread == alloc) { - I(dom, alloc <= MAX_CIRC_BUF_SIZE); - tmp = dom->malloc(alloc << 1); - transfer(tmp); - alloc <<= 1; - dom->free(data); - data = (uint8_t *)tmp; - } - - dom->log(rust_log::MEM|rust_log::COMM, - "circ buf push, unread=%d, alloc=%d, unit_sz=%d", - unread, alloc, unit_sz); - - I(dom, unread < alloc); - I(dom, unread + unit_sz <= alloc); - - i = (next + unread) % alloc; - memcpy(&data[i], src, unit_sz); - - dom->log(rust_log::MEM|rust_log::COMM, "pushed data at index %d", i); - unread += unit_sz; -} - -void -circ_buf::shift(void *dst) -{ - size_t i; - void *tmp; - - I(dom, dst); - I(dom, unit_sz > 0); - I(dom, unread >= unit_sz); - I(dom, unread <= alloc); - I(dom, data); - i = next; - memcpy(dst, &data[i], unit_sz); - dom->log(rust_log::MEM|rust_log::COMM, "shifted data from index %d", i); - unread -= unit_sz; - next += unit_sz; - I(dom, next <= alloc); - if (next == alloc) - next = 0; - - /* Shrink if necessary. */ - if (alloc >= INIT_CIRC_BUF_UNITS * unit_sz && - unread <= alloc / 4) { - tmp = dom->malloc(alloc / 2); - transfer(tmp); - alloc >>= 1; - dom->free(data); - data = (uint8_t *)tmp; - } -} - - // Ports. rust_port::rust_port(rust_task *task, size_t unit_sz) : @@ -121,18 +18,16 @@ rust_port::rust_port(rust_task *task, size_t unit_sz) : writers(task->dom), chans(task->dom) { - rust_dom *dom = task->dom; - dom->log(rust_log::MEM|rust_log::COMM, - "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" - PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); + task->log(rust_log::MEM|rust_log::COMM, + "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" + PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); } rust_port::~rust_port() { - rust_dom *dom = task->dom; - dom->log(rust_log::COMM|rust_log::MEM, - "~rust_port 0x%" PRIxPTR, - (uintptr_t)this); + task->log(rust_log::COMM|rust_log::MEM, + "~rust_port 0x%" PRIxPTR, + (uintptr_t)this); while (chans.length() > 0) chans.pop()->disassociate(); } @@ -182,7 +77,7 @@ rust_token::withdraw() if (task->blocked()) task->wakeup(this); // must be blocked on us (or dead) - port->writers.swapdel(this); + port->writers.swap_delete(this); submitted = false; } diff --git a/src/rt/rust_crate_cache.cpp b/src/rt/rust_crate_cache.cpp index 2db0eb458d7..650e3bb1ef3 100644 --- a/src/rt/rust_crate_cache.cpp +++ b/src/rt/rust_crate_cache.cpp @@ -251,7 +251,7 @@ rust_crate_cache::flush() { if (s) { dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref rust_sym %" - PRIdPTR " (rc=%" PRIdPTR ")", i, s->refcnt); + PRIdPTR " (rc=%" PRIdPTR ")", i, s->ref_count); s->deref(); } rust_syms[i] = NULL; @@ -262,7 +262,7 @@ rust_crate_cache::flush() { if (s) { dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref c_sym %" - PRIdPTR " (rc=%" PRIdPTR ")", i, s->refcnt); + PRIdPTR " (rc=%" PRIdPTR ")", i, s->ref_count); s->deref(); } c_syms[i] = NULL; @@ -272,7 +272,7 @@ rust_crate_cache::flush() { lib *l = libs[i]; if (l) { dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref lib %" - PRIdPTR " (rc=%" PRIdPTR ")", i, l->refcnt); + PRIdPTR " (rc=%" PRIdPTR ")", i, l->ref_count); l->deref(); } libs[i] = NULL; diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 3b5e23b20c5..39124491a28 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,6 +4,24 @@ template class ptr_vec; +rust_message::rust_message(rust_dom *dom) : dom(dom) { + +} + +void rust_message::process() { + +} + +kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) : + rust_message(dom), _task(task) { + +} + +void kill_task_message::process() { + _task->ref_count--; + _task->kill(); +} + rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) : interrupt_flag(0), root_crate(root_crate), @@ -80,6 +98,18 @@ rust_dom::activate(rust_task *task) { curr_task = NULL; } +void +rust_dom::log(rust_task *task, uint32_t type_bits, char const *fmt, ...) { + char buf[256]; + if (_log.is_tracing(type_bits)) { + va_list args; + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + _log.trace_ln(task, type_bits, buf); + va_end(args); + } +} + void rust_dom::log(uint32_t type_bits, char const *fmt, ...) { char buf[256]; @@ -87,7 +117,7 @@ rust_dom::log(uint32_t type_bits, char const *fmt, ...) { va_list args; va_start(args, fmt); vsnprintf(buf, sizeof(buf), fmt, args); - _log.trace_ln(type_bits, buf); + _log.trace_ln(NULL, type_bits, buf); va_end(args); } } @@ -189,7 +219,7 @@ rust_dom::remove_task_from_state_vec(ptr_vec *v, rust_task *task) "removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR, (uintptr_t)task, state_vec_name(v), (uintptr_t)v); I(this, (*v)[task->idx] == task); - v->swapdel(task); + v->swap_delete(task); } const char * @@ -203,25 +233,67 @@ rust_dom::state_vec_name(ptr_vec *v) return "dead"; } +/** + * Delete any dead tasks. + */ void -rust_dom::reap_dead_tasks() -{ +rust_dom::reap_dead_tasks() { for (size_t i = 0; i < dead_tasks.length(); ) { - rust_task *t = dead_tasks[i]; - if (t == root_task || t->refcnt == 0) { - I(this, !t->waiting_tasks.length()); - dead_tasks.swapdel(t); + rust_task *task = dead_tasks[i]; +// log(rust_log::TASK, "dead task 0x%" PRIxPTR " with ref_count: %d", +// task, task->ref_count); + if (task->ref_count == 0) { + I(this, !task->waiting_tasks.length()); + dead_tasks.swap_delete(task); log(rust_log::TASK, - "deleting unreferenced dead task 0x%" PRIxPTR, t); - delete t; + "deleting unreferenced dead task 0x%" PRIxPTR, task); + delete task; continue; } ++i; } } + +/** + * Enqueues a message in this domain's incoming message queue. It's the + * responsibility of the receiver to free the message once it's processed. + */ +void rust_dom::send_message(rust_message *message) { + log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR + " in queue 0x%" PRIxPTR, + message, + &_incoming_message_queue); + _incoming_message_queue.enqueue(message); + _incoming_message_pending.signal(); +} + +/** + * Drains and processes incoming pending messages. + */ +void rust_dom::drain_incoming_message_queue() { + rust_message *message; + while ((message = (rust_message *) _incoming_message_queue.dequeue())) { + log(rust_log::COMM, "read 0x%" PRIxPTR + " from queue 0x%" PRIxPTR, + message, + &_incoming_message_queue); + log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR, + message); + message->process(); + delete message; + } +} + +/** + * Schedules a running task for execution. Only running tasks can be + * activated. Blocked tasks have to be unblocked before they can be + * activated. + * + * Returns NULL if no tasks can be scheduled. + */ rust_task * -rust_dom::sched() +rust_dom::schedule_task() { I(this, this); // FIXME: in the face of failing tasks, this is not always right. @@ -231,11 +303,88 @@ rust_dom::sched() i %= running_tasks.length(); return (rust_task *)running_tasks[i]; } - log(rust_log::DOM|rust_log::TASK, - "no schedulable tasks"); + // log(rust_log::DOM|rust_log::TASK, "no schedulable tasks"); return NULL; } +/** + * Starts the main scheduler loop which performs task scheduling for this + * domain. + * + * Returns once no more tasks can be scheduled. + */ +int +rust_dom::start_main_loop() +{ + // Make sure someone is watching, to pull us out of infinite loops. + rust_timer timer(this); + + log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this); + logptr("exit-task glue", root_crate->get_exit_task_glue()); + + while (n_live_tasks() > 0) { + rust_task *scheduled_task = schedule_task(); + + // If we cannot schedule a task because all other live tasks + // are blocked, wait on a condition variable which is signaled + // if progress is made in other domains. + + if (scheduled_task == NULL) { + log(rust_log::TASK, + "all tasks are blocked, waiting for progress ..."); + _progress.wait(); + continue; + } + + I(this, scheduled_task->running()); + + log(rust_log::TASK, + "activating task 0x%" PRIxPTR ", sp=x%" PRIxPTR, + (uintptr_t)scheduled_task, scheduled_task->rust_sp); + + interrupt_flag = 0; + + activate(scheduled_task); + + log(rust_log::TASK, + "returned from task 0x%" PRIxPTR + " in state '%s', sp=0x%" PRIxPTR, + (uintptr_t)scheduled_task, + state_vec_name(scheduled_task->state), + scheduled_task->rust_sp); + + I(this, scheduled_task->rust_sp >= + (uintptr_t) &scheduled_task->stk->data[0]); + I(this, scheduled_task->rust_sp < scheduled_task->stk->limit); + + drain_incoming_message_queue(); + + reap_dead_tasks(); + } + + log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); + + while (dead_tasks.length() > 0) { + log(rust_log::DOM, + "waiting for %d dead tasks to become dereferenced ...", + dead_tasks.length()); + + log(rust_log::DOM, + "waiting for %" PRIxPTR, dead_tasks[0]); + + if (_incoming_message_queue.is_empty()) { + _incoming_message_pending.wait(); + } else { + drain_incoming_message_queue(); + } + reap_dead_tasks(); + } + + log(rust_log::DOM, "finished main-loop (dom.rval = %d)", rval); + return rval; +} + + rust_crate_cache * rust_dom::get_cache(rust_crate const *crate) { log(rust_log::CACHE, diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h new file mode 100644 index 00000000000..38d04dbfb53 --- /dev/null +++ b/src/rt/rust_dom.h @@ -0,0 +1,92 @@ +/* + * rust_dom.h + */ + +#ifndef RUST_DOM_H +#define RUST_DOM_H + +#include "sync/lock_free_queue.h" + +class rust_message : public lock_free_queue_node, + public dom_owned { +public: + rust_dom *dom; + rust_message(rust_dom *dom); + virtual void process(); +}; + +class kill_task_message : public rust_message { + rust_task *_task; +public: + kill_task_message(rust_dom *dom, rust_task *task); + void process(); +}; + +struct rust_dom +{ + // Fields known to the compiler: + uintptr_t interrupt_flag; + + // Fields known only by the runtime: + + // NB: the root crate must remain in memory until the root of the + // tree of domains exits. All domains within this tree have a + // copy of this root_crate value and use it for finding utility + // glue. + rust_crate const *root_crate; + rust_log _log; + rust_srv *srv; + ptr_vec running_tasks; + ptr_vec blocked_tasks; + ptr_vec dead_tasks; + ptr_vec caches; + randctx rctx; + rust_task *root_task; + rust_task *curr_task; + int rval; + + condition_variable _progress; + + // Incoming messages from other domains. + condition_variable _incoming_message_pending; + lock_free_queue _incoming_message_queue; + +#ifndef __WIN32__ + pthread_attr_t attr; +#endif + + rust_dom(rust_srv *srv, rust_crate const *root_crate); + ~rust_dom(); + + void activate(rust_task *task); + void log(rust_task *task, uint32_t logbit, char const *fmt, ...); + void log(uint32_t logbit, char const *fmt, ...); + rust_log & get_log(); + void logptr(char const *msg, uintptr_t ptrval); + template + void logptr(char const *msg, T* ptrval); + void fail(); + void *malloc(size_t sz); + void *calloc(size_t sz); + void *realloc(void *data, size_t sz); + void free(void *p); + + void send_message(rust_message *message); + void drain_incoming_message_queue(); + +#ifdef __WIN32__ + void win32_require(LPCTSTR fn, BOOL ok); +#endif + + rust_crate_cache *get_cache(rust_crate const *crate); + size_t n_live_tasks(); + void add_task_to_state_vec(ptr_vec *v, rust_task *task); + void remove_task_from_state_vec(ptr_vec *v, rust_task *task); + const char *state_vec_name(ptr_vec *v); + + void reap_dead_tasks(); + rust_task *schedule_task(); + int start_main_loop(); +}; + +#endif /* RUST_DOM_H */ diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index f877cefc71d..d962e894153 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -5,6 +5,8 @@ #define __STDC_CONSTANT_MACROS 1 #define __STDC_FORMAT_MACROS 1 +#define ERROR 0 + #include #include #include @@ -36,13 +38,18 @@ extern "C" { #error "Platform not supported." #endif +#include "sync/condition_variable.h" + #ifndef __i386__ #error "Target CPU not supported." #endif -#define I(dom, e) ((e) ? (void)0 : \ +#define I(dom, e) ((e) ? (void)0 : \ (dom)->srv->fatal(#e, __FILE__, __LINE__)) +#define A(dom, e, s) ((e) ? (void)0 : \ + (dom)->srv->fatal(#e " : " #s, __FILE__, __LINE__)) + struct rust_task; struct rust_port; class rust_chan; @@ -50,7 +57,7 @@ struct rust_token; struct rust_dom; class rust_crate; class rust_crate_cache; -class lockfree_queue; +// class lockfree_queue; struct stk_seg; struct type_desc; @@ -66,14 +73,14 @@ template struct rc_base { - size_t refcnt; + size_t ref_count; void ref() { - ++refcnt; + ++ref_count; } void deref() { - if (--refcnt == 0) { + if (--ref_count == 0) { delete (T*)this; } } @@ -122,71 +129,29 @@ public: return fill; } + bool is_empty() { + return fill == 0; + } + T *& operator[](size_t offset); void push(T *p); T *pop(); void trim(size_t fill); - void swapdel(T* p); + void swap_delete(T* p); }; -struct -rust_dom -{ - // Fields known to the compiler: - uintptr_t interrupt_flag; +#include "rust_dom.h" - // Fields known only by the runtime: +template inline T +check_null(rust_dom *dom, T value, char const *expr, + char const *file, size_t line) { + if (value == NULL) { + dom->srv->fatal(expr, file, line); + } + return value; +} - // NB: the root crate must remain in memory until the root of the - // tree of domains exits. All domains within this tree have a - // copy of this root_crate value and use it for finding utility - // glue. - rust_crate const *root_crate; - rust_log _log; - rust_srv *srv; - // uint32_t logbits; - ptr_vec running_tasks; - ptr_vec blocked_tasks; - ptr_vec dead_tasks; - ptr_vec caches; - randctx rctx; - rust_task *root_task; - rust_task *curr_task; - int rval; - lockfree_queue *incoming; // incoming messages from other threads - -#ifndef __WIN32__ - pthread_attr_t attr; -#endif - - rust_dom(rust_srv *srv, rust_crate const *root_crate); - ~rust_dom(); - - void activate(rust_task *task); - void log(uint32_t logbit, char const *fmt, ...); - rust_log & get_log(); - void logptr(char const *msg, uintptr_t ptrval); - template - void logptr(char const *msg, T* ptrval); - void fail(); - void *malloc(size_t sz); - void *calloc(size_t sz); - void *realloc(void *data, size_t sz); - void free(void *p); - -#ifdef __WIN32__ - void win32_require(LPCTSTR fn, BOOL ok); -#endif - - rust_crate_cache *get_cache(rust_crate const *crate); - size_t n_live_tasks(); - void add_task_to_state_vec(ptr_vec *v, rust_task *task); - void remove_task_from_state_vec(ptr_vec *v, rust_task *task); - const char *state_vec_name(ptr_vec *v); - - void reap_dead_tasks(); - rust_task *sched(); -}; +#define CHECK_NULL(dom, e) (check_null(dom, e, #e, __FILE__, __LINE__)) inline void *operator new(size_t sz, void *mem) { return mem; @@ -217,7 +182,7 @@ rust_timer // For now it's just the most basic "thread that can interrupt // its associated domain-thread" device, so that we have // *some* form of task-preemption. - rust_dom &dom; + rust_dom *dom; uintptr_t exit_flag; #if defined(__WIN32__) @@ -227,7 +192,7 @@ rust_timer pthread_t thread; #endif - rust_timer(rust_dom &dom); + rust_timer(rust_dom *dom); ~rust_timer(); }; @@ -608,94 +573,8 @@ struct gc_alloc { } }; -struct -rust_task : public rc_base, - public dom_owned, - public rust_cond -{ - // Fields known to the compiler. - stk_seg *stk; - uintptr_t runtime_sp; // Runtime sp while task running. - uintptr_t rust_sp; // Saved sp when not running. - gc_alloc *gc_alloc_chain; // Linked list of GC allocations. - rust_dom *dom; - rust_crate_cache *cache; - - // Fields known only to the runtime. - ptr_vec *state; - rust_cond *cond; - uintptr_t* dptr; // Rendezvous pointer for send/recv. - rust_task *supervisor; // Parent-link for failure propagation. - size_t idx; - size_t gc_alloc_thresh; - size_t gc_alloc_accum; - - // Wait queue for tasks waiting for this task. - rust_wait_queue waiting_tasks; - rust_alarm alarm; - - rust_task(rust_dom *dom, - rust_task *spawner); - ~rust_task(); - - void start(uintptr_t exit_task_glue, - uintptr_t spawnee_fn, - uintptr_t args, - size_t callsz); - void grow(size_t n_frame_bytes); - bool running(); - bool blocked(); - bool blocked_on(rust_cond *cond); - bool dead(); - - void link_gc(gc_alloc *gcm); - void unlink_gc(gc_alloc *gcm); - void *malloc(size_t sz, type_desc *td=0); - void *realloc(void *data, size_t sz, bool gc_mem=false); - void free(void *p, bool gc_mem=false); - - const char *state_str(); - void transition(ptr_vec *svec, ptr_vec *dvec); - - void block(rust_cond *on); - void wakeup(rust_cond *from); - void die(); - void unblock(); - - void check_active() { I(dom, dom->curr_task == this); } - void check_suspended() { I(dom, dom->curr_task != this); } - - // Swap in some glue code to run when we have returned to the - // task's context (assuming we're the active task). - void run_after_return(size_t nargs, uintptr_t glue); - - // Swap in some glue code to run when we're next activated - // (assuming we're the suspended task). - void run_on_resume(uintptr_t glue); - - // Save callee-saved registers and return to the main loop. - void yield(size_t nargs); - - // Fail this task (assuming caller-on-stack is different task). - void kill(); - - // Fail self, assuming caller-on-stack is this task. - void fail(size_t nargs); - - // Run the gc glue on the task stack. - void gc(size_t nargs); - - // Disconnect from our supervisor. - void unsupervise(); - - // Notify tasks waiting for us that we are about to die. - void notify_waiting_tasks(); - - uintptr_t get_fp(); - uintptr_t get_previous_fp(uintptr_t fp); - frame_glue_fns *get_frame_glue_fns(uintptr_t fp); - rust_crate_cache * get_crate_cache(rust_crate const *curr_crate); -}; +#include "rust_proxy.h" +#include "rust_task.h" struct rust_port : public rc_base, public task_owned, @@ -722,31 +601,29 @@ struct rust_token : public rust_cond { void withdraw(); }; +#include "circular_buffer.h" -struct circ_buf : public dom_owned { - static const size_t INIT_CIRC_BUF_UNITS = 8; - static const size_t MAX_CIRC_BUF_SIZE = 1 << 24; - - rust_dom *dom; - size_t alloc; - size_t unit_sz; - size_t next; - size_t unread; - uint8_t *data; - - circ_buf(rust_dom *dom, size_t unit_sz); - ~circ_buf(); - - void transfer(void *dst); - void push(void *src); - void shift(void *dst); -}; +//struct circ_buf : public dom_owned { +// static const size_t INIT_CIRC_BUF_UNITS = 8; +// static const size_t MAX_CIRC_BUF_SIZE = 1 << 24; +// +// rust_dom *dom; +// size_t alloc; +// size_t unit_sz; +// size_t next; +// size_t unread; +// uint8_t *data; +// +// circ_buf(rust_dom *dom, size_t unit_sz); +// ~circ_buf(); +// +// void transfer(void *dst); +// void push(void *src); +// void shift(void *dst); +//}; #include "rust_chan.h" -int -rust_main_loop(rust_dom *dom); - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp index 5cdf315c7c6..b67876ebe6a 100644 --- a/src/rt/rust_log.cpp +++ b/src/rt/rust_log.cpp @@ -4,9 +4,13 @@ */ #include "rust_internal.h" +#include "sync/spin_lock.h" +#include "util/array_list.h" +#include -static uint32_t read_type_bit_mask() { - uint32_t bits = rust_log::ULOG | rust_log::ERR; +static uint32_t +read_type_bit_mask() { + uint32_t bits = rust_log::ULOG | rust_log::ERR | rust_log::ALL; char *env_str = getenv("RUST_LOG"); if (env_str) { bits = 0; @@ -27,92 +31,167 @@ static uint32_t read_type_bit_mask() { return bits; } -rust_log::ansi_color rust_log::get_type_color(log_type type) { - switch (type) { - case ERR: - return rust_log::RED; - case UPCALL: - return rust_log::GREEN; - case COMM: - return rust_log::MAGENTA; - case DOM: - case TASK: - return rust_log::LIGHTTEAL; - case MEM: - return rust_log::YELLOW; - default: - return rust_log::WHITE; - } +rust_log::ansi_color +get_type_color(rust_log::log_type type) { + rust_log::ansi_color color = rust_log::WHITE; + if (type & rust_log::ERR) + color = rust_log::RED; + if (type & rust_log::MEM) + color = rust_log::YELLOW; + if (type & rust_log::UPCALL) + color = rust_log::GREEN; + if (type & rust_log::COMM) + color = rust_log::MAGENTA; + if (type & rust_log::DOM) + color = rust_log::LIGHTTEAL; + if (type & rust_log::TASK) + color = rust_log::LIGHTTEAL; + return color; } -static const char * _foreground_colors[] = { "[30m", "[1;30m", "[37m", - "[31m", "[1;31m", "[32m", - "[1;32m", "[33m", "[33m", - "[34m", "[1;34m", "[35m", - "[1;35m", "[36m", "[1;36m" }; +static const char * _foreground_colors[] = { "[37m", + "[31m", "[1;31m", + "[32m", "[1;32m", + "[33m", "[1;33m", + "[31m", "[1;31m", + "[35m", "[1;35m", + "[36m", "[1;36m" }; + +/** + * Synchronizes access to the underlying logging mechanism. + */ +static spin_lock _log_lock; + rust_log::rust_log(rust_srv *srv, rust_dom *dom) : - _srv(srv), _dom(dom), _type_bit_mask(read_type_bit_mask()), - _use_colors(getenv("RUST_COLOR_LOG")), _indent(0) { + _srv(srv), + _dom(dom), + _type_bit_mask(read_type_bit_mask()), + _use_colors(getenv("RUST_COLOR_LOG")), + _indent(0) { } rust_log::~rust_log() { } -void rust_log::trace_ln(char *message) { - char buffer[512]; - if (_use_colors) { - snprintf(buffer, sizeof(buffer), "\x1b%s0x%08" PRIxPTR "\x1b[0m: ", - _foreground_colors[1 + ((uintptr_t) _dom % 2687 % (LIGHTTEAL - - 1))], (uintptr_t) _dom); - } else { - snprintf(buffer, sizeof(buffer), "0x%08" PRIxPTR ": ", - (uintptr_t) _dom); - } +const uint16_t +hash(uintptr_t ptr) { + // Robert Jenkins' 32 bit integer hash function + ptr = (ptr + 0x7ed55d16) + (ptr << 12); + ptr = (ptr ^ 0xc761c23c) ^ (ptr >> 19); + ptr = (ptr + 0x165667b1) + (ptr << 5); + ptr = (ptr + 0xd3a2646c) ^ (ptr << 9); + ptr = (ptr + 0xfd7046c5) + (ptr << 3); + ptr = (ptr ^ 0xb55a4f09) ^ (ptr >> 16); + return (uint16_t) ptr; +} - for (uint32_t i = 0; i < _indent; i++) { - strncat(buffer, "\t", sizeof(buffer) - strlen(buffer) - 1); +const char * +get_color(uintptr_t ptr) { + return _foreground_colors[hash(ptr) % rust_log::LIGHTTEAL]; +} + +char * +copy_string(char *dst, const char *src, size_t length) { + return strncpy(dst, src, length) + length; +} + +char * +append_string(char *buffer, const char *format, ...) { + if (buffer != NULL && format) { + va_list args; + va_start(args, format); + vsprintf(buffer + strlen(buffer), format, args); + va_end(args); } - strncat(buffer, message, sizeof(buffer) - strlen(buffer) - 1); + return buffer; +} + +char * +append_string(char *buffer, rust_log::ansi_color color, + const char *format, ...) { + if (buffer != NULL && format) { + append_string(buffer, "\x1b%s", _foreground_colors[color]); + va_list args; + va_start(args, format); + vsprintf(buffer + strlen(buffer), format, args); + va_end(args); + append_string(buffer, "\x1b[0m"); + } + return buffer; +} + +void +rust_log::trace_ln(char *prefix, char *message) { + char buffer[1024] = ""; + _log_lock.lock(); + append_string(buffer, "%-34s", prefix); + for (uint32_t i = 0; i < _indent; i++) { + append_string(buffer, " "); + } + append_string(buffer, "%s", message); _srv->log(buffer); + _log_lock.unlock(); +} + +void +rust_log::trace_ln(rust_task *task, char *message) { +#if defined(__WIN32__) + uint32_t thread_id = 0; +#else + uint32_t thread_id = (uint32_t) pthread_self(); +#endif + char prefix[1024] = ""; + append_string(prefix, "0x%08" PRIxPTR ":0x%08" PRIxPTR ":", + thread_id, (uintptr_t) _dom); + if (task) { + append_string(prefix, "0x%08" PRIxPTR ":", (uintptr_t) task); + } + trace_ln(prefix, message); } /** * Traces a log message if the specified logging type is not filtered. */ -void rust_log::trace_ln(uint32_t type_bits, char *message) { - trace_ln(get_type_color((rust_log::log_type) type_bits), type_bits, - message); +void +rust_log::trace_ln(rust_task *task, uint32_t type_bits, char *message) { + trace_ln(task, get_type_color((rust_log::log_type) type_bits), + type_bits, message); } /** * Traces a log message using the specified ANSI color code. */ -void rust_log::trace_ln(ansi_color color, uint32_t type_bits, char *message) { +void +rust_log::trace_ln(rust_task *task, ansi_color color, + uint32_t type_bits, char *message) { if (is_tracing(type_bits)) { if (_use_colors) { - char buffer[512]; - snprintf(buffer, sizeof(buffer), "\x1b%s%s\x1b[0m", - _foreground_colors[color], message); - trace_ln(buffer); + char buffer[512] = ""; + append_string(buffer, color, "%s", message); + trace_ln(task, buffer); } else { - trace_ln(message); + trace_ln(task, message); } } } -bool rust_log::is_tracing(uint32_t type_bits) { +bool +rust_log::is_tracing(uint32_t type_bits) { return type_bits & _type_bit_mask; } -void rust_log::indent() { +void +rust_log::indent() { _indent++; } -void rust_log::outdent() { +void +rust_log::outdent() { _indent--; } -void rust_log::reset_indent(uint32_t indent) { +void +rust_log::reset_indent(uint32_t indent) { _indent = indent; } diff --git a/src/rt/rust_log.h b/src/rt/rust_log.h index bd32c1550e4..06712066b1b 100644 --- a/src/rt/rust_log.h +++ b/src/rt/rust_log.h @@ -1,22 +1,18 @@ -#ifndef RUST_LOG_H_ -#define RUST_LOG_H_ +#ifndef RUST_LOG_H +#define RUST_LOG_H class rust_dom; +class rust_task; + + class rust_log { - rust_srv *_srv; - rust_dom *_dom; - uint32_t _type_bit_mask; - bool _use_colors; - uint32_t _indent; - void trace_ln(char *message); + public: rust_log(rust_srv *srv, rust_dom *dom); virtual ~rust_log(); enum ansi_color { - BLACK, - GRAY, WHITE, RED, LIGHTRED, @@ -51,10 +47,19 @@ public: void indent(); void outdent(); void reset_indent(uint32_t indent); - void trace_ln(uint32_t type_bits, char *message); - void trace_ln(ansi_color color, uint32_t type_bits, char *message); + void trace_ln(char *prefix, char *message); + void trace_ln(rust_task *task, uint32_t type_bits, char *message); + void trace_ln(rust_task *task, ansi_color color, uint32_t type_bits, char *message); bool is_tracing(uint32_t type_bits); - static ansi_color get_type_color(log_type type); + +private: + rust_srv *_srv; + rust_dom *_dom; + uint32_t _type_bit_mask; + bool _use_labels; + bool _use_colors; + uint32_t _indent; + void trace_ln(rust_task *task, char *message); }; -#endif /* RUST_LOG_H_ */ +#endif /* RUST_LOG_H */ diff --git a/src/rt/rust_proxy.h b/src/rt/rust_proxy.h new file mode 100644 index 00000000000..8059dd89d42 --- /dev/null +++ b/src/rt/rust_proxy.h @@ -0,0 +1,31 @@ +/** + * A proxy object is a wrapper around other Rust objects. One use of the proxy + * object is to mitigate access between tasks in different thread domains. + */ + +#ifndef RUST_PROXY_H +#define RUST_PROXY_H + +template struct +rust_proxy_delegate : public rc_base { +protected: + T *_delegate; +public: + rust_proxy_delegate(T * delegate) : _delegate(delegate) { + } + T *delegate() { return _delegate; } +}; + +template struct +rust_proxy : public rust_proxy_delegate, + public dom_owned > { +public: + rust_dom *dom; + rust_proxy(rust_dom *dom, T *delegate) : + rust_proxy_delegate (delegate), + dom(dom) { + delegate->ref(); + } +}; + +#endif /* RUST_PROXY_H */ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 7c92c4ca7cd..357edbf187d 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -53,6 +53,7 @@ align_down(uintptr_t sp) rust_task::rust_task(rust_dom *dom, rust_task *spawner) : + rust_proxy_delegate(this), stk(new_stk(dom, 0)), runtime_sp(0), rust_sp(stk->limit), @@ -61,20 +62,24 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) : cache(NULL), state(&dom->running_tasks), cond(NULL), - dptr(0), supervisor(spawner), idx(0), waiting_tasks(dom), + rendezvous_ptr(0), alarm(this) { dom->logptr("new task", (uintptr_t)this); + + if (spawner == NULL) { + ref_count = 0; + } } rust_task::~rust_task() { dom->log(rust_log::MEM|rust_log::TASK, "~rust_task 0x%" PRIxPTR ", refcnt=%d", - (uintptr_t)this, refcnt); + (uintptr_t)this, ref_count); /* for (uintptr_t fp = get_fp(); fp; fp = get_previous_fp(fp)) { @@ -98,8 +103,8 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - I(dom, refcnt == 0 || - (refcnt == 1 && this == dom->root_task)); + I(dom, ref_count == 0 || + (ref_count == 1 && this == dom->root_task)); del_stk(dom, stk); if (cache) @@ -275,9 +280,9 @@ rust_task::run_after_return(size_t nargs, uintptr_t glue) uintptr_t *retpc = ((uintptr_t *) sp) - 1; dom->log(rust_log::TASK|rust_log::MEM, - "run_after_return: overwriting retpc=0x%" PRIxPTR - " @ runtime_sp=0x%" PRIxPTR - " with glue=0x%" PRIxPTR, + "run_after_return: overwriting retpc=x%" PRIxPTR + " @ runtime_sp=x%" PRIxPTR + " with glue=x%" PRIxPTR, *retpc, sp, glue); // Move the current return address (which points into rust code) @@ -296,9 +301,9 @@ rust_task::run_on_resume(uintptr_t glue) uintptr_t* rsp = (uintptr_t*) rust_sp; rsp += n_callee_saves; dom->log(rust_log::TASK|rust_log::MEM, - "run_on_resume: overwriting retpc=0x%" PRIxPTR - " @ rust_sp=0x%" PRIxPTR - " with glue=0x%" PRIxPTR, + "run_on_resume: overwriting retpc=x%" PRIxPTR + " @ rust_sp=x%" PRIxPTR + " with glue=x%" PRIxPTR, *rsp, rsp, glue); *rsp = glue; } @@ -306,8 +311,8 @@ rust_task::run_on_resume(uintptr_t glue) void rust_task::yield(size_t nargs) { - dom->log(rust_log::TASK, - "task 0x%" PRIxPTR " yielding", this); + log(rust_log::TASK, + "task 0x%" PRIxPTR " yielding", this); run_after_return(nargs, dom->root_crate->get_yield_glue()); } @@ -322,11 +327,13 @@ rust_task::kill() { // Note the distinction here: kill() is when you're in an upcall // from task A and want to force-fail task B, you do B->kill(). // If you want to fail yourself you do self->fail(upcall_nargs). - dom->log(rust_log::TASK, "killing task 0x%" PRIxPTR, this); + log(rust_log::TASK, "killing task 0x%" PRIxPTR, this); // Unblock the task so it can unwind. unblock(); + if (this == dom->root_task) dom->fail(); + run_on_resume(dom->root_crate->get_unwind_glue()); } @@ -369,9 +376,12 @@ void rust_task::notify_waiting_tasks() { while (waiting_tasks.length() > 0) { - rust_task *t = waiting_tasks.pop()->receiver; - if (!t->dead()) - t->wakeup(this); + log(rust_log::ALL, "notify_waiting_tasks: %d", + waiting_tasks.length()); + rust_task *waiting_task = waiting_tasks.pop()->receiver; + if (!waiting_task->dead()) { + waiting_task->wakeup(this); + } } } @@ -532,6 +542,9 @@ void rust_task::wakeup(rust_cond *from) { transition(&dom->blocked_tasks, &dom->running_tasks); + // TODO: Signaling every time the task is awaken is kind of silly, + // do this a nicer way. + dom->_progress.signal(); I(dom, cond == from); } @@ -565,6 +578,18 @@ rust_task::get_crate_cache(rust_crate const *curr_crate) return cache; } +void +rust_task::log(uint32_t type_bits, char const *fmt, ...) { + char buf[256]; + if (dom->get_log().is_tracing(type_bits)) { + va_list args; + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + dom->get_log().trace_ln(this, type_bits, buf); + va_end(args); + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h new file mode 100644 index 00000000000..879cb61ae3c --- /dev/null +++ b/src/rt/rust_task.h @@ -0,0 +1,107 @@ +/* + * + */ + +#ifndef RUST_TASK_H +#define RUST_TASK_H +struct +rust_task : public rust_proxy_delegate, + public dom_owned, + public rust_cond +{ + // Fields known to the compiler. + stk_seg *stk; + uintptr_t runtime_sp; // Runtime sp while task running. + uintptr_t rust_sp; // Saved sp when not running. + gc_alloc *gc_alloc_chain; // Linked list of GC allocations. + rust_dom *dom; + rust_crate_cache *cache; + + // Fields known only to the runtime. + ptr_vec *state; + rust_cond *cond; + rust_task *supervisor; // Parent-link for failure propagation. + size_t idx; + size_t gc_alloc_thresh; + size_t gc_alloc_accum; + + // Wait queue for tasks waiting for this task. + rust_wait_queue waiting_tasks; + + // Rendezvous pointer for receiving data when blocked on a port. If we're + // trying to read data and no data is available on any incoming channel, + // we block on the port, and yield control to the scheduler. Since, we + // were not able to read anJything, we remember the location where the + // result should go in the rendezvous_ptr, and let the sender write to + // that location before waking us up. + uintptr_t* rendezvous_ptr; + + rust_alarm alarm; + + rust_task(rust_dom *dom, + rust_task *spawner); + ~rust_task(); + + void start(uintptr_t exit_task_glue, + uintptr_t spawnee_fn, + uintptr_t args, + size_t callsz); + void grow(size_t n_frame_bytes); + bool running(); + bool blocked(); + bool blocked_on(rust_cond *cond); + bool dead(); + + void link_gc(gc_alloc *gcm); + void unlink_gc(gc_alloc *gcm); + void *malloc(size_t sz, type_desc *td=0); + void *realloc(void *data, size_t sz, bool gc_mem=false); + void free(void *p, bool gc_mem=false); + + const char *state_str(); + void transition(ptr_vec *svec, ptr_vec *dvec); + + void block(rust_cond *on); + void wakeup(rust_cond *from); + void die(); + void unblock(); + + void check_active() { I(dom, dom->curr_task == this); } + void check_suspended() { I(dom, dom->curr_task != this); } + + void log(uint32_t type_bits, char const *fmt, ...); + + // Swap in some glue code to run when we have returned to the + // task's context (assuming we're the active task). + void run_after_return(size_t nargs, uintptr_t glue); + + // Swap in some glue code to run when we're next activated + // (assuming we're the suspended task). + void run_on_resume(uintptr_t glue); + + // Save callee-saved registers and return to the main loop. + void yield(size_t nargs); + + // Fail this task (assuming caller-on-stack is different task). + void kill(); + + // Fail self, assuming caller-on-stack is this task. + void fail(size_t nargs); + + // Run the gc glue on the task stack. + void gc(size_t nargs); + + // Disconnect from our supervisor. + void unsupervise(); + + // Notify tasks waiting for us that we are about to die. + void notify_waiting_tasks(); + + uintptr_t get_fp(); + uintptr_t get_previous_fp(uintptr_t fp); + frame_glue_fns *get_frame_glue_fns(uintptr_t fp); + rust_crate_cache * get_crate_cache(rust_crate const *curr_crate); +}; + + +#endif /* RUST_TASK_H */ diff --git a/src/rt/rust_timer.cpp b/src/rt/rust_timer.cpp index fdee30758fc..269942f5f80 100644 --- a/src/rt/rust_timer.cpp +++ b/src/rt/rust_timer.cpp @@ -1,4 +1,3 @@ - #include "rust_internal.h" #include "valgrind.h" @@ -27,12 +26,11 @@ static void * #else #error "Platform not supported" #endif -timer_loop(void *ptr) -{ +timer_loop(void *ptr) { // We were handed the rust_timer that owns us. rust_timer *timer = (rust_timer *)ptr; - rust_dom &dom = timer->dom; - dom.log(rust_log::TIMER, "in timer 0x%" PRIxPTR, (uintptr_t)timer); + rust_dom *dom = timer->dom; + dom->log(rust_log::TIMER, "in timer 0x%" PRIxPTR, (uintptr_t)timer); size_t ms = TIME_SLICE_IN_MS; if (!RUNNING_ON_VALGRIND) ms = 1; @@ -43,12 +41,10 @@ timer_loop(void *ptr) #else usleep(ms * 1000); #endif - dom.log(rust_log::TIMER, - "timer 0x%" PRIxPTR - " interrupting domain 0x%" PRIxPTR, - (uintptr_t)timer, - (uintptr_t)&dom); - dom.interrupt_flag = 1; + dom->log(rust_log::TIMER, "timer 0x%" PRIxPTR + " interrupting domain 0x%" PRIxPTR, (uintptr_t) timer, + (uintptr_t) dom); + dom->interrupt_flag = 1; } #if defined(__WIN32__) ExitThread(0); @@ -58,10 +54,9 @@ timer_loop(void *ptr) return 0; } - -rust_timer::rust_timer(rust_dom &dom) : dom(dom), exit_flag(0) -{ - dom.log(rust_log::TIMER, "creating timer for domain 0x%" PRIxPTR, &dom); +rust_timer::rust_timer(rust_dom *dom) : + dom(dom), exit_flag(0) { + dom->log(rust_log::TIMER, "creating timer for domain 0x%" PRIxPTR, dom); #if defined(__WIN32__) thread = CreateThread(NULL, 0, timer_loop, this, 0, NULL); dom.win32_require("CreateThread", thread != NULL); @@ -76,13 +71,11 @@ rust_timer::rust_timer(rust_dom &dom) : dom(dom), exit_flag(0) #endif } -rust_timer::~rust_timer() -{ +rust_timer::~rust_timer() { exit_flag = 1; #if defined(__WIN32__) - dom.win32_require("WaitForSingleObject", - WaitForSingleObject(thread, INFINITE) - == WAIT_OBJECT_0); + dom->win32_require("WaitForSingleObject", + WaitForSingleObject(thread, INFINITE) == WAIT_OBJECT_0); #else pthread_join(thread, NULL); #endif diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 99df91a6dc0..1aaf89fba97 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -1,127 +1,120 @@ - #include "rust_internal.h" - // Upcalls. #ifdef __GNUC__ #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ - (task)->dom->log(rust_log::UPCALL, \ - "upcall task: 0x%" PRIxPTR \ - " retpc: 0x%" PRIxPTR, \ - (task), __builtin_return_address(0)); \ + (task)->log(rust_log::UPCALL, \ + "> UPCALL %s - task: 0x%" PRIxPTR \ + " retpc: x%" PRIxPTR, \ + __FUNCTION__, \ + (task), __builtin_return_address(0)); \ (task)->dom->get_log().indent(); #else #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ - (task)->dom->log(rust_log::UPCALL, \ - "upcall task: 0x%" PRIxPTR (task)); \ + (task)->log(rust_log::UPCALL, \ + "> UPCALL task: x%" PRIxPTR (task)); \ (task)->dom->get_log().indent(); #endif extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); -extern "C" void -upcall_grow_task(rust_task *task, size_t n_frame_bytes) -{ +inline bool +requires_message_passing(rust_task *sender, rust_task *receiver) { + return sender->dom != receiver->dom; +} + +extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); } -extern "C" CDECL void -upcall_log_int(rust_task *task, int32_t i) -{ +extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::ULOG, - "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", - i, i, (char)i); + task->log(rust_log::UPCALL | rust_log::ULOG, + "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i, + (char) i); } -extern "C" CDECL void -upcall_log_str(rust_task *task, rust_str *str) -{ +extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); const char *c = str_buf(task, str); - task->dom->log(rust_log::UPCALL|rust_log::ULOG, - "upcall log_str(\"%s\")", - c); + task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c); } -extern "C" CDECL void -upcall_trace_word(rust_task *task, uintptr_t i) -{ +extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::TRACE, - "trace: 0x%" PRIxPTR "", - i, i, (char)i); + task->log(rust_log::UPCALL | rust_log::TRACE, "trace: 0x%" PRIxPTR "", i, + i, (char) i); } -extern "C" CDECL void -upcall_trace_str(rust_task *task, char const *c) -{ +extern "C" CDECL void upcall_trace_str(rust_task *task, char const *c) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::TRACE, - "trace: %s", - c); + task->log(rust_log::UPCALL | rust_log::TRACE, "trace: %s", c); } extern "C" CDECL rust_port* -upcall_new_port(rust_task *task, size_t unit_sz) -{ +upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)", - (uintptr_t)task, unit_sz); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)", + (uintptr_t) task, unit_sz); return new (dom) rust_port(task, unit_sz); } -extern "C" CDECL void -upcall_del_port(rust_task *task, rust_port *port) -{ +extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall del_port(0x%" PRIxPTR ")", (uintptr_t)port); - I(task->dom, !port->refcnt); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); + I(task->dom, !port->ref_count); delete port; } +/** + * Creates a new channel, pointed to a specified port. + */ extern "C" CDECL rust_chan* -upcall_new_chan(rust_task *task, rust_port *port) -{ +upcall_new_chan(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", - (uintptr_t)task, port); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", + (uintptr_t) task, port); I(dom, port); return new (dom) rust_chan(task, port); } -extern "C" CDECL void -upcall_del_chan(rust_task *task, rust_chan *chan) -{ +/** + * Called whenever the channel's ref count drops to zero. + */ +extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t)chan); - I(dom, !chan->refcnt); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); + I(dom, !chan->ref_count); delete chan; } +/** + * Clones a channel and stores it in the spawnee's domain. Each spawned task + * has it's own copy of the channel. + */ extern "C" CDECL rust_chan * -upcall_clone_chan(rust_task *task, rust_task *owner, rust_chan *chan) -{ +upcall_clone_chan(rust_task *task, + rust_proxy_delegate *spawnee_proxy, + rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall clone_chan(owner 0x%" PRIxPTR ", chan 0x%" PRIxPTR ")", - (uintptr_t)owner, (uintptr_t)chan); - return new (owner->dom) rust_chan(owner, chan->port); + rust_task *spawnee = spawnee_proxy->delegate(); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, + (uintptr_t) spawnee, (uintptr_t) chan); + return new (spawnee->dom) rust_chan(spawnee, chan->port); } - /* * Buffering protocol: * @@ -143,74 +136,64 @@ upcall_clone_chan(rust_task *task, rust_task *owner, rust_chan *chan) * - Set blocked writer to running * */ +// +//static int +//attempt_transmission(rust_dom *dom, rust_chan *src, rust_task *dst) { +// I(dom, src); +// I(dom, dst); +// +// rust_port *port = src->port; +// if (!port) { +// dom->log(rust_log::COMM, "src died, transmission incomplete"); +// return 0; +// } +// +// circular_buffer *buf = &src->buffer; +// if (buf->is_empty()) { +// dom->log(rust_log::COMM, "buffer empty, transmission incomplete"); +// return 0; +// } +// +// if (!dst->blocked_on(port)) { +// dom->log(rust_log::COMM, +// "dst in non-reading state, transmission incomplete"); +// return 0; +// } +// +// uintptr_t *dptr = dst->dptr; +// dom->log(rust_log::COMM, "receiving %d bytes into dst_task=0x%" PRIxPTR +// ", dptr=0x%" PRIxPTR, port->unit_sz, dst, dptr); +// buf->dequeue(dptr); +// +// // Wake up the sender if its waiting for the send operation. +// rust_task *sender = src->task; +// rust_token *token = &src->token; +// if (sender->blocked_on(token)) +// sender->wakeup(token); +// +// // Wake up the receiver, there is new data. +// dst->wakeup(port); +// +// dom->log(rust_log::COMM, "transmission complete"); +// return 1; +//} -static int -attempt_transmission(rust_dom *dom, - rust_chan *src, - rust_task *dst) -{ - I(dom, src); - I(dom, dst); - - rust_port *port = src->port; - if (!port) { - dom->log(rust_log::COMM, - "src died, transmission incomplete"); - return 0; - } - - circ_buf *buf = &src->buffer; - if (buf->unread == 0) { - dom->log(rust_log::COMM, - "buffer empty, transmission incomplete"); - return 0; - } - - if (!dst->blocked_on(port)) { - dom->log(rust_log::COMM, - "dst in non-reading state, transmission incomplete"); - return 0; - } - - uintptr_t *dptr = dst->dptr; - dom->log(rust_log::COMM, - "receiving %d bytes into dst_task=0x%" PRIxPTR - ", dptr=0x%" PRIxPTR, - port->unit_sz, dst, dptr); - buf->shift(dptr); - - // Wake up the sender if its waiting for the send operation. - rust_task *sender = src->task; - rust_token *token = &src->token; - if (sender->blocked_on(token)) - sender->wakeup(token); - - // Wake up the receiver, there is new data. - dst->wakeup(port); - - dom->log(rust_log::COMM, "transmission complete"); - return 1; -} - -extern "C" CDECL void -upcall_yield(rust_task *task) -{ +extern "C" CDECL void upcall_yield(rust_task *task) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, "upcall yield()"); + task->log(rust_log::UPCALL | rust_log::COMM, "upcall yield()"); task->yield(1); } -extern "C" CDECL void -upcall_join(rust_task *task, rust_task *other) -{ +extern "C" CDECL void upcall_join(rust_task *task, + rust_proxy_delegate *proxy) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, - "upcall join(other=0x%" PRIxPTR ")", - (uintptr_t)other); + task->log(rust_log::UPCALL | rust_log::COMM, + "join proxy 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, + proxy, proxy->delegate()); - // If the other task is already dying, we dont have to wait for it. + rust_task *other = proxy->delegate(); + + // If the other task is already dying, we don't have to wait for it. if (!other->dead()) { other->waiting_tasks.push(&task->alarm); task->block(other); @@ -218,106 +201,91 @@ upcall_join(rust_task *task, rust_task *other) } } +/** + * Sends an chunk of data along the specified channel. + * + * sptr: pointer to a chunk of data to send + */ extern "C" CDECL void -upcall_send(rust_task *task, rust_chan *chan, void *sptr) -{ +upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, - "upcall send(chan=0x%" PRIxPTR ", sptr=0x%" PRIxPTR ")", - (uintptr_t)chan, - (uintptr_t)sptr); + task->log(rust_log::UPCALL | rust_log::COMM, + "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", + (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz); - I(dom, chan); - I(dom, sptr); - - rust_port *port = chan->port; - dom->log(rust_log::MEM|rust_log::COMM, - "send to port", (uintptr_t)port); - I(dom, port); - - rust_token *token = &chan->token; - dom->log(rust_log::MEM|rust_log::COMM, - "sending via token 0x%" PRIxPTR, - (uintptr_t)token); - - if (port->task) { - chan->buffer.push(sptr); - task->block(token); - attempt_transmission(dom, chan, port->task); - if (chan->buffer.unread && !token->pending()) - token->submit(); - } else { - dom->log(rust_log::COMM|rust_log::ERR, - "port has no task (possibly throw?)"); - } - - if (!task->running()) - task->yield(3); + chan->buffer.enqueue(sptr); + chan->transmit(); + task->log(rust_log::COMM, "=== WROTE DATA ===>"); } extern "C" CDECL void -upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) -{ +upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, - "upcall recv(dptr=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", - (uintptr_t)dptr, - (uintptr_t)port); + task->log(rust_log::UPCALL | rust_log::COMM, + "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR + ", size: 0x%" PRIxPTR ", chan_no: %d", + (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, + port->chans.length()); - I(dom, port); - I(dom, port->task); - I(dom, task); - I(dom, port->task == task); + for (uint32_t i = 0; i < port->chans.length(); i++) { + rust_chan *chan = port->chans[i]; + if (chan->buffer.is_empty() == false) { + chan->buffer.dequeue(dptr); + task->log(rust_log::COMM, "<=== READ DATA ==="); + return; + } + } + // No data was buffered on any incoming channel, so block this task + // on the port. Remember the rendezvous location so that any sender + // task can write to it before waking up this task. + + task->rendezvous_ptr = dptr; task->block(port); - - if (port->writers.length() > 0) { - I(dom, task->dom); - size_t i = rand(&dom->rctx); - i %= port->writers.length(); - rust_token *token = port->writers[i]; - rust_chan *chan = token->chan; - if (attempt_transmission(dom, chan, task)) - token->withdraw(); - } else { - dom->log(rust_log::COMM, - "no writers sending to port", (uintptr_t)port); - } - - if (!task->running()) { - task->dptr = dptr; - task->yield(3); - } + task->yield(3); } -extern "C" CDECL void -upcall_fail(rust_task *task, char const *expr, char const *file, size_t line) -{ +extern "C" CDECL void upcall_fail(rust_task *task, char const *expr, + char const *file, size_t line) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::ERR, - "upcall fail '%s', %s:%" PRIdPTR, - expr, file, line); + task->log(rust_log::UPCALL | rust_log::ERR, + "upcall fail '%s', %s:%" PRIdPTR, expr, file, line); task->fail(4); } +/** + * Called whenever a task's ref count drops to zero. + */ extern "C" CDECL void -upcall_kill(rust_task *task, rust_task *target) -{ +upcall_kill(rust_task *task, rust_proxy_delegate *target_proxy) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::TASK, - "upcall kill target=0x%" PRIxPTR, target); - target->kill(); + rust_task *target_task = target_proxy->delegate(); + if (target_proxy != target_task) { + task->dom->free(target_proxy); + } + task->log(rust_log::UPCALL | rust_log::TASK, + "kill task 0x%" PRIxPTR ", ref count %d", + target_task, + target_task->ref_count); + + if (requires_message_passing(task, target_task)) { + rust_dom *target_domain = target_task->dom; + target_domain->send_message( + new (target_domain) + kill_task_message(target_domain, target_task)); + } else { + target_task->kill(); + } } +/** + * Called by the exit glue when the task terminates. + */ extern "C" CDECL void -upcall_exit(rust_task *task) -{ +upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); - - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::TASK, "upcall exit"); + task->log(rust_log::UPCALL | rust_log::TASK, + "task ref_count: %d", task->ref_count); task->die(); task->notify_waiting_tasks(); task->yield(1); @@ -341,11 +309,13 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) return (uintptr_t) p; } +/** + * Called whenever an object's ref count drops to zero. + */ extern "C" CDECL void upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; dom->log(rust_log::UPCALL|rust_log::MEM, "upcall free(0x%" PRIxPTR ")", @@ -371,22 +341,19 @@ upcall_mark(rust_task *task, void* ptr) } extern "C" CDECL rust_str * -upcall_new_str(rust_task *task, char const *s, size_t fill) -{ +upcall_new_str(rust_task *task, char const *s, size_t fill) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_str('%s', %" PRIdPTR ")", s, fill); size_t alloc = next_power_of_two(sizeof(rust_str) + fill); void *mem = dom->malloc(alloc); if (!mem) { task->fail(3); return NULL; } - rust_str *st = new (mem) rust_str(dom, alloc, fill, (uint8_t const *)s); - dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_str('%s', %" PRIdPTR ") = 0x%" PRIxPTR, - s, fill, st); + rust_str *st = new (mem) rust_str(dom, alloc, fill, (uint8_t const *) s); + task->log(rust_log::UPCALL | rust_log::MEM, + "upcall new_str('%s', %" PRIdPTR ") = 0x%" PRIxPTR, + s, fill, st); return st; } @@ -405,34 +372,32 @@ upcall_new_vec(rust_task *task, size_t fill, type_desc *td) return NULL; } rust_vec *v = new (mem) rust_vec(dom, alloc, 0, NULL); - dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_vec(%" PRIdPTR ") = 0x%" PRIxPTR, - fill, v); + task->log(rust_log::UPCALL | rust_log::MEM, + "upcall new_vec(%" PRIdPTR ") = 0x%" PRIxPTR, fill, v); return v; } - extern "C" CDECL rust_str * upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM, + task->log(rust_log::UPCALL|rust_log::MEM, "upcall vec_grow(%" PRIxPTR ", %" PRIdPTR "), alloc=%" PRIdPTR ", fill=%" PRIdPTR, v, n_bytes, v->alloc, v->fill); size_t alloc = next_power_of_two(sizeof(rust_vec) + v->fill + n_bytes); - if (v->refcnt == 1) { + if (v->ref_count == 1) { // Fastest path: already large enough. if (v->alloc >= alloc) { - dom->log(rust_log::UPCALL|rust_log::MEM, "no-growth path"); + task->log(rust_log::UPCALL | rust_log::MEM, "no-growth path"); return v; } // Second-fastest path: can at least realloc. - dom->log(rust_log::UPCALL|rust_log::MEM, "realloc path"); - v = (rust_vec*)dom->realloc(v, alloc); + task->log(rust_log::UPCALL | rust_log::MEM, "realloc path"); + v = (rust_vec*) dom->realloc(v, alloc); if (!v) { task->fail(4); return NULL; @@ -441,7 +406,7 @@ upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc) } else { // Slowest path: make a new vec. - dom->log(rust_log::UPCALL|rust_log::MEM, "new vec path"); + task->log(rust_log::UPCALL | rust_log::MEM, "new vec path"); void *mem = dom->malloc(alloc); if (!mem) { task->fail(4); @@ -454,121 +419,100 @@ upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc) return v; } - static rust_crate_cache::c_sym * -fetch_c_sym(rust_task *task, - rust_crate const *curr_crate, - size_t lib_num, - size_t c_sym_num, - char const *library, - char const *symbol) -{ +fetch_c_sym(rust_task *task, rust_crate const *curr_crate, size_t lib_num, + size_t c_sym_num, char const *library, char const *symbol) { rust_crate_cache *cache = task->get_crate_cache(curr_crate); rust_crate_cache::lib *l = cache->get_lib(lib_num, library); return cache->get_c_sym(c_sym_num, l, symbol); } -extern "C" CDECL uintptr_t -upcall_require_rust_sym(rust_task *task, - rust_crate const *curr_crate, - size_t lib_num, // # of lib - size_t c_sym_num, // # of C sym "rust_crate" in lib - size_t rust_sym_num, // # of rust sym - char const *library, - char const **path) -{ +extern "C" CDECL uintptr_t upcall_require_rust_sym(rust_task *task, + rust_crate const *curr_crate, size_t lib_num, // # of lib + size_t c_sym_num, // # of C sym "rust_crate" in lib + size_t rust_sym_num, // # of rust sym + char const *library, char const **path) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::CACHE, - "upcall require rust sym: lib #%" PRIdPTR - " = %s, c_sym #%" PRIdPTR - ", rust_sym #%" PRIdPTR - ", curr_crate = 0x%" PRIxPTR, - lib_num, library, c_sym_num, rust_sym_num, - curr_crate); + task->log(rust_log::UPCALL | rust_log::CACHE, + "upcall require rust sym: lib #%" PRIdPTR + " = %s, c_sym #%" PRIdPTR + ", rust_sym #%" PRIdPTR + ", curr_crate = 0x%" PRIxPTR, lib_num, library, c_sym_num, + rust_sym_num, curr_crate); for (char const **c = crate_rel(curr_crate, path); *c; ++c) { - dom->log(rust_log::UPCALL, " + %s", crate_rel(curr_crate, *c)); + task->log(rust_log::UPCALL, " + %s", crate_rel(curr_crate, *c)); } - dom->log(rust_log::UPCALL|rust_log::CACHE, - "require C symbol 'rust_crate' from lib #%" PRIdPTR,lib_num); + task->log(rust_log::UPCALL | rust_log::CACHE, + "require C symbol 'rust_crate' from lib #%" PRIdPTR, lib_num); rust_crate_cache::c_sym *c = - fetch_c_sym(task, curr_crate, lib_num, c_sym_num, - library, "rust_crate"); + fetch_c_sym(task, curr_crate, lib_num, c_sym_num, library, + "rust_crate"); - dom->log(rust_log::UPCALL|rust_log::CACHE, - "require rust symbol inside crate"); - rust_crate_cache::rust_sym *s = - task->cache->get_rust_sym(rust_sym_num, dom, curr_crate, c, path); + task->log(rust_log::UPCALL | rust_log::CACHE, + "require rust symbol inside crate"); + rust_crate_cache::rust_sym *s = task->cache->get_rust_sym(rust_sym_num, + dom, + curr_crate, c, + path); uintptr_t addr = s->get_val(); if (addr) { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "found-or-cached addr: 0x%" PRIxPTR, addr); + task->log(rust_log::UPCALL | rust_log::CACHE, + "found-or-cached addr: 0x%" PRIxPTR, addr); } else { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "failed to resolve symbol"); + task->log(rust_log::UPCALL | rust_log::CACHE, + "failed to resolve symbol"); task->fail(7); } return addr; } -extern "C" CDECL uintptr_t -upcall_require_c_sym(rust_task *task, - rust_crate const *curr_crate, - size_t lib_num, // # of lib - size_t c_sym_num, // # of C sym - char const *library, - char const *symbol) -{ +extern "C" CDECL uintptr_t upcall_require_c_sym(rust_task *task, + rust_crate const *curr_crate, size_t lib_num, // # of lib + size_t c_sym_num, // # of C sym + char const *library, char const *symbol) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::CACHE, - "upcall require c sym: lib #%" PRIdPTR - " = %s, c_sym #%" PRIdPTR - " = %s" - ", curr_crate = 0x%" PRIxPTR, - lib_num, library, c_sym_num, symbol, curr_crate); + task->log(rust_log::UPCALL | rust_log::CACHE, + "upcall require c sym: lib #%" PRIdPTR + " = %s, c_sym #%" PRIdPTR + " = %s" + ", curr_crate = 0x%" PRIxPTR, lib_num, library, c_sym_num, + symbol, curr_crate); - rust_crate_cache::c_sym *c = - fetch_c_sym(task, curr_crate, lib_num, c_sym_num, library, symbol); + rust_crate_cache::c_sym *c = fetch_c_sym(task, curr_crate, lib_num, + c_sym_num, library, symbol); uintptr_t addr = c->get_val(); if (addr) { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "found-or-cached addr: 0x%" PRIxPTR, addr); + task->log(rust_log::UPCALL | rust_log::CACHE, + "found-or-cached addr: 0x%" PRIxPTR, addr); } else { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "failed to resolve symbol"); + task->log(rust_log::UPCALL | rust_log::CACHE, + "failed to resolve symbol"); task->fail(6); } return addr; } extern "C" CDECL type_desc * -upcall_get_type_desc(rust_task *task, - rust_crate const *curr_crate, - size_t size, - size_t align, - size_t n_descs, - type_desc const **descs) -{ +upcall_get_type_desc(rust_task *task, rust_crate const *curr_crate, + size_t size, size_t align, size_t n_descs, type_desc const **descs) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::CACHE, - "upcall get_type_desc with size=%" PRIdPTR - ", align=%" PRIdPTR ", %" PRIdPTR " descs", - size, align, n_descs); + task->log(rust_log::UPCALL | rust_log::CACHE, + "upcall get_type_desc with size=%" PRIdPTR + ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align, + n_descs); rust_crate_cache *cache = task->get_crate_cache(curr_crate); type_desc *td = cache->get_type_desc(size, align, n_descs, descs); - dom->log(rust_log::UPCALL|rust_log::CACHE, - "returning tydesc 0x%" PRIxPTR, td); + task->log(rust_log::UPCALL | rust_log::CACHE, + "returning tydesc 0x%" PRIxPTR, td); return td; } - #if defined(__WIN32__) static DWORD WINAPI rust_thread_start(void *ptr) #elif defined(__GNUC__) @@ -578,10 +522,10 @@ static void *rust_thread_start(void *ptr) #endif { // We were handed the domain we are supposed to run. - rust_dom *dom = (rust_dom *)ptr; + rust_dom *dom = (rust_dom *) ptr; // Start a new rust main loop for this thread. - rust_main_loop(dom); + dom->start_main_loop(); rust_srv *srv = dom->srv; delete dom; @@ -591,81 +535,77 @@ static void *rust_thread_start(void *ptr) } extern "C" CDECL rust_task * -upcall_new_task(rust_task *spawner) -{ +upcall_new_task(rust_task *spawner) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; rust_task *task = new (dom) rust_task(dom, spawner); - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK, + dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, "upcall new_task(spawner 0x%" PRIxPTR ") = 0x%" PRIxPTR, spawner, task); return task; } extern "C" CDECL rust_task * -upcall_start_task(rust_task *spawner, - rust_task *task, - uintptr_t exit_task_glue, - uintptr_t spawnee_fn, - size_t callsz) -{ +upcall_start_task(rust_task *spawner, rust_task *task, + uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK, + dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, "upcall start_task(task 0x%" PRIxPTR " exit_task_glue 0x%" PRIxPTR ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", - task, exit_task_glue, spawnee_fn, callsz); + ", callsz %" PRIdPTR ")", task, exit_task_glue, spawnee_fn, + callsz); task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); return task; } -extern "C" CDECL rust_task * -upcall_new_thread(rust_task *task) -{ +extern "C" CDECL rust_proxy_delegate * +upcall_new_thread(rust_task *task) { LOG_UPCALL_ENTRY(task); rust_dom *old_dom = task->dom; rust_dom *new_dom = new rust_dom(old_dom->srv->clone(), old_dom->root_crate); - new_dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_thread() = 0x%" PRIxPTR, - new_dom->root_task); - return new_dom->root_task; + + task->log(rust_log::UPCALL | rust_log::MEM, + "upcall new_thread() = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, + new_dom, new_dom->root_task); + rust_proxy *proxy = + new (old_dom) rust_proxy(old_dom, new_dom->root_task); + task->log(rust_log::UPCALL | rust_log::MEM, + "new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, + proxy, proxy->delegate()); + return proxy; } -extern "C" CDECL rust_task * +extern "C" CDECL rust_proxy_delegate * upcall_start_thread(rust_task *spawner, - rust_task *root_task, - uintptr_t exit_task_glue, - uintptr_t spawnee_fn, - size_t callsz) -{ + rust_proxy_delegate *root_task_proxy, + uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK, + rust_task *root_task = root_task_proxy->delegate(); + dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, "upcall start_thread(exit_task_glue 0x%" PRIxPTR ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", - exit_task_glue, spawnee_fn, callsz); + ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz); root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); #if defined(__WIN32__) HANDLE thread; thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom, - 0, NULL); + 0, NULL); dom->win32_require("CreateThread", thread != NULL); #else pthread_t thread; pthread_create(&thread, &dom->attr, rust_thread_start, - (void *)root_task->dom); + (void *) root_task->dom); #endif - - return 0; + return root_task_proxy; } // diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index 6f34dad928a..62ac7de2d25 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -5,7 +5,7 @@ template rc_base::rc_base() : - refcnt(1) + ref_count(1) { } @@ -85,7 +85,7 @@ ptr_vec::trim(size_t sz) template void -ptr_vec::swapdel(T *item) +ptr_vec::swap_delete(T *item) { /* Swap the endpoint into i and decr fill. */ I(dom, data); diff --git a/src/rt/sync/condition_variable.cpp b/src/rt/sync/condition_variable.cpp new file mode 100644 index 00000000000..d403257243d --- /dev/null +++ b/src/rt/sync/condition_variable.cpp @@ -0,0 +1,66 @@ +#include "../globals.h" + +/* + * Conditional variable. Implemented using pthreads condition variables, and + * using events on windows. + */ + +#include "condition_variable.h" + +// #define TRACE + +condition_variable::condition_variable() { +#if defined(__WIN32__) + _event = CreateEvent(NULL, FALSE, FALSE, NULL); +#else + pthread_cond_init(&_cond, NULL); + pthread_mutex_init(&_mutex, NULL); +#endif +} + +condition_variable::~condition_variable() { +#if defined(__WIN32__) + CloseHandle(_event); +#else + pthread_cond_destroy(&_cond); + pthread_mutex_destroy(&_mutex); +#endif +} + +/** + * Wait indefinitely until condition is signaled. + */ +void condition_variable::wait() { +#ifdef TRACE + printf("waiting on condition_variable: 0x%" PRIxPTR "\n", + (uintptr_t)this); +#endif +#if defined(__WIN32__) + WaitForSingleObject(_event, INFINITE); +#else + pthread_mutex_lock(&_mutex); + pthread_cond_wait(&_cond, &_mutex); + pthread_mutex_unlock(&_mutex); +#endif +#ifdef TRACE + printf("resumed on condition_variable: 0x%" PRIxPTR "\n", + (uintptr_t)this); +#endif +} + +/** + * Signal condition, and resume the waiting thread. + */ +void condition_variable::signal() { +#if defined(__WIN32__) + SetEvent(_event); +#else + pthread_mutex_lock(&_mutex); + pthread_cond_signal(&_cond); + pthread_mutex_unlock(&_mutex); +#endif +#ifdef TRACE + printf("signal condition_variable: 0x%" PRIxPTR "\n", + (uintptr_t)this); +#endif +} diff --git a/src/rt/sync/condition_variable.h b/src/rt/sync/condition_variable.h new file mode 100644 index 00000000000..f847ef990bd --- /dev/null +++ b/src/rt/sync/condition_variable.h @@ -0,0 +1,19 @@ +#ifndef CONDITION_VARIABLE_H +#define CONDITION_VARIABLE_H + +class condition_variable { +#if defined(__WIN32__) + HANDLE _event; +#else + pthread_cond_t _cond; + pthread_mutex_t _mutex; +#endif +public: + condition_variable(); + virtual ~condition_variable(); + + void wait(); + void signal(); +}; + +#endif /* CONDITION_VARIABLE_H */ diff --git a/src/rt/sync/lock_free_queue.cpp b/src/rt/sync/lock_free_queue.cpp index 9d1081de88b..69241eced55 100644 --- a/src/rt/sync/lock_free_queue.cpp +++ b/src/rt/sync/lock_free_queue.cpp @@ -5,33 +5,46 @@ * dequeue() is not allowed to interrupt itself. */ +#include "../globals.h" #include "lock_free_queue.h" -lock_free_queue::lock_free_queue() : - tail(this) { +lock_free_queue_node::lock_free_queue_node() : next(NULL) { + } -void lock_free_queue::enqueue(lock_free_queue_node *item) { - item->next = (lock_free_queue_node *) 0; - lock_free_queue_node *last = tail; - tail = item; - while (last->next) +lock_free_queue::lock_free_queue() : _tail(this) { + +} + +void +lock_free_queue::enqueue(lock_free_queue_node *item) { + item->next = (lock_free_queue_node *) NULL; + lock_free_queue_node *last = _tail; + _tail = item; + while (last->next) { last = last->next; + } last->next = item; } -lock_free_queue_node *lockfree_queue::dequeue() { +lock_free_queue_node * +lock_free_queue::dequeue() { lock_free_queue_node *item = next; if (item && !(next = item->next)) { - tail = (lock_free_queue_node *) this; + _tail = (lock_free_queue_node *) this; if (item->next) { lock_free_queue_node *lost = item->next; lock_free_queue_node *help; do { help = lost->next; enqueue(lost); - } while ((lost = help) != (lock_free_queue_node *) 0); + } while ((lost = help) != (lock_free_queue_node *) NULL); } } return item; } + +bool +lock_free_queue::is_empty() { + return next == NULL; +} diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h index fba4aa9ae66..1f09ec5225c 100644 --- a/src/rt/sync/lock_free_queue.h +++ b/src/rt/sync/lock_free_queue.h @@ -2,14 +2,18 @@ #define LOCK_FREE_QUEUE_H class lock_free_queue_node { +public: lock_free_queue_node *next; + lock_free_queue_node(); }; -class lock_free_queue { +class lock_free_queue : lock_free_queue_node { + lock_free_queue_node *_tail; public: lock_free_queue(); void enqueue(lock_free_queue_node *item); lock_free_queue_node *dequeue(); + bool is_empty(); }; #endif /* LOCK_FREE_QUEUE_H */ diff --git a/src/rt/sync/spin_lock.cpp b/src/rt/sync/spin_lock.cpp index 11a5cb201af..4a113d1aef9 100644 --- a/src/rt/sync/spin_lock.cpp +++ b/src/rt/sync/spin_lock.cpp @@ -1,9 +1,10 @@ +#include "../globals.h" +#include "spin_lock.h" + /* * Your average spin lock. */ -#include "globals.h" - // #define TRACE spin_lock::spin_lock() { diff --git a/src/rt/sync/spin_lock.h b/src/rt/sync/spin_lock.h index 3684c23a68d..f15416a2d4e 100644 --- a/src/rt/sync/spin_lock.h +++ b/src/rt/sync/spin_lock.h @@ -1,5 +1,5 @@ -#ifndef UNFAIR_TICKET_LOCK_H -#define UNFAIR_TICKET_LOCK_H +#ifndef SPIN_LOCK_H +#define SPIN_LOCK_H class spin_lock { unsigned ticket; @@ -11,4 +11,4 @@ public: void unlock(); }; -#endif /* UNFAIR_TICKET_LOCK_H */ +#endif /* SPIN_LOCK_H */ diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index 0d112575ce7..04fd833f8b6 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -13,38 +13,44 @@ public: array_list(); ~array_list(); size_t size(); - void append(T value); + int32_t append(T value); T replace(T old_value, T new_value); - size_t index_of(T value); + int32_t index_of(T value); T & operator[](size_t index); }; -template array_list::array_list() { +template +array_list::array_list() { _capacity = INITIAL_CAPACITY; _data = (T *) malloc(sizeof(T) * _capacity); } -template array_list::~array_list() { +template +array_list::~array_list() { delete _data; } -template size_t array_list::size() { +template size_t +array_list::size() { return _size; } -template void array_list::append(T value) { +template int32_t +array_list::append(T value) { if (_size == _capacity) { _capacity = _capacity * 2; _data = (T *) realloc(_data, _capacity * sizeof(T)); } - _data[_size++] = value; + _data[_size ++] = value; + return _size - 1; } /** * Replaces the old_value in the list with the new_value. * Returns the old_value if the replacement succeeded, or NULL otherwise. */ -template T array_list::replace(T old_value, T new_value) { +template T +array_list::replace(T old_value, T new_value) { int index = index_of(old_value); if (index < 0) { return NULL; @@ -53,7 +59,8 @@ template T array_list::replace(T old_value, T new_value) { return old_value; } -template size_t array_list::index_of(T value) { +template int32_t +array_list::index_of(T value) { for (size_t i = 0; i < _size; i++) { if (_data[i] == value) { return i; @@ -62,7 +69,8 @@ template size_t array_list::index_of(T value) { return -1; } -template T & array_list::operator[](size_t index) { +template T & +array_list::operator[](size_t index) { return _data[index]; } diff --git a/src/test/run-pass/task-comm-0.rs b/src/test/run-pass/task-comm-0.rs new file mode 100644 index 00000000000..5992ba5cf6f --- /dev/null +++ b/src/test/run-pass/task-comm-0.rs @@ -0,0 +1,19 @@ +io fn main() -> () { + test05(); +} + +io fn test05_start(chan[int] ch) { + ch <| 10; + ch <| 20; + ch <| 30; +} + +io fn test05() { + let port[int] po = port(); + let chan[int] ch = chan(po); + spawn test05_start(chan(po)); + let int value <- po; + value <- po; + value <- po; + log value; +} diff --git a/src/test/run-pass/task-comm-1.rs b/src/test/run-pass/task-comm-1.rs new file mode 100644 index 00000000000..48983b71e1a --- /dev/null +++ b/src/test/run-pass/task-comm-1.rs @@ -0,0 +1,13 @@ +fn main() -> () { + test00(); +} + +fn start() { + log "Started / Finished Task."; +} + +fn test00() { + let task t = spawn thread start(); + join t; + log "Completing."; +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm-2.rs b/src/test/run-pass/task-comm-2.rs new file mode 100644 index 00000000000..9151c7b12c3 --- /dev/null +++ b/src/test/run-pass/task-comm-2.rs @@ -0,0 +1,34 @@ +fn main() -> () { + log "===== THREADS ====="; + test00(true); + log "====== TASKS ======"; + test00(false); +} + +fn start(int task_number) { + log "Started task."; + let int i = 0; + while (i < 10000) { + i = i + 1; + } + log "Finished task."; +} + +fn test00(bool create_threads) { + let int number_of_tasks = 32; + + let int i = 0; + let vec[task] tasks = vec(); + while (i < number_of_tasks) { + i = i + 1; + if (create_threads) { + tasks += vec(spawn thread start(i)); + } else { + tasks += vec(spawn start(i)); + } + } + + for (task t in tasks) { + join t; + } +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs new file mode 100644 index 00000000000..6dd620cc762 --- /dev/null +++ b/src/test/run-pass/task-comm-3.rs @@ -0,0 +1,59 @@ +io fn main() -> () { + log "===== THREADS ====="; + test00(false); +} + +io fn test00_start(chan[int] ch, int message, int count) { + log "Starting test00_start"; + let int i = 0; + while (i < count) { + log "Sending Message"; + ch <| message; + i = i + 1; + } + log "Ending test00_start"; +} + +io fn test00(bool is_multithreaded) { + let int number_of_tasks = 1; + let int number_of_messages = 0; + log "Creating tasks"; + + let port[int] po = port(); + let chan[int] ch = chan(po); + + let int i = 0; + + // Create and spawn tasks... + let vec[task] tasks = vec(); + while (i < number_of_tasks) { + i = i + 1; + if (is_multithreaded) { + tasks += vec( + spawn thread test00_start(ch, i, number_of_messages)); + } else { + tasks += vec(spawn test00_start(ch, i, number_of_messages)); + } + } + + // Read from spawned tasks... + let int sum = 0; + for (task t in tasks) { + i = 0; + while (i < number_of_messages) { + let int value <- po; + sum += value; + i = i + 1; + } + } + + // Join spawned tasks... + for (task t in tasks) { + join t; + } + + log "Completed: Final number is: "; + check (sum + 1 == number_of_messages * + (number_of_tasks * number_of_tasks + number_of_tasks) / 2); + log sum; +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm-4.rs b/src/test/run-pass/task-comm-4.rs new file mode 100644 index 00000000000..42ba6992364 --- /dev/null +++ b/src/test/run-pass/task-comm-4.rs @@ -0,0 +1,10 @@ +io fn main() -> () { + test00(); +} + +io fn test00() { + let port[int] p = port(); + let chan[int] c = chan(p); + c <| 42; + let int r <- p; +} \ No newline at end of file diff --git a/src/test/run-pass/task-comm.rs b/src/test/run-pass/task-comm.rs index 4a21b4e4388..ef71c6e1bcf 100644 --- a/src/test/run-pass/task-comm.rs +++ b/src/test/run-pass/task-comm.rs @@ -1,17 +1,19 @@ - -io fn main() -> () { - test00(true); +fn main() -> () { + // test00(true); // test01(); // test02(); // test03(); // test04(); + // test05(); + test06(); } io fn test00_start(chan[int] ch, int message, int count) { log "Starting test00_start"; let int i = 0; while (i < count) { + log "Sending Message"; ch <| message; i = i + 1; } @@ -19,7 +21,7 @@ io fn test00_start(chan[int] ch, int message, int count) { } io fn test00(bool is_multithreaded) { - let int number_of_tasks = 4; + let int number_of_tasks = 1; let int number_of_messages = 64; log "Creating tasks"; @@ -109,12 +111,50 @@ fn test04() { log "Finishing up."; } - - - - - - +io fn test05_start(chan[int] ch) { + ch <| 10; + ch <| 20; + ch <| 30; + ch <| 30; + ch <| 30; +} + +io fn test05() { + let port[int] po = port(); + let chan[int] ch = chan(po); + spawn thread test05_start(ch); + let int value <- po; + value <- po; + value <- po; + log value; +} + +fn test06_start(int task_number) { + log "Started task."; + let int i = 0; + while (i < 100000000) { + i = i + 1; + } + log "Finished task."; +} + +fn test06() { + let int number_of_tasks = 32; + log "Creating tasks"; + + let int i = 0; + + let vec[task] tasks = vec(); + while (i < number_of_tasks) { + i = i + 1; + tasks += vec(spawn thread test06_start(i)); + // tasks += vec(spawn test06_start(i)); + } + + for (task t in tasks) { + join t; + } +}