Skip to content

Commit 484bb82

Browse files
committed
Merge pull request #170 from schaeferd/triples-to-stream
adds triple-to-stream in order to process viaf-dump data for entityfacts
2 parents bbe340c + 55fc144 commit 484bb82

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
*
3+
*/
4+
package org.culturegraph.mf.stream.converter;
5+
6+
import org.culturegraph.mf.formeta.parser.FormetaParser;
7+
import org.culturegraph.mf.formeta.parser.PartialRecordEmitter;
8+
import org.culturegraph.mf.framework.DefaultObjectPipe;
9+
import org.culturegraph.mf.framework.StreamReceiver;
10+
import org.culturegraph.mf.framework.annotations.Description;
11+
import org.culturegraph.mf.framework.annotations.In;
12+
import org.culturegraph.mf.framework.annotations.Out;
13+
import org.culturegraph.mf.types.Triple;
14+
import org.culturegraph.mf.types.Triple.ObjectType;
15+
16+
/**
17+
* @author schaeferd
18+
*
19+
*/
20+
@Description("Converts a triple into a record stream")
21+
@In(Triple.class)
22+
@Out(StreamReceiver.class)
23+
public final class TriplesToStream extends
24+
DefaultObjectPipe<Triple, StreamReceiver> {
25+
26+
private final FormetaParser parser = new FormetaParser();
27+
private final PartialRecordEmitter emitter = new PartialRecordEmitter();
28+
29+
public TriplesToStream() {
30+
parser.setEmitter(emitter);
31+
}
32+
33+
public void process(final Triple triple) {
34+
getReceiver().startRecord(triple.getSubject());
35+
if(triple.getObjectType() == ObjectType.STRING){
36+
getReceiver().literal(triple.getPredicate(), triple.getObject());
37+
}else if (triple.getObjectType() == ObjectType.ENTITY){
38+
emitter.setDefaultName(triple.getPredicate());
39+
parser.parse(triple.getObject());
40+
}else{
41+
throw new UnsupportedOperationException(triple.getObjectType() + " can not yet be decoded");
42+
}
43+
getReceiver().endRecord();
44+
}
45+
46+
@Override
47+
protected void onSetReceiver() {
48+
emitter.setReceiver(getReceiver());
49+
}
50+
51+
}

src/main/resources/flux-commands.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ count-triples org.culturegraph.mf.stream.pipe.sort.TripleCount
1515
collect-triples org.culturegraph.mf.stream.pipe.sort.TripleCollect
1616
stream-to-triples org.culturegraph.mf.stream.converter.StreamToTriples
1717
filter-triples org.culturegraph.mf.stream.pipe.TripleFilter
18-
18+
triples-to-stream org.culturegraph.mf.stream.converter.TriplesToStream
1919
calculate-metrics org.culturegraph.mf.stream.pipe.stat.CooccurrenceMetricCalculator
2020

2121
jscript org.culturegraph.mf.stream.pipe.JScriptObjectPipe

0 commit comments

Comments
 (0)