diff --git a/src/lib/task.rs b/src/lib/task.rs
index c26fa632997..27b9e4030a0 100644
--- a/src/lib/task.rs
+++ b/src/lib/task.rs
@@ -5,6 +5,9 @@ native "rust" mod rustrt {
     fn unsupervise();
     fn pin_task();
     fn unpin_task();
+    fn clone_chan(*rust_chan c) -> *rust_chan;
+
+    type rust_chan;
 }
 
 /**
@@ -44,6 +47,11 @@ fn unpin() {
     rustrt::unpin_task();
 }
 
+fn clone_chan[T](chan[T] c) -> chan[T] {
+    auto cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c));
+    ret unsafe::reinterpret_cast(cloned);
+}
+
 // Local Variables:
 // mode: rust;
 // fill-column: 78;
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 977494f7669..6ba749b0d5c 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -850,6 +850,11 @@ unpin_task(rust_task *task) {
     task->unpin();
 }
 
+extern "C" CDECL rust_chan *
+clone_chan(rust_task *task, rust_chan *chan) {
+    return chan->clone(task);
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 92edaa77650..d04f5ee8ff5 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -71,9 +71,16 @@ void rust_chan::disassociate() {
  * Attempt to send data to the associated port.
  */
 void rust_chan::send(void *sptr) {
+    rust_scheduler *sched = kernel->sched;
+    I(sched, !port->is_proxy());
+
+    rust_port *target_port = port->referent();
+    // TODO: We can probably avoid this lock by using atomic operations in
+    // circular_buffer.
+    scoped_lock with(target_port->lock);
+
     buffer.enqueue(sptr);
 
-    rust_scheduler *sched = kernel->sched;
     if (!is_associated()) {
         W(sched, is_associated(),
           "rust_chan::transmit with no associated port.");
@@ -88,8 +95,6 @@ void rust_chan::send(void *sptr) {
                            task->get_handle(), port->as_proxy()->handle());
         buffer.dequeue(NULL);
     } else {
-        rust_port *target_port = port->referent();
-        scoped_lock with(target_port->lock);
         if (target_port->task->blocked_on(target_port)) {
             DLOG(sched, comm, "dequeued in rendezvous_ptr");
             buffer.dequeue(target_port->task->rendezvous_ptr);
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index bdff7c86a9c..c6d31a47f4c 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -1,5 +1,6 @@
 align_of
 check_claims
+clone_chan
 debug_box
 debug_fn
 debug_obj
diff --git a/src/test/bench/task-perf/word-count.rs b/src/test/bench/task-perf/word-count.rs
index e22e77d0a53..a5b5641563b 100644
--- a/src/test/bench/task-perf/word-count.rs
+++ b/src/test/bench/task-perf/word-count.rs
@@ -17,34 +17,49 @@ import std::option::none;
 import std::str;
 import std::vec;
 import std::map;
+import std::ivec;
+
+import std::time;
+import std::u64;
+
+import std::task;
+import clone = std::task::clone_chan;
 
 fn map(str filename, map_reduce::putter emit) {
+    // log_err "mapping " + filename;
     auto f = io::file_reader(filename);
 
     while(true) {
         alt(read_word(f)) {
             case (some(?w)) {
-                emit(w, "1");
+                emit(w, 1);
             }
             case (none) {
                 break;
             }
         }
     }
+    // log_err "done mapping " + filename;
 }
 
 fn reduce(str word, map_reduce::getter get) {
+    // log_err "reducing " + word;
     auto count = 0;
 
     while(true) {
         alt(get()) {
-            case(some(_)) { count += 1 }
-            case(none) { break }
+            some(_) {
+                // log_err "received word " + word;
+                count += 1;
+            }
+            none { break }
         }
     }
 
-    auto out = io::stdout();
-    out.write_line(#fmt("%s: %d", word, count));
+    // auto out = io::stdout();
+    // out.write_line(#fmt("%s: %d", word, count));
+
+    // log_err "reduce " + word + " done.";
 }
 
 mod map_reduce {
@@ -54,74 +69,115 @@ mod map_reduce {
     export reducer;
     export map_reduce;
 
-    type putter = fn(str, str) -> ();
+    type putter = fn(str, int) -> ();
 
     type mapper = fn(str, putter);
 
-    type getter = fn() -> option[str];
+    type getter = fn() -> option[int];
 
     type reducer = fn(str, getter);
 
     tag ctrl_proto {
-        find_reducer(str, chan[chan[reduce_proto]]);
+        find_reducer(u8[], chan[chan[reduce_proto]]);
         mapper_done;
     }
 
     tag reduce_proto {
-        emit_val(str);
+        emit_val(int);
         done;
+        ref;
+        release;
     }
 
     fn start_mappers(chan[ctrl_proto] ctrl,
-                     vec[str] inputs) {
+                     vec[str] inputs) -> task[] {
+        auto tasks = ~[];
+        // log_err "starting mappers";
         for(str i in inputs) {
-            spawn map_task(ctrl, i);
+            // log_err "starting mapper for " + i;
+            tasks += ~[spawn map_task(ctrl, i)];
         }
+        // log_err "done starting mappers";
+        ret tasks;
     }
 
     fn map_task(chan[ctrl_proto] ctrl,
                 str input) {
-
+        // log_err "map_task " + input;
         auto intermediates = map::new_str_hash();
 
         fn emit(&map::hashmap[str, chan[reduce_proto]] im,
                 chan[ctrl_proto] ctrl,
-                str key, str val) {
+                str key, int val) {
+            // log_err "emitting " + key;
             auto c;
             alt(im.find(key)) {
-                case(some(?_c)) {
+                some(?_c) {
+                    // log_err "reusing saved channel for " + key;
                     c = _c
                 }
-                case(none) {
+                none {
+                    // log_err "fetching new channel for " + key;
                     auto p = port[chan[reduce_proto]]();
-                    ctrl <| find_reducer(key, chan(p));
+                    auto keyi = str::bytes_ivec(key);
+                    ctrl <| find_reducer(keyi, chan(p));
                     p |> c;
-                    im.insert(key, c);
+                    im.insert(key, clone(c));
+                    c <| ref;
                 }
             }
             c <| emit_val(val);
         }
 
         map(input, bind emit(intermediates, ctrl, _, _));
+
+        for each(@tup(str, chan[reduce_proto]) kv in intermediates.items()) {
+            // log_err "sending done to reducer for " + kv._0;
+            kv._1 <| release;
+        }
+
         ctrl <| mapper_done;
+
+        // log_err "~map_task " + input;
     }
 
     fn reduce_task(str key, chan[chan[reduce_proto]] out) {
+        // log_err "reduce_task " + key;
         auto p = port();
 
         out <| chan(p);
 
-        fn get(port[reduce_proto] p) -> option[str] {
-            auto m;
-            p |> m;
+        auto ref_count = 0;
+        auto is_done = false;
 
-            alt(m) {
-                case(emit_val(?v)) { ret some(v); }
-                case(done) { ret none; }
+        fn get(&port[reduce_proto] p, &mutable int ref_count,
+               &mutable bool is_done) -> option[int] {
+            while (!is_done || ref_count > 0) {
+                auto m;
+                p |> m;
+
+                alt(m) {
+                    emit_val(?v) {
+                        // log_err #fmt("received %d", v);
+                        ret some(v);
+                    }
+                    done {
+                        // log_err "all done";
+                        is_done = true;
+                    }
+                    ref {
+                        ref_count += 1;
+                    }
+                    release {
+                        ref_count -= 1;
+                    }
+                }
             }
+            ret none;
         }
 
-        reduce(key, bind get(p));
+        reduce(key, bind get(p, ref_count, is_done));
+        // log_err "~reduce_task " + key;
     }
 
     fn map_reduce (vec[str] inputs) {
@@ -134,7 +190,7 @@ mod map_reduce {
 
         reducers = map::new_str_hash();
 
-        start_mappers(chan(ctrl), inputs);
+        auto tasks = start_mappers(chan(ctrl), inputs);
 
         auto num_mappers = vec::len(inputs) as int;
 
@@ -143,26 +199,42 @@ mod map_reduce {
             ctrl |> m;
 
             alt(m) {
-                case(mapper_done) { num_mappers -= 1; }
-                case(find_reducer(?k, ?cc)) {
+                mapper_done {
+                    // log_err "received mapper terminated.";
+                    num_mappers -= 1;
+                }
+                find_reducer(?ki, ?cc) {
                     auto c;
+                    auto k = str::unsafe_from_bytes_ivec(ki);
+                    // log_err "finding reducer for " + k;
                     alt(reducers.find(k)) {
-                        case(some(?_c)) { c = _c; }
-                        case(none) {
+                        some(?_c) {
+                            // log_err "reusing existing reducer for " + k;
+                            c = _c;
+                        }
+                        none {
+                            // log_err "creating new reducer for " + k;
                             auto p = port();
-                            spawn reduce_task(k, chan(p));
+                            tasks += ~[spawn reduce_task(k, chan(p))];
                             p |> c;
                             reducers.insert(k, c);
                         }
                     }
-                    cc <| c;
+                    cc <| clone(c);
                 }
             }
         }
 
         for each(@tup(str, chan[reduce_proto]) kv in reducers.items()) {
+            // log_err "sending done to reducer for " + kv._0;
             kv._1 <| done;
         }
+
+        // log_err #fmt("joining %u tasks", ivec::len(tasks));
+        for (task t in tasks) {
+            task::join(t);
+        }
+        // log_err "control task done.";
     }
 }
 
@@ -174,7 +246,14 @@ fn main(vec[str] argv) {
         fail;
     }
 
+    auto start = time::precise_time_ns();
     map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv)));
+    auto stop = time::precise_time_ns();
+
+    auto elapsed = stop - start;
+    elapsed /= 1000000u64;
+
+    log_err "MapReduce completed in " + u64::str(elapsed) + "ms";
 }
 
 fn read_word(io::reader r) -> option[str] {
diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs
index a9be99cbf44..244198f9401 100644
--- a/src/test/run-pass/hashmap-memory.rs
+++ b/src/test/run-pass/hashmap-memory.rs
@@ -41,7 +41,7 @@ mod map_reduce {
 
     fn map_task(chan[ctrl_proto] ctrl,
                 str input) {
-        
+
         auto intermediates = map::new_str_hash();
 
         fn emit(&map::hashmap[str, int] im,
@@ -85,7 +85,7 @@ mod map_reduce {
         while(num_mappers > 0) {
             auto m;
             ctrl |> m;
-            
+
             alt(m) {
                 case(mapper_done) { num_mappers -= 1; }
                 case(find_reducer(?k, ?cc)) {