rust/src/libstd/par.rs

139 lines
4.2 KiB
Rust
Raw Normal View History

2012-05-30 12:36:29 -05:00
import comm::port;
import comm::chan;
import comm::send;
import comm::recv;
import future::future;
export map, mapi, alli, any, mapi_factory;
2012-05-30 12:36:29 -05:00
#[doc="The maximum number of tasks this module will spawn for a single
2012-06-18 15:34:15 -05:00
operation."]
2012-05-30 12:36:29 -05:00
const max_tasks : uint = 32u;
// map_slices processes vectors shorter than this directly, without
// spawning, and otherwise divides the vector length by this value when
// choosing how many tasks to use.
#[doc="The minimum number of elements each task will process."]
const min_granularity : uint = 1024u;
#[doc="An internal helper to map a function over a large vector and
return the intermediate results.

The vector is cut into contiguous slices. For every slice, `f` is called
once to produce the function that is run on that slice; the produced
function receives the index of the slice's first element within `xs` and
the slice itself. The per-slice results are returned in slice order.

Vectors shorter than `min_granularity` are handled as a single slice
without spawning. Otherwise each slice is run through `future::spawn`,
and no more than `uint::min(max_tasks, len / min_granularity)` slices
are created.

This is used to build most of the other parallel vector functions,
like map or alli."]
fn map_slices<A: copy send, B: copy send>(
    xs: [A],
    f: fn() -> fn~(uint, [A]/&) -> B)
    -> [B] {

    let len = xs.len();
    if len < min_granularity {
        log(info, "small slice");
        // This is a small vector, fall back on the normal map.
        [f()(0u, xs)]
    }
    else {
        let num_tasks = uint::min(max_tasks, len / min_granularity);

        // Round the slice size up. Rounding down leaves a remainder
        // whenever len is not a multiple of num_tasks; that remainder
        // turned into one extra slice, so more than num_tasks tasks were
        // spawned and the task-count assertion below failed.
        let mut items_per_task = len / num_tasks;
        if len % num_tasks != 0u {
            items_per_task += 1u;
        }

        let mut futures = [];
        let mut base = 0u;
        log(info, "spawning tasks");
        while base < len {
            // The last slice may be shorter than items_per_task.
            let end = uint::min(len, base + items_per_task);
            // FIXME: why is the ::<A, ()> annotation required here? (#2617)
            vec::unpack_slice::<A, ()>(xs) {|p, _len|
                let f = f();
                let fut = future::spawn() {|copy base|
                    unsafe {
                        // Hand-build the slice value: a pointer to
                        // element `base` of xs paired with the slice
                        // length in bytes. base < len and end <= len, so
                        // the range [base, end) lies inside xs.
                        let len = end - base;
                        let slice = (ptr::offset(p, base),
                                     len * sys::size_of::<A>());
                        log(info, #fmt("pre-slice: %?", (base, slice)));
                        let slice : [A]/& =
                            unsafe::reinterpret_cast(slice);
                        log(info, #fmt("slice: %?",
                                       (base, vec::len(slice), end - base)));
                        assert(vec::len(slice) == end - base);
                        f(base, slice)
                    }
                };
                vec::push(futures, fut);
            };
            base += items_per_task;
        }
        log(info, "tasks spawned");

        log(info, #fmt("num_tasks: %?", (num_tasks, futures.len())));
        // Because the slice size is rounded up, the slice count can never
        // exceed num_tasks.
        assert(futures.len() <= num_tasks);

        // Wait for every future, keeping the results in slice order.
        let r = futures.map() {|ys|
            ys.get()
        };
        assert(r.len() == futures.len());
        r
    }
}
#[doc="A parallel version of map: applies f to every element of xs and
returns the results in order."]
fn map<A: copy send, B: copy send>(xs: [A], f: fn~(A) -> B) -> [B] {
    // Map each slice separately, then join the per-slice vectors.
    let chunks = map_slices(xs) {||
        fn~(_base: uint, slice : [A]/&, copy f) -> [B] {
            vec::map(slice, f)
        }
    };
    vec::concat(chunks)
}
#[doc="A parallel version of mapi: applies f to every element of xs
together with that element's index in xs, and returns the results in
order."]
fn mapi<A: copy send, B: copy send>(xs: [A], f: fn~(uint, A) -> B) -> [B] {
    let result = vec::concat(map_slices(xs) {||
        fn~(base: uint, slice : [A]/&, copy f) -> [B] {
            // vec::mapi counts from the start of the slice; adding base
            // turns that into an index into xs.
            vec::mapi(slice) {|offset, elem|
                f(offset + base, elem)
            }
        }
    });
    log(info, (result.len(), xs.len()));
    // Every input element must have produced exactly one output element.
    assert(result.len() == xs.len());
    result
}
#[doc="A parallel version of mapi.

Here f does not process elements itself; it is a factory that is called
once per slice to create the function that is run over that slice's
elements. This is to skirt the need for copy constructors."]
fn mapi_factory<A: copy send, B: copy send>(
    xs: [A], f: fn() -> fn~(uint, A) -> B) -> [B] {
    let chunks = map_slices(xs) {||
        // Build a fresh element function for this slice and move it into
        // the closure that will run on the slice.
        let elem_fn = f();
        fn~(base: uint, slice : [A]/&, move elem_fn) -> [B] {
            vec::mapi(slice) {|offset, elem|
                elem_fn(offset + base, elem)
            }
        }
    };
    let result = vec::concat(chunks);
    log(info, (result.len(), xs.len()));
    assert(result.len() == xs.len());
    result
}
#[doc="Returns true if the function holds for all elements in the vector.

f receives each element together with its index in xs."]
fn alli<A: copy send>(xs: [A], f: fn~(uint, A) -> bool) -> bool {
    // One bool per slice: whether f held for every element of that slice.
    let per_slice = map_slices(xs) {||
        fn~(base: uint, slice : [A]/&, copy f) -> bool {
            vec::alli(slice) {|offset, elem|
                f(offset + base, elem)
            }
        }
    };
    vec::all(per_slice) {|ok| ok }
}
#[doc="Returns true if the function holds for at least one element in the
vector."]
fn any<A: copy send>(xs: [A], f: fn~(A) -> bool) -> bool {
    // One bool per slice: whether f held for some element of that slice.
    let per_slice = map_slices(xs) {||
        fn~(_base : uint, slice: [A]/&, copy f) -> bool {
            vec::any(slice, f)
        }
    };
    vec::any(per_slice) {|hit| hit }
}