Switch map-reduce control protocol to use pipes. This exposed a bug in the pipe compiler, which is now fixed.

Use hashmaps in MapReduce

Tweak word-count difficulty
This commit is contained in:
Eric Holk 2012-07-11 14:50:39 -07:00
parent d5b8bbb4b2
commit 1a276dba52
2 changed files with 101 additions and 65 deletions

View File

@ -56,34 +56,11 @@ impl methods for message {
// Return the type parameters actually used by this message
fn get_params() -> ~[ast::ty_param] {
let mut used = ~[];
alt self {
message(_, tys, this, _, next_tys) {
let parms = this.ty_params;
for vec::append(tys, next_tys).each |ty| {
alt ty.node {
ast::ty_path(path, _) {
if path.idents.len() == 1 {
let id = path.idents[0];
let found = parms.find(|p| id == p.ident);
alt found {
some(p) {
if !used.contains(p) {
vec::push(used, p);
}
}
none { }
}
}
}
_ { }
}
}
message(_, _, this, _, _) {
this.ty_params
}
}
used
}
fn gen_send(cx: ext_ctxt) -> @ast::item {

View File

@ -14,7 +14,8 @@ import option = option;
import option::some;
import option::none;
import str;
import std::treemap;
import std::map;
import std::map::hashmap;
import vec;
import io;
import io::{reader_util, writer_util};
@ -30,10 +31,30 @@ import comm::recv;
import comm::send;
import comm::methods;
macro_rules! move {
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
}
trait word_reader {
fn read_word() -> option<str>;
}
trait hash_key {
fn hash() -> uint;
fn eq(self) -> bool;
}
fn mk_hash<K: const hash_key, V: copy>() -> map::hashmap<K, V> {
fn hashfn<K: const hash_key>(k: K) -> uint { k.hash() }
map::hashmap(hashfn::<K>, |x, y| x.eq(y))
}
impl of hash_key for str {
fn hash() -> uint { str::hash(self) }
fn eq(&&x: str) -> bool { str::eq(self, x) }
}
// These used to be in task, but they disappeard.
type joinable_task = port<()>;
fn spawn_joinable(+f: fn~()) -> joinable_task {
@ -79,6 +100,23 @@ fn reduce(&&word: str, get: map_reduce::getter<int>) {
io::println(#fmt("%s\t%?", word, count));
}
class box<T> {
let mut contents: option<T>;
new(+x: T) { self.contents = some(x); }
fn swap(f: fn(+T) -> T) {
let mut tmp = none;
self.contents <-> tmp;
self.contents = some(f(option::unwrap(tmp)));
}
fn unwrap() -> T {
let mut tmp = none;
self.contents <-> tmp;
option::unwrap(tmp)
}
}
mod map_reduce {
export putter;
export getter;
@ -99,54 +137,74 @@ mod map_reduce {
mapper_done
}
proto! ctrl_proto {
open: send<K: copy send, V: copy send> {
find_reducer(K) -> reducer_response<K, V>,
mapper_done -> terminated
}
reducer_response: recv<K: copy send, V: copy send> {
reducer(chan<reduce_proto<V>>) -> open<K, V>
}
terminated: send { }
}
enum reduce_proto<V: copy send> { emit_val(V), done, ref, release }
fn start_mappers<K1: copy send, K2: copy send, V: copy send>(
fn start_mappers<K1: copy send, K2: const copy send hash_key,
V: copy send>(
map: mapper<K1, K2, V>,
ctrl: chan<ctrl_proto<K2, V>>, inputs: ~[K1])
&ctrls: ~[ctrl_proto::server::open<K2, V>],
inputs: ~[K1])
-> ~[joinable_task]
{
let mut tasks = ~[];
for inputs.each |i| {
let (ctrl, ctrl_server) = ctrl_proto::init();
let ctrl = box(ctrl);
vec::push(tasks, spawn_joinable(|| map_task(map, ctrl, i) ));
vec::push(ctrls, ctrl_server);
}
ret tasks;
}
fn map_task<K1: copy send, K2: copy send, V: copy send>(
fn map_task<K1: copy send, K2: const copy send hash_key, V: copy send>(
map: mapper<K1, K2, V>,
ctrl: chan<ctrl_proto<K2, V>>,
ctrl: box<ctrl_proto::client::open<K2, V>>,
input: K1)
{
// log(error, "map_task " + input);
let intermediates = treemap::treemap();
let intermediates = mk_hash();
fn emit<K2: copy send, V: copy send>(
im: treemap::treemap<K2, chan<reduce_proto<V>>>,
ctrl: chan<ctrl_proto<K2, V>>, key: K2, val: V)
{
let c;
alt treemap::find(im, key) {
some(_c) { c = _c; }
do map(input) |key, val| {
let mut c = none;
alt intermediates.find(key) {
some(_c) { c = some(_c); }
none {
let p = port();
send(ctrl, find_reducer(key, chan(p)));
c = recv(p);
treemap::insert(im, key, c);
send(c, ref);
do ctrl.swap |ctrl| {
let ctrl = ctrl_proto::client::find_reducer(ctrl, key);
alt pipes::recv(ctrl) {
ctrl_proto::reducer(c_, ctrl) {
c = some(c_);
move!{ctrl}
}
}
}
intermediates.insert(key, c.get());
send(c.get(), ref);
}
}
send(c, emit_val(val));
send(c.get(), emit_val(val));
}
map(input, {|a,b|emit(intermediates, ctrl, a, b)});
fn finish<K: copy send, V: copy send>(_k: K, v: chan<reduce_proto<V>>)
{
send(v, release);
}
treemap::traverse(intermediates, finish);
send(ctrl, mapper_done);
for intermediates.each_value |v| { send(v, release) }
ctrl_proto::client::mapper_done(ctrl.unwrap());
}
fn reduce_task<K: copy send, V: copy send>(
@ -184,30 +242,32 @@ mod map_reduce {
reduce(key, || get(p, ref_count, is_done) );
}
fn map_reduce<K1: copy send, K2: copy send, V: copy send>(
fn map_reduce<K1: copy send, K2: const copy send hash_key, V: copy send>(
map: mapper<K1, K2, V>,
reduce: reducer<K2, V>,
inputs: ~[K1])
{
let ctrl = port();
let mut ctrl = ~[];
// This task becomes the master control task. It task::_spawns
// to do the rest.
let reducers = treemap::treemap();
let mut tasks = start_mappers(map, chan(ctrl), inputs);
let reducers = mk_hash();
let mut tasks = start_mappers(map, ctrl, inputs);
let mut num_mappers = vec::len(inputs) as int;
while num_mappers > 0 {
alt recv(ctrl) {
mapper_done {
let (_ready, message, ctrls) = pipes::select(ctrl);
alt option::unwrap(message) {
ctrl_proto::mapper_done(_) {
// #error("received mapper terminated.");
num_mappers -= 1;
ctrl = ctrls;
}
find_reducer(k, cc) {
ctrl_proto::find_reducer(k, cc) {
let c;
// log(error, "finding reducer for " + k);
alt treemap::find(reducers, k) {
alt reducers.find(k) {
some(_c) {
// log(error,
// "reusing existing reducer for " + k);
@ -221,19 +281,17 @@ mod map_reduce {
vec::push(tasks,
spawn_joinable(|| reduce_task(r, kk, ch) ));
c = recv(p);
treemap::insert(reducers, k, c);
reducers.insert(k, c);
}
}
send(cc, c);
ctrl = vec::append_one(
ctrls,
ctrl_proto::server::reducer(move!{cc}, c));
}
}
}
fn finish<K: copy send, V: copy send>(_k: K, v: chan<reduce_proto<V>>)
{
send(v, done);
}
treemap::traverse(reducers, finish);
for reducers.each_value |v| { send(v, done) }
for tasks.each |t| { join(t); }
}
@ -254,7 +312,7 @@ fn main(argv: ~[str]) {
}
else {
let num_readers = 50;
let words_per_reader = 1000;
let words_per_reader = 600;
vec::from_fn(
num_readers,
|_i| fn~() -> word_reader {
@ -301,7 +359,8 @@ class random_word_reader: word_reader {
fn read_word() -> option<str> {
if self.remaining > 0 {
self.remaining -= 1;
some(self.rng.gen_str(5))
let len = self.rng.gen_uint_range(1, 4);
some(self.rng.gen_str(len))
}
else { none }
}