aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-12-10 14:46:20 -0500
committersotech117 <michael_foiani@brown.edu>2023-12-10 14:46:20 -0500
commite1f0edd5c3fb32ce1fd5436b9653a7ebdcc14edf (patch)
treedfa869e0edc65bad73c842ee0e264053d39b266d
parentcb491e82b5ce3dcb7e3c41973a46cb7dcbaa9008 (diff)
1st iteration
-rw-r--r--InvertedIndex.java18
-rw-r--r--WordCount.java24
-rw-r--r--mapreduce/MapReduce.java28
3 files changed, 57 insertions, 13 deletions
diff --git a/InvertedIndex.java b/InvertedIndex.java
index bc75ba6..b2e1fd0 100644
--- a/InvertedIndex.java
+++ b/InvertedIndex.java
@@ -1,3 +1,4 @@
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -12,12 +13,15 @@ public class InvertedIndex {
inputs.add(new Document(filename, FileParser.parse(filename)));
}
// TODO: instantiate a MapReduce object with correct input, key, value, and output types
+ MapReduce<Document, String, String, List<String>> mapReduce = new MapReduce<>();
// TODO: set the mapper and reducer suppliers, and set the inputs
+ mapReduce.setMapperSupplier(Mapper::new);
+ mapReduce.setReducerSupplier(Reducer::new);
+ mapReduce.setInput(inputs);
// TODO: execute the MapReduce object and return the result
-
- throw new UnsupportedOperationException("InvertedIndex.run() not implemented yet.");
+ return mapReduce.call();
}
class Document {
@@ -38,7 +42,11 @@ public class InvertedIndex {
@Override
public Map<String, String> compute() {
// TODO: implement the Map function for inverted index
- throw new UnsupportedOperationException("InvertedIndex map function not implemented yet.");
+ Map<String, String> map = new HashMap<>();
+ for (String word : input.words) {
+ map.put(word, input.name);
+ }
+ return map;
}
}
@@ -49,7 +57,9 @@ public class InvertedIndex {
@Override
public List<String> compute() {
// TODO: implement the Reduce function for inverted index
- throw new UnsupportedOperationException("InvertedIndex reduce function not implemented yet.");
+ List<String> list = new LinkedList<>();
+ list.addAll(valueList);
+ return list;
}
}
diff --git a/WordCount.java b/WordCount.java
index 3c224d4..feb1b3c 100644
--- a/WordCount.java
+++ b/WordCount.java
@@ -1,3 +1,4 @@
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -20,32 +21,43 @@ public class WordCount {
List<List<String>> inputs = FileParser.split(text, numSplites);
// TODO: instantiate a MapReduce object with correct input, key, value, and output types
+ MapReduce<List<String>, String, Long, Long> mapReduce = new MapReduce<>();
// TODO: set the mapper and reducer suppliers, and set the inputs
+ mapReduce.setMapperSupplier(Mapper::new);
+ mapReduce.setReducerSupplier(Reducer::new);
+ mapReduce.setInput(inputs);
// TODO: execute the MapReduce object and return the result
-
- throw new UnsupportedOperationException("WordCount.run() not implemented yet.");
+ return mapReduce.call();
}
- class Mapper
+ static class Mapper
extends mapreduce.Mapper<List<String>, String, Long> {
@Override
public Map<String, Long> compute() {
// TODO: implement the Map function for word count
- throw new UnsupportedOperationException("WordCount map function not implemented yet.");
+ Map<String, Long> map = new HashMap<>();
+ for (String word : input) {
+ map.merge(word, 1L, Long::sum);
+ }
+ return map;
}
}
- class Reducer
+ static class Reducer
extends mapreduce.Reducer<String, Long, Long> {
@Override
public Long compute() {
// TODO: implement the Reduce function for word count
- throw new UnsupportedOperationException("WordCount reduce function not implemented yet.");
+ long count = 0;
+ for (Long value : valueList) {
+ count += value;
+ }
+ return count;
}
}
diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java
index f993f09..aebd4a3 100644
--- a/mapreduce/MapReduce.java
+++ b/mapreduce/MapReduce.java
@@ -44,20 +44,42 @@ public class MapReduce<IN, K, V, OUT>
Set<Mapper<IN, K, V>> mappers = new HashSet<>();
// TODO: for each element in inputList, create a mapper thread, set its input, and execute it
// helpful methods: Supplier.get() ForkJoinPool.execute()
-
+ for (IN input : inputList) {
+ Mapper<IN, K, V> mapper = mapperSupplier.get();
+ mapper.setInput(input);
+ pool.execute(mapper);
+ mappers.add(mapper);
+ }
+
Map<K, List<V>> mapResults = new HashMap<>();
// TODO: for each mapper thread, join and merge the results
// helpful methods: ForkJoinTask.join()
+ for (Mapper<IN, K, V> mapper : mappers) {
+ Map<K, V> map = mapper.join();
+ for (K key : map.keySet()) {
+ mapResults.putIfAbsent(key, new LinkedList<>());
+ mapResults.get(key).add(map.get(key));
+ }
+ }
Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>();
// TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it
// helpful methods: Supplier.get() ForkJoinPool.execute()
+ mapResults.forEach((key, value) -> {
+ Reducer<K, V, OUT> reducer = reducerSupplier.get();
+ reducer.setInput(key, value);
+ pool.execute(reducer);
+ reducers.put(key, reducer);
+ });
Map<K, OUT> result = new HashMap<>();
// TODO: for each reducer thread, join and merge the result to the final result
// helpful methods: ForkJoinTask.join()
-
- throw new UnsupportedOperationException("MapReduce.call() not implemented yet.");
+ reducers.forEach((key, reducer) -> {
+ result.put(key, reducer.join());
+ });
+
+ return result;
}
/**