From 8878b128baddfa4ee38e4f9c43be75abe0edcd3d Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Thu, 21 Jul 2011 12:11:05 -0700 Subject: [PATCH] More work on word-count. Updated the MapReduce protocol so that it's correct more often. It's still not perfect, but the bugs repro less often now. Also found a race condition in channel sending. The problem is that send and receive both need to refer to the _unread field in circular_buffer. For now I just grabbed the port lock to send. We can probably get around this by using atomics instead. --- src/lib/task.rs | 8 ++ src/rt/rust_builtin.cpp | 5 + src/rt/rust_chan.cpp | 11 +- src/rt/rustrt.def.in | 1 + src/test/bench/task-perf/word-count.rs | 141 +++++++++++++++++++------ src/test/run-pass/hashmap-memory.rs | 4 +- 6 files changed, 134 insertions(+), 36 deletions(-) diff --git a/src/lib/task.rs b/src/lib/task.rs index c26fa632997..27b9e4030a0 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -5,6 +5,9 @@ native "rust" mod rustrt { fn unsupervise(); fn pin_task(); fn unpin_task(); + fn clone_chan(*rust_chan c) -> *rust_chan; + + type rust_chan; } /** @@ -44,6 +47,11 @@ fn unpin() { rustrt::unpin_task(); } +fn clone_chan[T](chan[T] c) -> chan[T] { + auto cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c)); + ret unsafe::reinterpret_cast(cloned); +} + // Local Variables: // mode: rust; // fill-column: 78; diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 977494f7669..6ba749b0d5c 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -850,6 +850,11 @@ unpin_task(rust_task *task) { task->unpin(); } +extern "C" CDECL rust_chan * +clone_chan(rust_task *task, rust_chan *chan) { + return chan->clone(task); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 92edaa77650..d04f5ee8ff5 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -71,9 +71,16 @@ void rust_chan::disassociate() { * Attempt to send data to the associated port. */ void rust_chan::send(void *sptr) { + rust_scheduler *sched = kernel->sched; + I(sched, !port->is_proxy()); + + rust_port *target_port = port->referent(); + // TODO: We can probably avoid this lock by using atomic operations in + // circular_buffer. + scoped_lock with(target_port->lock); + buffer.enqueue(sptr); - rust_scheduler *sched = kernel->sched; if (!is_associated()) { W(sched, is_associated(), "rust_chan::transmit with no associated port."); @@ -88,8 +95,6 @@ void rust_chan::send(void *sptr) { task->get_handle(), port->as_proxy()->handle()); buffer.dequeue(NULL); } else { - rust_port *target_port = port->referent(); - scoped_lock with(target_port->lock); if (target_port->task->blocked_on(target_port)) { DLOG(sched, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index bdff7c86a9c..c6d31a47f4c 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -1,5 +1,6 @@ align_of check_claims +clone_chan debug_box debug_fn debug_obj diff --git a/src/test/bench/task-perf/word-count.rs b/src/test/bench/task-perf/word-count.rs index e22e77d0a53..a5b5641563b 100644 --- a/src/test/bench/task-perf/word-count.rs +++ b/src/test/bench/task-perf/word-count.rs @@ -17,34 +17,49 @@ import std::option::none; import std::str; import std::vec; import std::map; +import std::ivec; + +import std::time; +import std::u64; + +import std::task; +import clone = std::task::clone_chan; fn map(str filename, map_reduce::putter emit) { + // log_err "mapping " + filename; auto f = io::file_reader(filename); while(true) { alt(read_word(f)) { case (some(?w)) { - emit(w, "1"); + emit(w, 1); } case (none) { break; } } } + // log_err "done mapping " + filename; } fn reduce(str word, map_reduce::getter get) { + // log_err "reducing " + word; auto count = 0; while(true) { alt(get()) { - case(some(_)) { count += 1 } - case(none) { break } + some(_) { + // log_err "received word " + word; + count += 1; + } + none { break } } } - auto out = io::stdout(); - out.write_line(#fmt("%s: %d", word, count)); + // auto out = io::stdout(); + // out.write_line(#fmt("%s: %d", word, count)); + + // log_err "reduce " + word + " done."; } mod map_reduce { @@ -54,74 +69,115 @@ mod map_reduce { export reducer; export map_reduce; - type putter = fn(str, str) -> (); + type putter = fn(str, int) -> (); type mapper = fn(str, putter); - type getter = fn() -> option[str]; + type getter = fn() -> option[int]; type reducer = fn(str, getter); tag ctrl_proto { - find_reducer(str, chan[chan[reduce_proto]]); + find_reducer(u8[], chan[chan[reduce_proto]]); mapper_done; } tag reduce_proto { - emit_val(str); + emit_val(int); done; + ref; + release; } fn start_mappers(chan[ctrl_proto] ctrl, - vec[str] inputs) { + vec[str] inputs) -> task[] { + auto tasks = ~[]; + // log_err "starting mappers"; for(str i in inputs) { - spawn map_task(ctrl, i); + // log_err "starting mapper for " + i; + tasks += ~[spawn map_task(ctrl, i)]; } + // log_err "done starting mappers"; + ret tasks; } fn map_task(chan[ctrl_proto] ctrl, str input) { - + // log_err "map_task " + input; auto intermediates = map::new_str_hash(); fn emit(&map::hashmap[str, chan[reduce_proto]] im, chan[ctrl_proto] ctrl, - str key, str val) { + str key, int val) { + // log_err "emitting " + key; auto c; alt(im.find(key)) { - case(some(?_c)) { + some(?_c) { + // log_err "reusing saved channel for " + key; c = _c } - case(none) { + none { + // log_err "fetching new channel for " + key; auto p = port[chan[reduce_proto]](); - ctrl <| find_reducer(key, chan(p)); + auto keyi = str::bytes_ivec(key); + ctrl <| find_reducer(keyi, chan(p)); p |> c; - im.insert(key, c); + im.insert(key, clone(c)); + c <| ref; } } c <| emit_val(val); } map(input, bind emit(intermediates, ctrl, _, _)); + + for each(@tup(str, chan[reduce_proto]) kv in intermediates.items()) { + // log_err "sending done to reducer for " + kv._0; + kv._1 <| release; + } + ctrl <| mapper_done; + + // log_err "~map_task " + input; } fn reduce_task(str key, chan[chan[reduce_proto]] out) { + // log_err "reduce_task " + key; auto p = port(); out <| chan(p); - fn get(port[reduce_proto] p) -> option[str] { - auto m; - p |> m; + auto ref_count = 0; + auto is_done = false; - alt(m) { - case(emit_val(?v)) { ret some(v); } - case(done) { ret none; } + fn get(&port[reduce_proto] p, &mutable int ref_count, + &mutable bool is_done) -> option[int] { + while (!is_done || ref_count > 0) { + auto m; + p |> m; + + alt(m) { + emit_val(?v) { + // log_err #fmt("received %d", v); + ret some(v); + } + done { + // log_err "all done"; + is_done = true; + } + ref { + ref_count += 1; + } + release { + ref_count -= 1; + } + } } + ret none; } - reduce(key, bind get(p)); + reduce(key, bind get(p, ref_count, is_done)); + // log_err "~reduce_task " + key; } fn map_reduce (vec[str] inputs) { @@ -134,7 +190,7 @@ mod map_reduce { reducers = map::new_str_hash(); - start_mappers(chan(ctrl), inputs); + auto tasks = start_mappers(chan(ctrl), inputs); auto num_mappers = vec::len(inputs) as int; @@ -143,26 +199,42 @@ mod map_reduce { ctrl |> m; alt(m) { - case(mapper_done) { num_mappers -= 1; } - case(find_reducer(?k, ?cc)) { + mapper_done { + // log_err "received mapper terminated."; + num_mappers -= 1; + } + find_reducer(?ki, ?cc) { auto c; + auto k = str::unsafe_from_bytes_ivec(ki); + // log_err "finding reducer for " + k; alt(reducers.find(k)) { - case(some(?_c)) { c = _c; } - case(none) { + some(?_c) { + // log_err "reusing existing reducer for " + k; + c = _c; + } + none { + // log_err "creating new reducer for " + k; auto p = port(); - spawn reduce_task(k, chan(p)); + tasks += ~[spawn reduce_task(k, chan(p))]; p |> c; reducers.insert(k, c); } } - cc <| c; + cc <| clone(c); } } } for each(@tup(str, chan[reduce_proto]) kv in reducers.items()) { + // log_err "sending done to reducer for " + kv._0; kv._1 <| done; } + + // log_err #fmt("joining %u tasks", ivec::len(tasks)); + for (task t in tasks) { + task::join(t); + } + // log_err "control task done."; } } @@ -174,7 +246,14 @@ fn main(vec[str] argv) { fail; } + auto start = time::precise_time_ns(); map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv))); + auto stop = time::precise_time_ns(); + + auto elapsed = stop - start; + elapsed /= 1000000u64; + + log_err "MapReduce completed in " + u64::str(elapsed) + "ms"; } fn read_word(io::reader r) -> option[str] { diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs index a9be99cbf44..244198f9401 100644 --- a/src/test/run-pass/hashmap-memory.rs +++ b/src/test/run-pass/hashmap-memory.rs @@ -41,7 +41,7 @@ mod map_reduce { fn map_task(chan[ctrl_proto] ctrl, str input) { - + auto intermediates = map::new_str_hash(); fn emit(&map::hashmap[str, int] im, @@ -85,7 +85,7 @@ mod map_reduce { while(num_mappers > 0) { auto m; ctrl |> m; - + alt(m) { case(mapper_done) { num_mappers -= 1; } case(find_reducer(?k, ?cc)) {