diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-12-10 14:46:20 -0500 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-12-10 14:46:20 -0500 |
commit | e1f0edd5c3fb32ce1fd5436b9653a7ebdcc14edf (patch) | |
tree | dfa869e0edc65bad73c842ee0e264053d39b266d | |
parent | cb491e82b5ce3dcb7e3c41973a46cb7dcbaa9008 (diff) |
1st iteration
-rw-r--r-- | InvertedIndex.java | 18 | ||||
-rw-r--r-- | WordCount.java | 24 | ||||
-rw-r--r-- | mapreduce/MapReduce.java | 28 |
3 files changed, 57 insertions, 13 deletions
diff --git a/InvertedIndex.java b/InvertedIndex.java index bc75ba6..b2e1fd0 100644 --- a/InvertedIndex.java +++ b/InvertedIndex.java @@ -1,3 +1,4 @@ +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -12,12 +13,15 @@ public class InvertedIndex { inputs.add(new Document(filename, FileParser.parse(filename))); } // TODO: instantiate a MapReduce object with correct input, key, value, and output types + MapReduce<Document, String, String, List<String>> mapReduce = new MapReduce<>(); // TODO: set the mapper and reducer suppliers, and set the inputs + mapReduce.setMapperSupplier(Mapper::new); + mapReduce.setReducerSupplier(Reducer::new); + mapReduce.setInput(inputs); // TODO: execute the MapReduce object and return the result - - throw new UnsupportedOperationException("InvertedIndex.run() not implemented yet."); + return mapReduce.call(); } class Document { @@ -38,7 +42,11 @@ public class InvertedIndex { @Override public Map<String, String> compute() { // TODO: implement the Map function for inverted index - throw new UnsupportedOperationException("InvertedIndex map function not implemented yet."); + Map<String, String> map = new HashMap<>(); + for (String word : input.words) { + map.put(word, input.name); + } + return map; } } @@ -49,7 +57,9 @@ public class InvertedIndex { @Override public List<String> compute() { // TODO: implement the Reduce function for inverted index - throw new UnsupportedOperationException("InvertedIndex reduce function not implemented yet."); + List<String> list = new LinkedList<>(); + list.addAll(valueList); + return list; } } diff --git a/WordCount.java b/WordCount.java index 3c224d4..feb1b3c 100644 --- a/WordCount.java +++ b/WordCount.java @@ -1,3 +1,4 @@ +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -20,32 +21,43 @@ public class WordCount { List<List<String>> inputs = FileParser.split(text, numSplites); // TODO: instantiate a MapReduce object with correct input, key, value, and output types + MapReduce<List<String>, String, Long, Long> mapReduce = new MapReduce<>(); // TODO: set the mapper and reducer suppliers, and set the inputs + mapReduce.setMapperSupplier(Mapper::new); + mapReduce.setReducerSupplier(Reducer::new); + mapReduce.setInput(inputs); // TODO: execute the MapReduce object and return the result - - throw new UnsupportedOperationException("WordCount.run() not implemented yet."); + return mapReduce.call(); } - class Mapper + static class Mapper extends mapreduce.Mapper<List<String>, String, Long> { @Override public Map<String, Long> compute() { // TODO: implement the Map function for word count - throw new UnsupportedOperationException("WordCount map function not implemented yet."); + Map<String, Long> map = new HashMap<>(); + for (String word : input) { + map.merge(word, 1L, Long::sum); + } + return map; } } - class Reducer + static class Reducer extends mapreduce.Reducer<String, Long, Long> { @Override public Long compute() { // TODO: implement the Reduce function for word count - throw new UnsupportedOperationException("WordCount reduce function not implemented yet."); + long count = 0; + for (Long value : valueList) { + count += value; + } + return count; } } diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java index f993f09..aebd4a3 100644 --- a/mapreduce/MapReduce.java +++ b/mapreduce/MapReduce.java @@ -44,20 +44,42 @@ public class MapReduce<IN, K, V, OUT> Set<Mapper<IN, K, V>> mappers = new HashSet<>(); // TODO: for each element in inputList, create a mapper thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() - + for (IN input : inputList) { + Mapper<IN, K, V> mapper = mapperSupplier.get(); + mapper.setInput(input); + pool.execute(mapper); + mappers.add(mapper); + } + Map<K, List<V>> mapResults = new HashMap<>(); // TODO: for each mapper thread, join and merge the results // helpful methods: ForkJoinTask.join() + for (Mapper<IN, K, V> mapper : mappers) { + Map<K, V> map = mapper.join(); + for (K key : map.keySet()) { + mapResults.putIfAbsent(key, new LinkedList<>()); + mapResults.get(key).add(map.get(key)); + } + } Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>(); // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it // helpful methods: Supplier.get() ForkJoinPool.execute() + mapResults.forEach((key, value) -> { + Reducer<K, V, OUT> reducer = reducerSupplier.get(); + reducer.setInput(key, value); + pool.execute(reducer); + reducers.put(key, reducer); + }); Map<K, OUT> result = new HashMap<>(); // TODO: for each reducer thread, join and merge the result to the final result // helpful methods: ForkJoinTask.join() - - throw new UnsupportedOperationException("MapReduce.call() not implemented yet."); + reducers.forEach((key, reducer) -> { + result.put(key, reducer.join()); + }); + + return result; } /** |