diff options
Diffstat (limited to 'mapreduce')
-rw-r--r-- | mapreduce/MapReduce.java | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java index f993f09..aebd4a3 100644 --- a/mapreduce/MapReduce.java +++ b/mapreduce/MapReduce.java @@ -44,20 +44,42 @@ public class MapReduce<IN, K, V, OUT> Set<Mapper<IN, K, V>> mappers = new HashSet<>(); // TODO: for each element in inputList, create a mapper thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() - + for (IN input : inputList) { + Mapper<IN, K, V> mapper = mapperSupplier.get(); + mapper.setInput(input); + pool.execute(mapper); + mappers.add(mapper); + } + Map<K, List<V>> mapResults = new HashMap<>(); // TODO: for each mapper thread, join and merge the results // helpful methods: ForkJoinTask.join() + for (Mapper<IN, K, V> mapper : mappers) { + Map<K, V> map = mapper.join(); + for (K key : map.keySet()) { + mapResults.putIfAbsent(key, new LinkedList<>()); + mapResults.get(key).add(map.get(key)); + } + } Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>(); // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() + mapResults.forEach((key, value) -> { + Reducer<K, V, OUT> reducer = reducerSupplier.get(); + reducer.setInput(key, value); + pool.execute(reducer); + reducers.put(key, reducer); + }); Map<K, OUT> result = new HashMap<>(); // TODO: for each reducer thread, join and merge the result to the final result // helpful methods: ForkJoinTask.join() - - throw new UnsupportedOperationException("MapReduce.call() not implemented yet."); + reducers.forEach((key, reducer) -> { + result.put(key, reducer.join()); + }); + + return result; } /** |