aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgithub-classroom[bot] <66690702+github-classroom[bot]@users.noreply.github.com>2023-12-10 19:07:21 +0000
committerGitHub <noreply@github.com>2023-12-10 19:07:21 +0000
commitcb491e82b5ce3dcb7e3c41973a46cb7dcbaa9008 (patch)
treec9c6a81111803facc4e3b677e394495cea696bc0
Initial commit
-rw-r--r--.gitignore41
-rw-r--r--FileParser.java56
-rw-r--r--InvertedIndex.java56
-rw-r--r--Makefile10
-rw-r--r--Test.java7
-rw-r--r--WordCount.java52
-rw-r--r--mapreduce/MapReduce.java84
-rw-r--r--mapreduce/Mapper.java22
-rw-r--r--mapreduce/Reducer.java26
9 files changed, 354 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4dd9473
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,41 @@
+# Compiled class file
+*.class
+
+# Log files
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# Maven #
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+
+# Gradle #
+bin/
+build/
+.gradle/
+!gradle/wrapper/gradle-wrapper.jar
+
+# IntelliJ IDEA #
+*.iml
+*.ipr
+*.iws
+.idea/
+out/
+gen/ \ No newline at end of file
diff --git a/FileParser.java b/FileParser.java
new file mode 100644
index 0000000..c8ede97
--- /dev/null
+++ b/FileParser.java
@@ -0,0 +1,56 @@
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class FileParser {
+
+ static List<String> readLines(String fileName) {
+ List<String> lines = new LinkedList<>();
+ try {
+ BufferedReader reader = new BufferedReader(new FileReader(fileName));
+ String line = reader.readLine();
+ while (line != null) {
+ lines.add(line);
+ line = reader.readLine();
+ }
+ reader.close();
+ } catch (IOException ex) {
+ Logger.getLogger(WordCount.class
+ .getName()).log(Level.SEVERE, null, ex);
+ System.exit(0x1);
+ }
+ return lines;
+ }
+
+ static List<String> parse(String fileName) {
+ List<String> lines = readLines(fileName);
+ List<String> words = new LinkedList<>();
+ lines.stream().forEach((line) -> {
+ words.addAll(
+ Arrays.asList(
+ line.toLowerCase()
+ .split("\\W|\\d|_")));
+ });
+ return words;
+ }
+
+ static List<List<String>> split(List<String> text, int parts) {
+ List<List<String>> inputs = new ArrayList<>(parts);
+ int size = text.size();
+ int div = size / parts;
+ int rem = size % parts;
+ int cur = 0;
+ for (int i = 0; i < parts; i++) {
+ int step = (rem-- > 0) ? div + 1 : div;
+ inputs.add(text.subList(cur, cur + step));
+ cur += step;
+ }
+ return inputs;
+ }
+}
diff --git a/InvertedIndex.java b/InvertedIndex.java
new file mode 100644
index 0000000..bc75ba6
--- /dev/null
+++ b/InvertedIndex.java
@@ -0,0 +1,56 @@
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import mapreduce.MapReduce;
+
+public class InvertedIndex {
+
+ public Map<String, List<String>> run(List<String> filenames) {
+ List<Document> inputs = new LinkedList<>();
+ for (String filename : filenames) {
+ inputs.add(new Document(filename, FileParser.parse(filename)));
+ }
+ // TODO: instantiate a MapReduce object with correct input, key, value, and output types
+
+ // TODO: set the mapper and reducer suppliers, and set the inputs
+
+ // TODO: execute the MapReduce object and return the result
+
+ throw new UnsupportedOperationException("InvertedIndex.run() not implemented yet.");
+ }
+
+ class Document {
+
+ String name;
+ List<String> words;
+
+ public Document(String name, List<String> words) {
+ this.name = name;
+ this.words = words;
+ }
+
+ }
+
+ class Mapper
+ extends mapreduce.Mapper<Document, String, String> {
+
+ @Override
+ public Map<String, String> compute() {
+ // TODO: implement the Map function for inverted index
+ throw new UnsupportedOperationException("InvertedIndex map function not implemented yet.");
+ }
+
+ }
+
+ class Reducer
+ extends mapreduce.Reducer<String, String, List<String>> {
+
+ @Override
+ public List<String> compute() {
+ // TODO: implement the Reduce function for inverted index
+ throw new UnsupportedOperationException("InvertedIndex reduce function not implemented yet.");
+ }
+ }
+
+}
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..28004b7
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,10 @@
+JAVAFILES = Test.java WordCount.java InvertedIndex.java FileParser.java mapreduce/MapReduce.java mapreduce/Mapper.java mapreduce/Reducer.java
+JFLAGS = java -ea
+OUTDIR = out
+
+run: $(JAVAFILES)
+ javac -d $(OUTDIR) $(JAVAFILES)
+ $(JFLAGS) -cp $(OUTDIR) Test
+
+clean:
+ rm -rf $(OUTDIR) \ No newline at end of file
diff --git a/Test.java b/Test.java
new file mode 100644
index 0000000..f61ca95
--- /dev/null
+++ b/Test.java
@@ -0,0 +1,7 @@
+public class Test {
+
+ public static void main(String[] args) {
+ // test your WordCount and InvertedIndex program with your own sample input files here
+
+ }
+}
diff --git a/WordCount.java b/WordCount.java
new file mode 100644
index 0000000..3c224d4
--- /dev/null
+++ b/WordCount.java
@@ -0,0 +1,52 @@
+import java.util.List;
+import java.util.Map;
+
+import mapreduce.MapReduce;
+
+/**
+ *
+ * @author mph
+ */
+public class WordCount {
+
+ int numSplites;
+
+ public WordCount(int numSplites) {
+ this.numSplites = numSplites;
+ }
+
+ public Map<String, Long> run(String filename) {
+ List<String> text = FileParser.parse(filename);
+ List<List<String>> inputs = FileParser.split(text, numSplites);
+
+ // TODO: instantiate a MapReduce object with correct input, key, value, and output types
+
+ // TODO: set the mapper and reducer suppliers, and set the inputs
+
+ // TODO: execute the MapReduce object and return the result
+
+ throw new UnsupportedOperationException("WordCount.run() not implemented yet.");
+ }
+
+ class Mapper
+ extends mapreduce.Mapper<List<String>, String, Long> {
+
+ @Override
+ public Map<String, Long> compute() {
+ // TODO: implement the Map function for word count
+ throw new UnsupportedOperationException("WordCount map function not implemented yet.");
+ }
+
+ }
+
+ class Reducer
+ extends mapreduce.Reducer<String, Long, Long> {
+
+ @Override
+ public Long compute() {
+ // TODO: implement the Reduce function for word count
+ throw new UnsupportedOperationException("WordCount reduce function not implemented yet.");
+ }
+ }
+
+}
diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java
new file mode 100644
index 0000000..f993f09
--- /dev/null
+++ b/mapreduce/MapReduce.java
@@ -0,0 +1,84 @@
+package mapreduce;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
+
+/**
+ *
+ * @author mph
+ * @param <IN> input type
+ * @param <K> key type
+ * @param <V> value type
+ * @param <OUT> output type
+ */
+public class MapReduce<IN, K, V, OUT>
+ implements Callable<Map<K, OUT>> {
+
+ private List<IN> inputList;
+ private Supplier<Mapper<IN, K, V>> mapperSupplier;
+ private Supplier<Reducer<K, V, OUT>> reducerSupplier;
+ private static ForkJoinPool pool;
+
+ /**
+ *
+ */
+ public MapReduce() {
+ pool = new ForkJoinPool();
+ mapperSupplier = () -> {
+ throw new UnsupportedOperationException("No mapper supplier");
+ };
+ reducerSupplier = () -> {
+ throw new UnsupportedOperationException("No reducer supplier");
+ };
+ }
+
+ @Override
+ public Map<K, OUT> call() {
+ Set<Mapper<IN, K, V>> mappers = new HashSet<>();
+ // TODO: for each element in inputList, create a mapper thread, set its input, and execute it
+ // helpful methods: Supplier.get() ForkJoinPool.execute()
+
+ Map<K, List<V>> mapResults = new HashMap<>();
+ // TODO: for each mapper thread, join and merge the results
+ // helpful methods: ForkJoinTask.join()
+
+ Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>();
+ // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it
+ // helpful methods: Supplier.get() ForkJoinPool.execute()
+
+ Map<K, OUT> result = new HashMap<>();
+ // TODO: for each reducer thread, join and merge the result to the final result
+ // helpful methods: ForkJoinTask.join()
+
+ throw new UnsupportedOperationException("MapReduce.call() not implemented yet.");
+ }
+
+ /**
+ * @param aMapperSupplier construct mapper thread
+ */
+ public void setMapperSupplier(Supplier<Mapper<IN, K, V>> aMapperSupplier) {
+ mapperSupplier = aMapperSupplier;
+ }
+
+ /**
+ * @param aReducerSupplier construct reducer thread
+ */
+ public void setReducerSupplier(Supplier<Reducer<K, V, OUT>> aReducerSupplier) {
+ reducerSupplier = aReducerSupplier;
+ }
+
+ /**
+ * @param anInput the task's set of inputs
+ */
+ public void setInput(List<IN> anInput) {
+ inputList = anInput;
+ }
+
+}
diff --git a/mapreduce/Mapper.java b/mapreduce/Mapper.java
new file mode 100644
index 0000000..c2da326
--- /dev/null
+++ b/mapreduce/Mapper.java
@@ -0,0 +1,22 @@
+package mapreduce;
+
+import java.util.Map;
+import java.util.concurrent.RecursiveTask;
+
+/**
+ *
+ * @author mph
+ * @param <IN> input type
+ * @param <K> key type
+ * @param <V> accumulator type
+ */
+public abstract class Mapper<IN, K, V> extends RecursiveTask<Map<K, V>> {
+ protected IN input;
+
+ /**
+ * @param anInput list of input items
+ */
+ public void setInput(IN anInput) {
+ input = anInput;
+ }
+}
diff --git a/mapreduce/Reducer.java b/mapreduce/Reducer.java
new file mode 100644
index 0000000..982ddfc
--- /dev/null
+++ b/mapreduce/Reducer.java
@@ -0,0 +1,26 @@
+package mapreduce;
+
+import java.util.List;
+import java.util.concurrent.RecursiveTask;
+
+/**
+ *
+ * @author mph
+ * @param <K> key
+ * @param <V> valueListumulator
+ * @param <OUT> output value
+ */
+public abstract class Reducer<K, V, OUT> extends RecursiveTask<OUT> {
+
+ protected K key;
+ protected List<V> valueList;
+
+ /**
+ * @param aKey key for this reducer
+ * @param aList list of values
+ */
+ public void setInput(K aKey, List<V> aList) {
+ key = aKey;
+ valueList = aList;
+ }
+};