35
35
@ In (Triple .class )
36
36
@ Out (StreamReceiver .class )
37
37
public final class TripleCollect extends DefaultObjectPipe <Triple , StreamReceiver > {
38
-
39
- private String currentSubject ;
40
- private final PartialRecordEmitter emitter = new PartialRecordEmitter ();
41
38
private final FormetaParser parser = new FormetaParser ();
39
+ private final PartialRecordEmitter emitter = new PartialRecordEmitter ();
40
+
41
+ private String currentSubject ;
42
42
43
43
public TripleCollect () {
44
44
parser .setEmitter (emitter );
45
45
}
46
-
46
+
47
47
@ Override
48
48
public void process (final Triple triple ) {
49
49
if (currentSubject == null ) {
50
50
currentSubject = triple .getSubject ();
51
51
getReceiver ().startRecord (currentSubject );
52
52
}
53
+
53
54
if (currentSubject .equals (triple .getSubject ())) {
54
55
decodeTriple (triple );
55
56
} else {
@@ -60,14 +61,14 @@ public void process(final Triple triple) {
60
61
}
61
62
}
62
63
63
- private void decodeTriple (final Triple triple ) {
64
+ public void decodeTriple (final Triple triple ) {
64
65
if (triple .getObjectType () == ObjectType .STRING ){
65
66
getReceiver ().literal (triple .getPredicate (), triple .getObject ());
66
- }else {
67
- //getReceiver().startEntity(triple.getPredicate());
67
+ }else if (triple .getObjectType () == ObjectType .ENTITY ){
68
68
emitter .setDefaultName (triple .getPredicate ());
69
69
parser .parse (triple .getObject ());
70
- //getReceiver().endEntity();
70
+ }else {
71
+ throw new UnsupportedOperationException (triple .getObjectType () + " can not yet be decoded" );
71
72
}
72
73
}
73
74
0 commit comments