RelevanceAnalyzer.java

package utils;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.json.JSONObject;

/**
 * Computes the relevance of a query to every document as the dot product of
 * their TF-IDF vectors and emits document ids sorted by descending relevance.
 */
public class RelevanceAnalyzer extends Configured implements Tool {
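    /** Configuration key under which the vectorized query (a JSON term-weight map) is passed to mappers. */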
    public static final String QUERY = "query_text";

    public static class RelevanceMapper
            extends Mapper<Object, Text, DoubleWritable, Text> {
        /**
         * Computes the dot product of the query TF-IDF vector and one document's
         * TF-IDF vector over their shared terms.
         *
         * @param key      default input key; unused
         * @param document a tab-separated record: document id, then its TF-IDF vector as JSON
         * @param context  receives (negated rank, doc_id) pairs so the shuffle stage
         *                 automatically sorts documents by rank
         */
        @Override
        public void map(Object key, Text document, Context context) throws IOException, InterruptedException {
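            // Illustrative record (values are invented, not from the corpus): given
            //   doc42 <TAB> {"hadoop":0.31,"mapreduce":0.12}
            // and query vector {"hadoop":0.5}, the dot product is 0.155 and the
            // mapper emits (-0.155, doc42).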
            StringTokenizer fields = new StringTokenizer(document.toString(), "\t");
            Text docId = new Text(fields.nextToken());
            JSONObject docVec = new JSONObject(fields.nextToken());
            JSONObject queryVec = new JSONObject(context.getConfiguration().get(QUERY));
            double res = 0;
            Iterator<String> keys = queryVec.keys();
            while (keys.hasNext()) {
                String qKey = keys.next();
                if (docVec.has(qKey)) {
                    res += docVec.getDouble(qKey) * queryVec.getDouble(qKey);
                }
            }
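            // Negate the rank: the shuffle stage sorts keys ascending, so negated
            // ranks reach the reducer in order of descending relevance.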
            context.write(new DoubleWritable(-res), docId);
        }
    }

    public static class RelevanceReducer
            extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
        /**
         * Writes (doc_id, rank) pairs; keys arrive sorted by negated rank, so the
         * output lists documents from most to least relevant.
         *
         * @param rank    negated relevance rank shared by the grouped documents
         * @param values  ids of all documents with this rank
         * @param context output store
         */
        @Override
        public void reduce(DoubleWritable rank, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
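            // Emit every document sharing this rank; taking only the first value
            // would silently drop ties. Negate again to restore the positive rank.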
            for (Text docId : values) {
                context.write(docId, new DoubleWritable(-rank.get()));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "relevance analyzer");
        job.setJarByClass(RelevanceAnalyzer.class);
        job.setMapperClass(RelevanceMapper.class);
        job.setReducerClass(RelevanceReducer.class);
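        // The mapper emits (DoubleWritable, Text) while the reducer emits
        // (Text, DoubleWritable), so the two sets of output types are declared separately.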
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // A global order over ranks requires a single reduce task (the default, made explicit here).
        job.setNumReduceTasks(1);
        FileSystem fs = FileSystem.get(getConf());
        Path out = new Path(Paths.RELV_OUT);
        Path queryOut = new Path(Paths.QUERY_OUT);
        if ((fs.exists(out) && !fs.delete(out, true))
                || (fs.exists(queryOut) && !fs.delete(queryOut, true))) {
            System.out.println("Automatic removal of the existing output directories "
                    + "(output/relevance, output/query-out) failed; remove them manually first.");
            System.exit(-1);
        }
        FileInputFormat.addInputPath(job, new Path(Paths.RELV_IN1));
        FileOutputFormat.setOutputPath(job, out);
        job.getConfiguration().set(QUERY, QueryVectorizer.queryToVector(args, job.getConfiguration()));
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        ContentExtractor.run(args, getConf());
        return exitCode;
    }
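
    /**
     * Entry point. A typical invocation might look like the following
     * (the jar name is illustrative, not taken from the repo):
     *   hadoop jar search-engine.jar utils.RelevanceAnalyzer "query terms"
     */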
    public static void main(String[] args) throws Exception {
        int resultOfJob = ToolRunner.run(new RelevanceAnalyzer(), args);
        System.exit(resultOfJob);
    }
}