Skip to content

Commit f3a6226

Browse files
committed
Added header, footer and separator settings to ObjectWriter.
These settings allow to prefix the output with a header string and to append a footer at the end of the stream. Additionally, the separator between objects can be now changed. Previously, this was always a newline character. The default settings of header, footer and separator are chosen so that the output of ObjectWriter is the same as in the old version of the module (i.e. a new line character is append to each object). In addition to these new options the encoding and compression options can now be set via Flux. This commit offers a solution for issue #152: The output of the JSON encoder can be turned into a JSON array by setting header to "[", separator to ", " and footer to "]".
1 parent e65c7a2 commit f3a6226

File tree

8 files changed

+589
-60
lines changed

8 files changed

+589
-60
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2013 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.sink;
17+
18+
/**
19+
* Common functions for object writers.
20+
*
21+
* @author Christoph Böhme
22+
*
23+
* @param <T>
24+
* object type
25+
*/
26+
public abstract class AbstractObjectWriter<T> implements ConfigurableObjectWriter<T> {
27+
28+
private String header = DEFAULT_HEADER;
29+
private String footer = DEFAULT_FOOTER;
30+
private String separator = DEFAULT_SEPARATOR;
31+
32+
@Override
33+
public final String getHeader() {
34+
return header;
35+
}
36+
37+
@Override
38+
public final void setHeader(final String header) {
39+
this.header = header;
40+
}
41+
42+
@Override
43+
public final String getFooter() {
44+
return footer;
45+
}
46+
47+
@Override
48+
public final void setFooter(final String footer) {
49+
this.footer = footer;
50+
}
51+
52+
@Override
53+
public final String getSeparator() {
54+
return separator;
55+
}
56+
57+
@Override
58+
public final void setSeparator(final String separator) {
59+
this.separator = separator;
60+
}
61+
62+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2013 Christoph Böhme
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.stream.sink;
17+
18+
import org.culturegraph.mf.framework.ObjectReceiver;
19+
import org.culturegraph.mf.util.FileCompression;
20+
21+
/**
22+
* Back end implementations for {@link ObjectWriter} should offer
23+
* a default set of configuration options. These are defined by
24+
* this interface.
25+
*
26+
* @author Christoph Böhme
27+
*
28+
* @param <T> object type
29+
*/
30+
public interface ConfigurableObjectWriter<T> extends ObjectReceiver<T> {
31+
32+
String DEFAULT_HEADER = "";
33+
String DEFAULT_FOOTER = "\n";
34+
String DEFAULT_SEPARATOR = "\n";
35+
36+
/**
37+
* Returns the encoding used by the underlying writer.
38+
*
39+
* @return current encoding
40+
*/
41+
String getEncoding();
42+
43+
/**
44+
* Sets the encoding used by the underlying writer.
45+
*
46+
* @param encoding
47+
* name of the encoding
48+
*/
49+
void setEncoding(String encoding);
50+
51+
/**
52+
* Returns the compression mode.
53+
*
54+
* @return current compression mode
55+
*/
56+
FileCompression getCompression();
57+
58+
/**
59+
* Sets the compression mode.
60+
*
61+
* @param compression
62+
*/
63+
void setCompression(final FileCompression compression);
64+
65+
/**
66+
* Sets the compression mode.
67+
*
68+
* @param compression
69+
*/
70+
void setCompression(final String compression);
71+
72+
/**
73+
* Returns the header which is output before the first object.
74+
*
75+
* @return header string
76+
*/
77+
String getHeader();
78+
79+
/**
80+
* Sets the header which is output before the first object.
81+
*
82+
* @param header new header string
83+
*/
84+
void setHeader(final String header);
85+
86+
/**
87+
* Returns the footer which is output after the last object.
88+
*
89+
* @return footer string
90+
*/
91+
String getFooter();
92+
93+
/**
94+
* Sets the footer which is output after the last object.
95+
*
96+
* @param footer new footer string
97+
*/
98+
void setFooter(final String footer);
99+
100+
/**
101+
* Returns the separator which is output between objects.
102+
*
103+
* @return separator string
104+
*/
105+
String getSeparator();
106+
107+
/**
108+
* Sets the separator which is output between objects.
109+
*
110+
* @param separator new separator string
111+
*/
112+
void setSeparator(final String separator);
113+
114+
}

src/main/java/org/culturegraph/mf/stream/sink/ObjectFileWriter.java

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,33 @@
2424
import java.util.regex.Pattern;
2525

2626
import org.culturegraph.mf.exceptions.MetafactureException;
27-
import org.culturegraph.mf.framework.ObjectReceiver;
2827
import org.culturegraph.mf.util.FileCompression;
2928

3029
/**
3130
* @param <T>
3231
* object type
3332
*
3433
* @author Markus Geipel
34+
* @author Christoph Böhme
3535
*
3636
*/
37-
38-
public final class ObjectFileWriter<T> implements ObjectReceiver<T> {
37+
public final class ObjectFileWriter<T> extends AbstractObjectWriter<T> {
3938

4039
private static final String VAR = "${i}";
4140
private static final Pattern VAR_PATTERN = Pattern.compile(VAR, Pattern.LITERAL);
4241

4342
private String path;
4443
private int count;
4544
private Writer writer;
45+
private boolean firstObject;
4646
private boolean closed;
4747

4848
private String encoding = "UTF-8";
4949
private FileCompression compression = FileCompression.AUTO;
50-
50+
5151
public ObjectFileWriter(final String path) {
52+
super();
53+
5254
this.path = path;
5355
startNewFile();
5456

@@ -58,67 +60,43 @@ public ObjectFileWriter(final String path) {
5860
}
5961
}
6062

61-
/**
62-
* Returns the encoding used to open the resource.
63-
*
64-
* @return current default setting
65-
*/
63+
@Override
6664
public String getEncoding() {
6765
return encoding;
6866
}
6967

70-
/**
71-
* Sets the encoding used to open the resource.
72-
*
73-
* @param encoding
74-
* new encoding
75-
*/
68+
@Override
7669
public void setEncoding(final String encoding) {
7770
this.encoding = encoding;
7871
}
7972

73+
@Override
8074
public FileCompression getCompression() {
8175
return compression;
8276
}
8377

78+
@Override
8479
public void setCompression(final FileCompression compression) {
8580
this.compression = compression;
8681
}
8782

83+
@Override
8884
public void setCompression(final String compression) {
8985
setCompression(FileCompression.valueOf(compression.toUpperCase()));
9086
}
9187

92-
private void startNewFile() {
93-
final Matcher matcher = VAR_PATTERN.matcher(this.path);
94-
final String path = matcher.replaceAll(String.valueOf(count));
95-
try {
96-
final OutputStream file = new FileOutputStream(path);
97-
try {
98-
final OutputStream compressor = compression.createCompressor(file, path);
99-
try {
100-
writer = new OutputStreamWriter(compressor, encoding);
101-
closed = false;
102-
} catch (IOException e) {
103-
compressor.close();
104-
throw e;
105-
}
106-
} catch (IOException e) {
107-
file.close();
108-
throw e;
109-
}
110-
} catch (IOException e) {
111-
throw new MetafactureException("Error creating file '" + path + "'.", e);
112-
}
113-
}
114-
11588
@Override
11689
public void process(final T obj) {
11790
assert !closed;
11891
try {
92+
if (firstObject) {
93+
writer.write(getHeader());
94+
firstObject = false;
95+
} else {
96+
writer.write(getSeparator());
97+
}
11998
writer.write(obj.toString());
120-
writer.append('\n');
121-
} catch (IOException e) {
99+
} catch (final IOException e) {
122100
throw new MetafactureException(e);
123101
}
124102
}
@@ -127,8 +105,11 @@ public void process(final T obj) {
127105
public void resetStream() {
128106
if (!closed) {
129107
try {
108+
if (!firstObject) {
109+
writer.write(getFooter());
110+
}
130111
writer.close();
131-
} catch (IOException e) {
112+
} catch (final IOException e) {
132113
throw new MetafactureException(e);
133114
} finally {
134115
closed = true;
@@ -142,12 +123,40 @@ public void resetStream() {
142123
public void closeStream() {
143124
if (!closed) {
144125
try {
126+
if (!firstObject) {
127+
writer.write(getFooter());
128+
}
145129
writer.close();
146-
} catch (IOException e) {
130+
} catch (final IOException e) {
147131
throw new MetafactureException(e);
148132
} finally {
149133
closed = true;
150134
}
151135
}
152136
}
137+
138+
private void startNewFile() {
139+
final Matcher matcher = VAR_PATTERN.matcher(this.path);
140+
final String path = matcher.replaceAll(String.valueOf(count));
141+
try {
142+
final OutputStream file = new FileOutputStream(path);
143+
try {
144+
final OutputStream compressor = compression.createCompressor(file, path);
145+
try {
146+
writer = new OutputStreamWriter(compressor, encoding);
147+
firstObject = true;
148+
closed = false;
149+
} catch (final IOException e) {
150+
compressor.close();
151+
throw e;
152+
}
153+
} catch (final IOException e) {
154+
file.close();
155+
throw e;
156+
}
157+
} catch (final IOException e) {
158+
throw new MetafactureException("Error creating file '" + path + "'.", e);
159+
}
160+
}
161+
153162
}

0 commit comments

Comments
 (0)