Skip to content

Commit fadbe06

Browse files
author
mgeipel
committed
fixed #45
1 parent 844aa74 commit fadbe06

File tree

1 file changed

+107
-107
lines changed

1 file changed

+107
-107
lines changed

src/main/java/org/culturegraph/mf/stream/pipe/OreAggregationAdder.java

Lines changed: 107 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.culturegraph.mf.stream.pipe;
17-
16+
package org.culturegraph.mf.stream.pipe;
17+
1818
import java.util.Deque;
1919
import java.util.LinkedList;
2020
import java.util.List;
@@ -29,108 +29,108 @@
2929
import org.culturegraph.mf.framework.annotations.Out;
3030
import org.culturegraph.mf.types.ListMap;
3131
import org.culturegraph.mf.util.ResourceUtil;
32-
33-
34-
35-
/**
36-
* adds ore:Aggregation to an Europeana Data Model stream
37-
*
38-
* @author Markus Michael Geipel
39-
*
40-
*/
41-
@Description("adds ore:Aggregation to an Europeana Data Model stream. The aggregation id is set by emitting literal('aggregation_id', id)")
42-
@In(StreamReceiver.class)
43-
@Out(StreamReceiver.class)
44-
public final class OreAggregationAdder extends DefaultStreamPipe<StreamReceiver> {
45-
46-
private static final String RDF_ABOUT = "~rdf:about";
47-
private static final ListMap<String, String> AGGREGATED_ENTITIES = new ListMap<String, String>();
48-
private static final String ORE_AGGREGATION_PROPERTIES = "ore-aggregation.properties";
49-
private static final String ORE_AGGREGATION = "ore:Aggregation";
50-
private static final String AGGREGATION_ID = "aggregation_id";
51-
private static final String RDF_REFERENCE = "~rdf:reference";
52-
private static final Pattern SPLIT_PATTERN = Pattern.compile("\\s*,\\s*");
53-
private final Deque<String> entityStack = new LinkedList<String>();
54-
private final ListMap<String, String> aggregation = new ListMap<String, String>();
55-
private String aggregationId;
56-
57-
static {
58-
final Properties properties = ResourceUtil.loadProperties(ORE_AGGREGATION_PROPERTIES);
59-
for (Entry<Object, Object> entry : properties.entrySet()) {
60-
final String[] parts = SPLIT_PATTERN.split(entry.getValue().toString());
61-
final String name = entry.getKey().toString();
62-
for (String value : parts) {
63-
AGGREGATED_ENTITIES.add(name, value);
64-
}
65-
}
66-
}
67-
68-
@Override
69-
public void startRecord(final String identifier) {
70-
entityStack.clear();
71-
aggregationId = identifier;
72-
getReceiver().startRecord(identifier);
73-
}
74-
75-
@Override
76-
public void endRecord() {
77-
writeAggregation();
78-
aggregation.clear();
79-
getReceiver().endRecord();
80-
}
81-
82-
private void writeAggregation() {
83-
if (!aggregation.isEmpty()) {
84-
final StreamReceiver receiver = getReceiver();
85-
receiver.startEntity(ORE_AGGREGATION);
86-
receiver.literal(RDF_ABOUT, aggregationId);
87-
for (Entry<String, List<String>> entry : aggregation.entrySet()) {
88-
final String key = entry.getKey();
89-
if (AGGREGATED_ENTITIES.containsKey(key)) {
90-
91-
for (String entity : AGGREGATED_ENTITIES.get(key)) {
92-
for (String value : entry.getValue()) {
93-
receiver.startEntity(entity);
94-
receiver.literal(RDF_REFERENCE, value);
95-
receiver.endEntity();
96-
}
97-
}
98-
} else {
99-
for (String value : entry.getValue()) {
100-
receiver.literal(key, value);
101-
}
102-
}
103-
}
104-
receiver.endEntity();
105-
}
106-
}
107-
108-
@Override
109-
public void startEntity(final String name) {
110-
entityStack.push(name);
111-
getReceiver().startEntity(name);
112-
}
113-
114-
@Override
115-
public void endEntity() {
116-
entityStack.pop();
117-
getReceiver().endEntity();
118-
}
119-
120-
@Override
121-
public void literal(final String name, final String value) {
122-
if (entityStack.isEmpty()) {
123-
if (AGGREGATION_ID.equals(name)) {
124-
aggregationId = value;
125-
} else {
126-
aggregation.add(name, value);
127-
}
128-
return;
129-
}
130-
131-
if (entityStack.size()==1 && RDF_ABOUT.equals(name) && AGGREGATED_ENTITIES.containsKey(entityStack.peek())) {
132-
aggregation.add(entityStack.peek(), value);
133-
}
134-
getReceiver().literal(name, value);
135-
}
136-
}
32+
33+
34+
35+
/**
36+
* adds ore:Aggregation to an Europeana Data Model stream
37+
*
38+
* @author Markus Michael Geipel
39+
*
40+
*/
41+
@Description("adds ore:Aggregation to an Europeana Data Model stream. The aggregation id is set by emitting literal('aggregation_id', id)")
42+
@In(StreamReceiver.class)
43+
@Out(StreamReceiver.class)
44+
public final class OreAggregationAdder extends DefaultStreamPipe<StreamReceiver> {
45+
46+
private static final String RDF_ABOUT = "~rdf:about";
47+
private static final ListMap<String, String> AGGREGATED_ENTITIES = new ListMap<String, String>();
48+
private static final String ORE_AGGREGATION_PROPERTIES = "ore-aggregation.properties";
49+
private static final String ORE_AGGREGATION = "ore:Aggregation";
50+
private static final String AGGREGATION_ID = "aggregation_id";
51+
private static final String RDF_REFERENCE = "~rdf:resource";
52+
private static final Pattern SPLIT_PATTERN = Pattern.compile("\\s*,\\s*");
53+
private final Deque<String> entityStack = new LinkedList<String>();
54+
private final ListMap<String, String> aggregation = new ListMap<String, String>();
55+
private String aggregationId;
56+
57+
static {
58+
final Properties properties = ResourceUtil.loadProperties(ORE_AGGREGATION_PROPERTIES);
59+
for (Entry<Object, Object> entry : properties.entrySet()) {
60+
final String[] parts = SPLIT_PATTERN.split(entry.getValue().toString());
61+
final String name = entry.getKey().toString();
62+
for (String value : parts) {
63+
AGGREGATED_ENTITIES.add(name, value);
64+
}
65+
}
66+
}
67+
68+
@Override
69+
public void startRecord(final String identifier) {
70+
entityStack.clear();
71+
aggregationId = identifier;
72+
getReceiver().startRecord(identifier);
73+
}
74+
75+
@Override
76+
public void endRecord() {
77+
writeAggregation();
78+
aggregation.clear();
79+
getReceiver().endRecord();
80+
}
81+
82+
private void writeAggregation() {
83+
if (!aggregation.isEmpty()) {
84+
final StreamReceiver receiver = getReceiver();
85+
receiver.startEntity(ORE_AGGREGATION);
86+
receiver.literal(RDF_ABOUT, aggregationId);
87+
for (Entry<String, List<String>> entry : aggregation.entrySet()) {
88+
final String key = entry.getKey();
89+
if (AGGREGATED_ENTITIES.containsKey(key)) {
90+
91+
for (String entity : AGGREGATED_ENTITIES.get(key)) {
92+
for (String value : entry.getValue()) {
93+
receiver.startEntity(entity);
94+
receiver.literal(RDF_REFERENCE, value);
95+
receiver.endEntity();
96+
}
97+
}
98+
} else {
99+
for (String value : entry.getValue()) {
100+
receiver.literal(key, value);
101+
}
102+
}
103+
}
104+
receiver.endEntity();
105+
}
106+
}
107+
108+
@Override
109+
public void startEntity(final String name) {
110+
entityStack.push(name);
111+
getReceiver().startEntity(name);
112+
}
113+
114+
@Override
115+
public void endEntity() {
116+
entityStack.pop();
117+
getReceiver().endEntity();
118+
}
119+
120+
@Override
121+
public void literal(final String name, final String value) {
122+
if (entityStack.isEmpty()) {
123+
if (AGGREGATION_ID.equals(name)) {
124+
aggregationId = value;
125+
} else {
126+
aggregation.add(name, value);
127+
}
128+
return;
129+
}
130+
131+
if (entityStack.size()==1 && RDF_ABOUT.equals(name) && AGGREGATED_ENTITIES.containsKey(entityStack.peek())) {
132+
aggregation.add(entityStack.peek(), value);
133+
}
134+
getReceiver().literal(name, value);
135+
}
136+
}

0 commit comments

Comments
 (0)