diff options
author | github-classroom[bot] <66690702+github-classroom[bot]@users.noreply.github.com> | 2023-12-10 19:07:21 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-10 19:07:21 +0000 |
commit | cb491e82b5ce3dcb7e3c41973a46cb7dcbaa9008 (patch) | |
tree | c9c6a81111803facc4e3b677e394495cea696bc0 |
Initial commit
-rw-r--r-- | .gitignore | 41 | ||||
-rw-r--r-- | FileParser.java | 56 | ||||
-rw-r--r-- | InvertedIndex.java | 56 | ||||
-rw-r--r-- | Makefile | 10 | ||||
-rw-r--r-- | Test.java | 7 | ||||
-rw-r--r-- | WordCount.java | 52 | ||||
-rw-r--r-- | mapreduce/MapReduce.java | 84 | ||||
-rw-r--r-- | mapreduce/Mapper.java | 22 | ||||
-rw-r--r-- | mapreduce/Reducer.java | 26 |
9 files changed, 354 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4dd9473 --- /dev/null +++ b/.gitignore @@ -0,0 +1,41 @@ +# Compiled class file +*.class + +# Log files +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear + +# Maven # +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties + +# Gradle # +bin/ +build/ +.gradle/ +!gradle/wrapper/gradle-wrapper.jar + +# IntelliJ IDEA # +*.iml +*.ipr +*.iws +.idea/ +out/ +gen/
\ No newline at end of file diff --git a/FileParser.java b/FileParser.java new file mode 100644 index 0000000..c8ede97 --- /dev/null +++ b/FileParser.java @@ -0,0 +1,56 @@ +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class FileParser { + + static List<String> readLines(String fileName) { + List<String> lines = new LinkedList<>(); + try { + BufferedReader reader = new BufferedReader(new FileReader(fileName)); + String line = reader.readLine(); + while (line != null) { + lines.add(line); + line = reader.readLine(); + } + reader.close(); + } catch (IOException ex) { + Logger.getLogger(WordCount.class + .getName()).log(Level.SEVERE, null, ex); + System.exit(0x1); + } + return lines; + } + + static List<String> parse(String fileName) { + List<String> lines = readLines(fileName); + List<String> words = new LinkedList<>(); + lines.stream().forEach((line) -> { + words.addAll( + Arrays.asList( + line.toLowerCase() + .split("\\W|\\d|_"))); + }); + return words; + } + + static List<List<String>> split(List<String> text, int parts) { + List<List<String>> inputs = new ArrayList<>(parts); + int size = text.size(); + int div = size / parts; + int rem = size % parts; + int cur = 0; + for (int i = 0; i < parts; i++) { + int step = (rem-- > 0) ? div + 1 : div; + inputs.add(text.subList(cur, cur + step)); + cur += step; + } + return inputs; + } +} diff --git a/InvertedIndex.java b/InvertedIndex.java new file mode 100644 index 0000000..bc75ba6 --- /dev/null +++ b/InvertedIndex.java @@ -0,0 +1,56 @@ +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import mapreduce.MapReduce; + +public class InvertedIndex { + + public Map<String, List<String>> run(List<String> filenames) { + List<Document> inputs = new LinkedList<>(); + for (String filename : filenames) { + inputs.add(new Document(filename, FileParser.parse(filename))); + } + // TODO: instantiate a MapReduce object with correct input, key, value, and output types + + // TODO: set the mapper and reducer suppliers, and set the inputs + + // TODO: execute the MapReduce object and return the result + + throw new UnsupportedOperationException("InvertedIndex.run() not implemented yet."); + } + + class Document { + + String name; + List<String> words; + + public Document(String name, List<String> words) { + this.name = name; + this.words = words; + } + + } + + class Mapper + extends mapreduce.Mapper<Document, String, String> { + + @Override + public Map<String, String> compute() { + // TODO: implement the Map function for inverted index + throw new UnsupportedOperationException("InvertedIndex map function not implemented yet."); + } + + } + + class Reducer + extends mapreduce.Reducer<String, String, List<String>> { + + @Override + public List<String> compute() { + // TODO: implement the Reduce function for inverted index + throw new UnsupportedOperationException("InvertedIndex reduce function not implemented yet."); + } + } + +} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..28004b7 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +JAVAFILES = Test.java WordCount.java InvertedIndex.java FileParser.java mapreduce/MapReduce.java mapreduce/Mapper.java mapreduce/Reducer.java +JFLAGS = java -ea +OUTDIR = out + +run: $(JAVAFILES) + javac -d $(OUTDIR) $(JAVAFILES) + $(JFLAGS) -cp $(OUTDIR) Test + +clean: + rm -rf $(OUTDIR)
\ No newline at end of file diff --git a/Test.java b/Test.java new file mode 100644 index 0000000..f61ca95 --- /dev/null +++ b/Test.java @@ -0,0 +1,7 @@ +public class Test { + + public static void main(String[] args) { + // test your WordCount and InvertedIndex program with your own sample input files here + + } +} diff --git a/WordCount.java b/WordCount.java new file mode 100644 index 0000000..3c224d4 --- /dev/null +++ b/WordCount.java @@ -0,0 +1,52 @@ +import java.util.List; +import java.util.Map; + +import mapreduce.MapReduce; + +/** + * + * @author mph + */ +public class WordCount { + + int numSplites; + + public WordCount(int numSplites) { + this.numSplites = numSplites; + } + + public Map<String, Long> run(String filename) { + List<String> text = FileParser.parse(filename); + List<List<String>> inputs = FileParser.split(text, numSplites); + + // TODO: instantiate a MapReduce object with correct input, key, value, and output types + + // TODO: set the mapper and reducer suppliers, and set the inputs + + // TODO: execute the MapReduce object and return the result + + throw new UnsupportedOperationException("WordCount.run() not implemented yet."); + } + + class Mapper + extends mapreduce.Mapper<List<String>, String, Long> { + + @Override + public Map<String, Long> compute() { + // TODO: implement the Map function for word count + throw new UnsupportedOperationException("WordCount map function not implemented yet."); + } + + } + + class Reducer + extends mapreduce.Reducer<String, Long, Long> { + + @Override + public Long compute() { + // TODO: implement the Reduce function for word count + throw new UnsupportedOperationException("WordCount reduce function not implemented yet."); + } + } + +} diff --git a/mapreduce/MapReduce.java b/mapreduce/MapReduce.java new file mode 100644 index 0000000..f993f09 --- /dev/null +++ b/mapreduce/MapReduce.java @@ -0,0 +1,84 @@ +package mapreduce; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; + +/** + * + * @author mph + * @param <IN> input type + * @param <K> key type + * @param <V> value type + * @param <OUT> output type + */ +public class MapReduce<IN, K, V, OUT> + implements Callable<Map<K, OUT>> { + + private List<IN> inputList; + private Supplier<Mapper<IN, K, V>> mapperSupplier; + private Supplier<Reducer<K, V, OUT>> reducerSupplier; + private static ForkJoinPool pool; + + /** + * + */ + public MapReduce() { + pool = new ForkJoinPool(); + mapperSupplier = () -> { + throw new UnsupportedOperationException("No mapper supplier"); + }; + reducerSupplier = () -> { + throw new UnsupportedOperationException("No reducer supplier"); + }; + } + + @Override + public Map<K, OUT> call() { + Set<Mapper<IN, K, V>> mappers = new HashSet<>(); + // TODO: for each element in inputList, create a mapper thread, set its input, and execute it + // helpful methods: Supplier.get() ForkJoinPool.execute() + + Map<K, List<V>> mapResults = new HashMap<>(); + // TODO: for each mapper thread, join and merge the results + // helpful methods: ForkJoinTask.join() + + Map<K, Reducer<K, V, OUT>> reducers = new HashMap<>(); + // TODO: for each key in the mappers'results, create a reducer thread, set its input, and execute it + // helpful methods: Supplier.get() ForkJoinPool.execute() + + Map<K, OUT> result = new HashMap<>(); + // TODO: for each reducer thread, join and merge the result to the final result + // helpful methods: ForkJoinTask.join() + + throw new UnsupportedOperationException("MapReduce.call() not implemented yet."); + } + + /** + * @param aMapperSupplier construct mapper thread + */ + public void setMapperSupplier(Supplier<Mapper<IN, K, V>> aMapperSupplier) { + mapperSupplier = aMapperSupplier; + } + + /** + * @param aReducerSupplier construct reducer thread + */ + public void setReducerSupplier(Supplier<Reducer<K, V, OUT>> aReducerSupplier) { + reducerSupplier = aReducerSupplier; + } + + /** + * @param anInput the task's set of inputs + */ + public void setInput(List<IN> anInput) { + inputList = anInput; + } + +} diff --git a/mapreduce/Mapper.java b/mapreduce/Mapper.java new file mode 100644 index 0000000..c2da326 --- /dev/null +++ b/mapreduce/Mapper.java @@ -0,0 +1,22 @@ +package mapreduce; + +import java.util.Map; +import java.util.concurrent.RecursiveTask; + +/** + * + * @author mph + * @param <IN> input type + * @param <K> key type + * @param <V> accumulator type + */ +public abstract class Mapper<IN, K, V> extends RecursiveTask<Map<K, V>> { + protected IN input; + + /** + * @param anInput list of input items + */ + public void setInput(IN anInput) { + input = anInput; + } +} diff --git a/mapreduce/Reducer.java b/mapreduce/Reducer.java new file mode 100644 index 0000000..982ddfc --- /dev/null +++ b/mapreduce/Reducer.java @@ -0,0 +1,26 @@ +package mapreduce; + +import java.util.List; +import java.util.concurrent.RecursiveTask; + +/** + * + * @author mph + * @param <K> key + * @param <V> valueListumulator + * @param <OUT> output value + */ +public abstract class Reducer<K, V, OUT> extends RecursiveTask<OUT> { + + protected K key; + protected List<V> valueList; + + /** + * @param aKey key for this reducer + * @param aList list of values + */ + public void setInput(K aKey, List<V> aList) { + key = aKey; + valueList = aList; + } +}; |