Skip to content

Commit ccec54b

Browse files
author
mgeipel
committed
added new stream modules
plus minor refactoring
1 parent 5abd10a commit ccec54b

File tree

4 files changed

+138
-66
lines changed

4 files changed

+138
-66
lines changed

src/main/java/org/culturegraph/mf/stream/converter/ObjectTemplate.java

Lines changed: 56 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,59 +13,59 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.culturegraph.mf.stream.converter;
17-
18-
import java.util.HashMap;
19-
import java.util.Map;
20-
import java.util.regex.Matcher;
21-
import java.util.regex.Pattern;
22-
23-
import org.culturegraph.mf.framework.DefaultObjectPipe;
24-
import org.culturegraph.mf.framework.ObjectReceiver;
25-
import org.culturegraph.mf.framework.annotations.Description;
26-
import org.culturegraph.mf.framework.annotations.In;
27-
import org.culturegraph.mf.framework.annotations.Out;
28-
import org.culturegraph.mf.types.Triple;
29-
import org.culturegraph.mf.util.StringUtil;
30-
31-
32-
/**
33-
* Builds a {@link String} from a template and an {@link Object}. ${obj} marks
34-
* the place where the object is to be inserted.
35-
*
36-
* @param <T>
37-
* object type
38-
*
39-
* @author Markus Geipel
40-
*
41-
*/
42-
@Description("Builds a String from a template and an Object. Provide template in brackets. ${o} marks the place where the object is to be inserted. " +
43-
"If the object in an instance of Triple ${s}, ${p} and ${o} are used instead")
44-
@In(Object.class)
45-
@Out(String.class)
46-
public final class ObjectTemplate<T> extends DefaultObjectPipe<T, ObjectReceiver<String>> {
47-
48-
//TODO: make replace more efficient
49-
private static final Pattern OBJ_PATTERN = Pattern.compile("${o}", Pattern.LITERAL);
50-
private final Map<String, String> vars = new HashMap<String, String>();
51-
private final String template;
52-
53-
public ObjectTemplate(final String template) {
54-
super();
55-
this.template = template;
56-
}
57-
58-
@Override
59-
public void process(final T obj) {
60-
if(obj instanceof Triple){
61-
final Triple triple = (Triple)obj;
62-
vars.put("s", triple.getSubject());
63-
vars.put("p", triple.getPredicate());
64-
vars.put("o", triple.getObject());
65-
getReceiver().process(StringUtil.format(template, vars));
66-
}else{
67-
final Matcher matcher = OBJ_PATTERN.matcher(template);
68-
getReceiver().process(matcher.replaceAll(obj.toString()));
69-
}
70-
}
71-
}
16+
package org.culturegraph.mf.stream.converter;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import java.util.regex.Matcher;
21+
import java.util.regex.Pattern;
22+
23+
import org.culturegraph.mf.framework.DefaultObjectPipe;
24+
import org.culturegraph.mf.framework.ObjectReceiver;
25+
import org.culturegraph.mf.framework.annotations.Description;
26+
import org.culturegraph.mf.framework.annotations.In;
27+
import org.culturegraph.mf.framework.annotations.Out;
28+
import org.culturegraph.mf.types.Triple;
29+
import org.culturegraph.mf.util.StringUtil;
30+
31+
32+
/**
33+
* Builds a {@link String} from a template and an {@link Object}. ${o} marks
34+
* the place where the object is to be inserted.
35+
*
36+
* @param <T>
37+
* object type
38+
*
39+
* @author Markus Geipel
40+
*
41+
*/
42+
@Description("Builds a String from a template and an Object. Provide template in brackets. ${o} marks the place where the object is to be inserted. " +
43+
"If the object in an instance of Triple ${s}, ${p} and ${o} are used instead")
44+
@In(Object.class)
45+
@Out(String.class)
46+
public final class ObjectTemplate<T> extends DefaultObjectPipe<T, ObjectReceiver<String>> {
47+
48+
//TODO: make replace more efficient
49+
private static final Pattern OBJ_PATTERN = Pattern.compile("${o}", Pattern.LITERAL);
50+
private final Map<String, String> vars = new HashMap<String, String>();
51+
private final String template;
52+
53+
public ObjectTemplate(final String template) {
54+
super();
55+
this.template = template;
56+
}
57+
58+
@Override
59+
public void process(final T obj) {
60+
if(obj instanceof Triple){
61+
final Triple triple = (Triple)obj;
62+
vars.put("s", triple.getSubject());
63+
vars.put("p", triple.getPredicate());
64+
vars.put("o", triple.getObject());
65+
getReceiver().process(StringUtil.format(template, vars));
66+
}else{
67+
final Matcher matcher = OBJ_PATTERN.matcher(template);
68+
getReceiver().process(matcher.replaceAll(obj.toString()));
69+
}
70+
}
71+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.converter;
17+
18+
import org.culturegraph.mf.framework.DefaultObjectPipe;
19+
import org.culturegraph.mf.framework.ObjectReceiver;
20+
import org.culturegraph.mf.framework.annotations.Description;
21+
import org.culturegraph.mf.framework.annotations.In;
22+
import org.culturegraph.mf.framework.annotations.Out;
23+
24+
25+
/**
26+
* Adds a String preamle and/or epilogue to the stream
27+
*
28+
* @author Markus Geipel
29+
*
30+
*/
31+
@Description("Adds a String preamle and/or epilogue to the stream")
32+
@In(String.class)
33+
@Out(String.class)
34+
public final class PreambleEpilogueAdder extends DefaultObjectPipe<String, ObjectReceiver<String>> {
35+
36+
private String preamble = "";
37+
private String epilogue = "";
38+
private boolean init = true;
39+
40+
public void setEpilogue(final String epilogue) {
41+
this.epilogue = epilogue;
42+
}
43+
44+
public void setPreamble(final String preamble) {
45+
this.preamble = preamble;
46+
}
47+
48+
@Override
49+
public void process(final String obj) {
50+
if(init){
51+
getReceiver().process(preamble);
52+
init = false;
53+
}
54+
getReceiver().process(obj);
55+
}
56+
57+
@Override
58+
protected void onCloseStream() {
59+
if(!epilogue.isEmpty()){
60+
getReceiver().process(epilogue);
61+
}
62+
}
63+
}

src/main/java/org/culturegraph/mf/stream/sink/ObjectJavaIoWriter.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,25 @@
2222
import org.culturegraph.mf.framework.ObjectReceiver;
2323

2424
/**
25-
* @param <T> object type
25+
* @param <T>
26+
* object type
2627
*
2728
* @author Christoph Böhme, Markus Geipel
28-
*
29+
*
2930
*/
3031

31-
3232
public final class ObjectJavaIoWriter<T> implements ObjectReceiver<T> {
3333

3434
private final Writer writer;
35-
35+
private boolean closed;
36+
3637
public ObjectJavaIoWriter(final Writer writer) {
3738
this.writer = writer;
3839
}
39-
40+
4041
@Override
4142
public void process(final T obj) {
43+
assert !closed;
4244
try {
4345
writer.write(obj.toString());
4446
writer.append('\n');
@@ -51,13 +53,18 @@ public void process(final T obj) {
5153
public void resetStream() {
5254
throw new UnsupportedOperationException("Cannot reset ObjectJavaIoWriter");
5355
}
54-
56+
5557
@Override
5658
public void closeStream() {
57-
try {
58-
writer.close();
59-
} catch (IOException e) {
60-
throw new MetafactureException(e);
59+
if (!closed) {
60+
try {
61+
writer.close();
62+
} catch (IOException e) {
63+
throw new MetafactureException(e);
64+
}finally{
65+
closed=true;
66+
}
67+
6168
}
6269
}
6370
}

src/main/resources/flux-commands.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ open-bzip2 org.culturegraph.mf.stream.source.Bzip2Opener
55
open-http org.culturegraph.mf.stream.source.HttpOpener
66
open-resource org.culturegraph.mf.stream.source.ResourceOpener
77
read-string org.culturegraph.mf.stream.source.StringReader
8+
#ls org.culturegraph.mf.stream.source.DirReader
89

910
sort-triples org.culturegraph.mf.stream.pipe.sort.TripleSort
1011
count-triples org.culturegraph.mf.stream.pipe.sort.TripleCount
@@ -51,6 +52,7 @@ change-id org.culturegraph.mf.stream.pipe.IdChangePipe
5152
flatten org.culturegraph.mf.stream.pipe.StreamFlattener
5253

5354
template org.culturegraph.mf.stream.converter.ObjectTemplate
55+
add-preamble-epilogue org.culturegraph.mf.stream.converter.PreambleEpilogueAdder
5456

5557
object-tee org.culturegraph.mf.stream.pipe.ObjectTee
5658
stream-tee org.culturegraph.mf.stream.pipe.StreamTee

0 commit comments

Comments
 (0)