Skip to content

Commit 4ed039b

Browse files
author
Jorge Ejarque
committed
First implementation of the data processor
1 parent 0947ab1 commit 4ed039b

File tree

8 files changed

+361
-0
lines changed

8 files changed

+361
-0
lines changed

pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>es.jea.processor</groupId>
8+
<artifactId>paralle_data_processor</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>11</maven.compiler.source>
13+
<maven.compiler.target>11</maven.compiler.target>
14+
</properties>
15+
16+
<build>
17+
<plugins>
18+
<plugin>
19+
<groupId>org.apache.maven.plugins</groupId>
20+
<artifactId>maven-jar-plugin</artifactId>
21+
<configuration>
22+
<archive>
23+
<manifest>
24+
<addClasspath>true</addClasspath>
25+
<mainClass>es.jea.processor.DataProcessingFramework</mainClass>
26+
</manifest>
27+
</archive>
28+
</configuration>
29+
</plugin>
30+
</plugins>
31+
</build>
32+
33+
</project>

run.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
2+
3+
java -jar target/paralle_data_processor-1.0-SNAPSHOT.jar es.jea.processor.DataProcessingFramework $@
4+
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package es.jea.processor;
2+
3+
import java.io.BufferedReader;
4+
import java.io.FileNotFoundException;
5+
import java.io.FileReader;
6+
import java.io.IOException;
7+
import java.util.AbstractMap;
8+
import java.util.Map.Entry;
9+
10+
public class CSVDataReader implements DataReader{
11+
private static final String DEFAULT_SEPARATOR = ",";
12+
private String filename;
13+
private String separator;
14+
private boolean open = false;
15+
private boolean close = false;
16+
private BufferedReader br;
17+
18+
public CSVDataReader(String filename, String separator){
19+
this.filename = filename;
20+
this.separator = separator;
21+
}
22+
23+
public CSVDataReader(String filename){
24+
this.filename = filename;
25+
this.separator = DEFAULT_SEPARATOR;
26+
}
27+
28+
29+
@Override
30+
public void open() {
31+
try {
32+
br = new BufferedReader(new FileReader(filename));
33+
br.readLine(); //read the header line
34+
} catch (FileNotFoundException e) {
35+
System.err.println("File " + filename + " not found");
36+
} catch (IOException e) {
37+
System.err.println("Exception reading file " + filename);
38+
e.printStackTrace();
39+
}
40+
open = true;
41+
42+
43+
}
44+
45+
@Override
46+
public Entry<Integer, Float> readValues() {
47+
if (open && !close){
48+
try{
49+
String line;
50+
synchronized(br){
51+
line = br.readLine();
52+
}
53+
return getValuesFromLine(line);
54+
55+
} catch (IOException e) {
56+
System.err.println("Exception reading file " + filename);
57+
e.printStackTrace();
58+
return null;
59+
}
60+
}else {
61+
System.err.println("Reader could not read values because it is not open or already closed");
62+
return null;
63+
}
64+
}
65+
66+
private Entry<Integer, Float> getValuesFromLine(String line){
67+
68+
if (line != null){
69+
String[] values = line.split(separator);
70+
71+
int intValue = Integer.parseInt(values[0]);
72+
float floatValue = Float.parseFloat(values[1]);
73+
74+
return new AbstractMap.SimpleEntry<>(intValue, floatValue);
75+
} else {
76+
return null;
77+
}
78+
79+
}
80+
81+
@Override
82+
public void close() {
83+
try{
84+
br.close();
85+
} catch (IOException e) {
86+
System.err.println("Exception closing data reader");
87+
e.printStackTrace();
88+
}
89+
open = false;
90+
close = true;
91+
92+
}
93+
94+
95+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package es.jea.processor;
2+
3+
import java.io.BufferedWriter;
4+
import java.io.FileWriter;
5+
import java.io.IOException;
6+
7+
public class CSVResultsWriter implements ResultWriter {
8+
9+
private static final String DEFAULT_SEPARATOR = ",";
10+
private String filename;
11+
private String separator;
12+
private boolean open = false;
13+
private boolean close = false;
14+
private BufferedWriter bw;
15+
16+
public CSVResultsWriter(String filename, String separator){
17+
this.filename = filename;
18+
this.separator = separator;
19+
}
20+
21+
public CSVResultsWriter(String filename){
22+
this.filename = filename;
23+
this.separator = DEFAULT_SEPARATOR;
24+
}
25+
26+
@Override
27+
public void open() {
28+
try {
29+
bw = new BufferedWriter(new FileWriter(filename, true));
30+
bw.write("#sum"+separator+"#prod\n"); //write header line
31+
} catch (IOException e) {
32+
System.err.println("Exception reading file " + filename);
33+
e.printStackTrace();
34+
}
35+
open = true;
36+
}
37+
38+
39+
@Override
40+
public void writeValues(float sum, float prod) throws Exception {
41+
if (open && !close){
42+
String line = sum + "," + prod + "\n";
43+
synchronized(bw){
44+
bw.write(line);
45+
}
46+
}else{
47+
throw new Exception("ResultsWriter for " + filename + " is not open or is already closed");
48+
}
49+
}
50+
51+
@Override
52+
public void close() {
53+
try{
54+
bw.close();
55+
} catch (IOException e) {
56+
System.err.println("Exception closing data reader");
57+
e.printStackTrace();
58+
}
59+
open = false;
60+
close = true;
61+
}
62+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package es.jea.processor;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.TimeUnit;
6+
7+
/**
8+
* Main class of the Data Processing function-
9+
*/
10+
public class DataProcessingFramework {
11+
private static final long TIMEOUT=20L;
12+
private ExecutorService executorService;
13+
private DataReader reader;
14+
private ResultWriter writer;
15+
private int threads;
16+
/**
17+
* Class Constructor.
18+
*
19+
* @param reader Reader to get the values to process.
20+
* @param writer Writer where to store the results.
21+
* @param threads Number of threads to perform the parallel processing.
22+
*/
23+
public DataProcessingFramework(DataReader reader, ResultWriter writer, int threads){
24+
this.reader = reader;
25+
this.writer = writer;
26+
this.threads = threads;
27+
executorService = Executors.newFixedThreadPool(threads);
28+
}
29+
30+
/**
31+
* Start the processing of Data.
32+
*/
33+
public void start(){
34+
reader.open();
35+
writer.open();
36+
for (int i = 0; i < this.threads; i++ ){
37+
executorService.execute(new Processor(reader, writer));
38+
}
39+
}
40+
41+
42+
/**
43+
* Wait to the end of the computation.
44+
*
45+
* @throws InterruptedException
46+
*/
47+
public void awaitTermination() throws InterruptedException{
48+
executorService.shutdown();
49+
while (!executorService.isTerminated()){
50+
executorService.awaitTermination(TIMEOUT, TimeUnit.SECONDS);
51+
}
52+
53+
}
54+
55+
/**
56+
* Executed the data processing funtion:
57+
* @param args Array of arguments expected: InputFile, OutputFile, number of threads.
58+
*/
59+
public static void main(String[] args) {
60+
if (args.length != 4){
61+
System.err.println("Incorrect number of arguments <inputFile> <outputFile> <numThreads>");
62+
System.exit(1);
63+
}
64+
String inputFile = args[1];
65+
String outputFile = args[2];
66+
int threads = Integer.parseInt(args[3]);
67+
DataProcessingFramework dpf = new DataProcessingFramework(new CSVDataReader(inputFile), new CSVResultsWriter(outputFile), threads );
68+
System.out.println("Starting processing of " + inputFile);
69+
dpf.start();
70+
try {
71+
dpf.awaitTermination();
72+
} catch (InterruptedException e) {
73+
System.err.println("Executor interrupted");
74+
}
75+
System.out.println("Processing finished");
76+
}
77+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package es.jea.processor;
2+
3+
import java.util.Map.Entry;
4+
/**
5+
* Interface for implementing a Data reader
6+
*/
7+
public interface DataReader {
8+
/**
9+
* Open the reader.
10+
*/
11+
public void open();
12+
13+
/**
14+
* Read the values from the reader
15+
* @return Espected data as a tuple of Integer and Float values. Nulll if no more data to read.
16+
*/
17+
public Entry<Integer, Float> readValues();
18+
19+
/**
20+
* Close de reader.
21+
*/
22+
public void close();
23+
24+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package es.jea.processor;
2+
3+
import java.util.Map.Entry;
4+
5+
/**
6+
* Thread that performs the data processing
7+
*/
8+
public class Processor implements Runnable{
9+
10+
private DataReader reader;
11+
private ResultWriter writer;
12+
13+
public Processor (DataReader reader, ResultWriter writer){
14+
this.reader = reader;
15+
this.writer = writer;
16+
}
17+
18+
19+
@Override
20+
public void run() {
21+
// Read values form the reader and perform the computations
22+
// until no data is read.
23+
try{
24+
Entry<Integer, Float> pair = reader.readValues();
25+
while (pair != null){ // null indicates no more data to read
26+
writer.writeValues(sumValues(pair), multiplyValues(pair));
27+
pair = reader.readValues();
28+
}
29+
}catch (Exception e){
30+
31+
}
32+
}
33+
34+
private Float sumValues(Entry<Integer, Float> pair){
35+
return pair.getKey() + pair.getValue();
36+
}
37+
38+
private Float multiplyValues(Entry<Integer, Float> pair){
39+
return pair.getKey() * pair.getValue();
40+
}
41+
42+
43+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package es.jea.processor;
2+
/**
3+
* Interface to store the computation results
4+
*/
5+
public interface ResultWriter {
6+
/**
7+
* Open the writer.
8+
*/
9+
public void open();
10+
11+
/**
12+
* Writes the computations results.
13+
* @param sum Result of the sum.
14+
* @param prod Result of the product.
15+
* @throws Exception If reader is not open or already close or other problem while reading.
16+
*/
17+
public void writeValues(float sum, float prod) throws Exception;
18+
19+
/**
20+
* Close the writer.
21+
*/
22+
public void close();
23+
}

0 commit comments

Comments
 (0)