/**
   A somewhat reduced test case to expose some Valgrind issues.

   This originally came from the word-count benchmark.

   Structure: `main` hands one input string to `map_reduce::map_reduce`,
   which spawns one task per input and then services the messages those
   tasks send back over a control port until every task has reported
   `mapper_done`.

   NOTE(review): `chan` and `map::hashmap` appear below without type
   arguments in type positions. They may have been lost in transit (for
   example stripped as if they were markup) -- confirm against the
   upstream copy of this test before relying on the signatures as written.
*/

use std;

import option = std::option::t;
import std::option::some;
import std::option::none;
import std::istr;
import std::vec;
import std::map;
import std::task;
import std::comm::chan;
import std::comm::port;
import std::comm::send;
import std::comm::recv;
import std::comm;

// The "map" step of this test: calls `emit` exactly once, with the input
// string as the first argument and the string "1" as the second.
fn map(filename: &istr, emit: map_reduce::putter) { emit(filename, ~"1"); }

mod map_reduce {
    export putter;
    export mapper;
    export map_reduce;

    // Callback taking two strings; `map` above calls it as
    // `emit(filename, ~"1")` and the nested `emit` below names the two
    // arguments `key` and `val`.
    type putter = fn(&istr, &istr);

    // Function taking an input string and a `putter`. `map` above has this
    // shape; the alias itself is exported but not otherwise used in this
    // file.
    type mapper = fn(&istr, putter);

    // Messages sent to the control port read by `map_reduce`:
    //   find_reducer(bytes, reply) - carries the bytes of a key plus a
    //                                channel on which the answer is sent;
    //   mapper_done                - sent by `map_task` when it finishes.
    tag ctrl_proto { find_reducer([u8], chan); mapper_done; }

    // Spawns one task per element of `inputs`, each running `map_task` with
    // the control channel and that element bound as its arguments.
    fn start_mappers(ctrl: chan, inputs: &[istr]) {
        for i: istr in inputs { task::spawn(bind map_task(ctrl, i)); }
    }

    // Body of a spawned task: runs `map` over `input` with a `putter` built
    // from the nested `emit`, then reports `mapper_done` on `ctrl`.
    // `input` is declared with the `-` argument mode.
    fn map_task(ctrl: chan, input: -istr) {

        // Task-local table consulted and filled by `emit` below.
        let intermediates = map::new_str_hash();

        // Looks `key` up in `im`. On a miss it asks the control task for a
        // value (sending the key's bytes and a fresh reply channel),
        // blocks on the reply, logs it and stores it in `im` under `key`.
        // `val` is not read, and on a hit `c` is assigned but not used
        // afterwards.
        fn emit(im: &map::hashmap, ctrl: chan,
                key: &istr, val: &istr) {
            let c;
            alt im.find(key) {
              some(_c) { c = _c }
              none. {
                // Private port on which the reply to this request arrives.
                let p = port();
                log_err "sending find_reducer";
                send(ctrl, find_reducer(istr::bytes(key), chan(p)));
                log_err "receiving";
                c = recv(p);
                log_err c;
                im.insert(key, c);
              }
            }
        }

        // Fix the first two arguments of `emit`; the two `_` placeholders
        // leave `key` and `val` open so the result can be passed where
        // `map` expects a `putter`.
        map(input, bind emit(intermediates, ctrl, _, _));
        send(ctrl, mapper_done);
    }

    // Entry point: starts one mapper task per input and then serves
    // requests arriving on the control port until all mappers are done.
    fn map_reduce(inputs: &[istr]) {
        let ctrl = port();

        // This task becomes the master control task. It spawns others
        // to do the rest.

        // Declared and assigned in two separate statements.
        let reducers: map::hashmap;

        reducers = map::new_str_hash();

        start_mappers(chan(ctrl), inputs);

        // Number of mapper tasks that have not yet sent `mapper_done`.
        let num_mappers = vec::len(inputs) as int;

        while num_mappers > 0 {
            alt recv(ctrl) {
              mapper_done. { num_mappers -= 1; }
              find_reducer(k, cc) {
                // Answer the request on the channel it carried. Nothing in
                // this function inserts into `reducers`, so as written the
                // lookup falls through to the `none` arm and replies 0.
                let c;
                alt reducers.find(istr::unsafe_from_bytes(k)) {
                  some(_c) { c = _c; }
                  none. { c = 0; }
                }
                send(cc, c);
              }
            }
        }
    }
}

// Runs the whole pipeline with a single input string. The string is only
// passed along as a key; nothing in this file opens it as a path.
fn main() {
    map_reduce::map_reduce([~"../src/test/run-pass/hashmap-memory.rs"]);
}