From 0c3a128419e2b75be1fa6de6ff92d32fba11e005 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Fri, 25 May 2012 17:45:50 -0700 Subject: [PATCH] Update word-count-generic to latest syntax and un-xfail it. Closes #1740. --- .../bench/task-perf-word-count-generic.rs | 128 +++++++++++------- 1 file changed, 76 insertions(+), 52 deletions(-) diff --git a/src/test/bench/task-perf-word-count-generic.rs b/src/test/bench/task-perf-word-count-generic.rs index d2820f32ff9..f41629a00db 100644 --- a/src/test/bench/task-perf-word-count-generic.rs +++ b/src/test/bench/task-perf-word-count-generic.rs @@ -1,5 +1,3 @@ -// xfail-test - #1038 - Can't do this safely with bare functions - /** A parallel word-frequency counting program. @@ -18,21 +16,41 @@ import option::none; import str; import std::treemap; import vec; -import std::io; +import io; +import io::{reader_util, writer_util}; import std::time; import u64; import task; -import task::joinable_task; import comm; import comm::chan; import comm::port; import comm::recv; import comm::send; +import comm::methods; + +// These used to be in task, but they disappeard. +type joinable_task = port<()>; +fn spawn_joinable(f: fn~()) -> joinable_task { + let p = port(); + let c = chan(p); + task::spawn() {|| + f(); + c.send(()); + } + p +} + +fn join(t: joinable_task) { + t.recv() +} fn map(&&filename: [u8], emit: map_reduce::putter<[u8], int>) { - let f = io::file_reader(str::from_bytes(filename)); + let f = alt io::file_reader(str::from_bytes(filename)) { + result::ok(f) { f } + result::err(e) { fail #fmt("%?", e) } + }; loop { alt read_word(f) { @@ -42,10 +60,12 @@ fn map(&&filename: [u8], emit: map_reduce::putter<[u8], int>) { } } -fn reduce(&&_word: [u8], get: map_reduce::getter) { - let count = 0; +fn reduce(&&word: [u8], get: map_reduce::getter) { + let mut count = 0; loop { alt get() { some(_) { count += 1; } none { break; } } } + + io::println(#fmt("%?\t%?", word, count)); } mod map_reduce { @@ -59,41 +79,43 @@ mod map_reduce { // FIXME: the first K1 parameter should probably be a -, but that // doesn't parse at the moment. - type mapper = fn(K1, putter); + type mapper = fn~(K1, putter); type getter = fn() -> option; - type reducer = fn(K, getter); + type reducer = fn~(K, getter); - enum ctrl_proto { - find_reducer(K, chan>>); - mapper_done; + enum ctrl_proto { + find_reducer(K, chan>>), + mapper_done } - enum reduce_proto { emit_val(V); done; ref; release; } + enum reduce_proto { emit_val(V), done, ref, release } - fn start_mappers(map: mapper, - ctrl: chan>, inputs: [K1]) -> - [joinable_task] { - let tasks = []; + fn start_mappers( + map: mapper, + ctrl: chan>, inputs: [K1]) + -> [joinable_task] + { + let mut tasks = []; for inputs.each {|i| - let m = map, c = ctrl, ii = i; - tasks += [task::spawn_joinable {|| map_task(m, c, ii)}]; + tasks += [spawn_joinable {|| map_task(map, ctrl, i)}]; } ret tasks; } - fn map_task(-map: mapper, - -ctrl: chan>, - -input: K1) { + fn map_task( + map: mapper, + ctrl: chan>, + input: K1) + { // log(error, "map_task " + input); - let intermediates = treemap::init(); + let intermediates = treemap::treemap(); - fn emit(im: treemap::treemap>>, - ctrl: chan>, key: K2, val: V) { + fn emit( + im: treemap::treemap>>, + ctrl: chan>, key: K2, val: V) + { let c; alt treemap::find(im, key) { some(_c) { c = _c; } @@ -110,16 +132,19 @@ mod map_reduce { map(input, bind emit(intermediates, ctrl, _, _)); - fn finish(_k: K, v: chan>) { + fn finish(_k: K, v: chan>) + { send(v, release); } treemap::traverse(intermediates, finish); send(ctrl, mapper_done); } - fn reduce_task(-reduce: reducer, -key: K, - -out: chan>>) { + fn reduce_task( + reduce: reducer, + key: K, + out: chan>>) + { let p = port(); send(out, chan(p)); @@ -127,8 +152,8 @@ mod map_reduce { let ref_count = 0; let is_done = false; - fn get(p: port>, - &ref_count: int, &is_done: bool) + fn get(p: port>, + &ref_count: int, &is_done: bool) -> option { while !is_done || ref_count > 0 { alt recv(p) { @@ -140,8 +165,8 @@ mod map_reduce { // #error("all done"); is_done = true; } - ref. { ref_count += 1; } - release. { ref_count -= 1; } + ref { ref_count += 1; } + release { ref_count -= 1; } } } ret none; @@ -150,19 +175,19 @@ mod map_reduce { reduce(key, bind get(p, ref_count, is_done)); } - fn map_reduce(map: mapper, reduce: reducer, - inputs: [K1]) { + fn map_reduce( + map: mapper, + reduce: reducer, + inputs: [K1]) + { let ctrl = port(); // This task becomes the master control task. It task::_spawns // to do the rest. - let reducers = treemap::init(); - - let tasks = start_mappers(map, chan(ctrl), inputs); - - let num_mappers = vec::len(inputs) as int; + let reducers = treemap::treemap(); + let mut tasks = start_mappers(map, chan(ctrl), inputs); + let mut num_mappers = vec::len(inputs) as int; while num_mappers > 0 { alt recv(ctrl) { @@ -185,7 +210,7 @@ mod map_reduce { let ch = chan(p); let r = reduce, kk = k; tasks += [ - task::spawn_joinable {|| reduce_task(r, kk, ch) } + spawn_joinable {|| reduce_task(r, kk, ch) } ]; c = recv(p); treemap::insert(reducers, k, c); @@ -196,12 +221,13 @@ mod map_reduce { } } - fn finish(_k: K, v: chan>) { + fn finish(_k: K, v: chan>) + { send(v, done); } treemap::traverse(reducers, finish); - for tasks.each {|t| task::join(t); } + for tasks.each {|t| join(t); } } } @@ -217,7 +243,7 @@ fn main(argv: [str]) { ret; } - let iargs = []; + let mut iargs = []; vec::iter_between(argv, 1u, vec::len(argv)) {|a| iargs += [str::bytes(a)]; } @@ -227,20 +253,18 @@ fn main(argv: [str]) { map_reduce::map_reduce(map, reduce, iargs); let stop = time::precise_time_ns(); - let elapsed = stop - start; - elapsed /= 1000000u64; + let elapsed = (stop - start) / 1000000u64; log(error, "MapReduce completed in " + u64::str(elapsed) + "ms"); } fn read_word(r: io::reader) -> option { - let w = ""; + let mut w = ""; while !r.eof() { let c = r.read_char(); - if is_word_char(c) { w += str::from_char(c); } else { if w != "" { ret some(w); } }