Skip to content

Commit 1dca5a4

Browse files
committed
Added infrastructure for reading compressed files and updated
FileOpener.
1 parent 5564d89 commit 1dca5a4

File tree

14 files changed

+389
-35
lines changed

14 files changed

+389
-35
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@
219219
<version>0.11.4.1</version>
220220
<scope>test</scope>
221221
</dependency>
222+
<dependency>
223+
<groupId>commons-io</groupId>
224+
<artifactId>commons-io</artifactId>
225+
<version>2.1</version>
226+
</dependency>
222227
</dependencies>
223228

224229

src/main/java/org/culturegraph/mf/stream/source/Bzip2Opener.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,16 @@
3333

3434
/**
3535
* Opens a bzip2 file and passes a reader for it to the receiver.
36+
*
37+
* @deprecated Use FileOpener instead and set compression to AUTO or bzip2
3638
*
37-
* @author Markus Geipel
39+
* @author Christoph Böhme
3840
*
3941
*/
4042
@Description("Opens a bzip2 file.")
4143
@In(String.class)
42-
@Out(java.io.Reader.class)
44+
@Out(java.io.Reader.class)
45+
@Deprecated
4346
public final class Bzip2Opener extends DefaultObjectPipe<String, ObjectReceiver<Reader>> implements Opener {
4447

4548
private static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;

src/main/java/org/culturegraph/mf/stream/source/FileOpener.java

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,38 +15,34 @@
1515
*/
1616
package org.culturegraph.mf.stream.source;
1717

18-
import java.io.BufferedReader;
19-
import java.io.FileInputStream;
20-
import java.io.IOException;
21-
import java.io.InputStreamReader;
22-
import java.io.Reader;
23-
24-
import org.culturegraph.mf.exceptions.MetafactureException;
25-
import org.culturegraph.mf.framework.DefaultObjectPipe;
26-
import org.culturegraph.mf.framework.ObjectReceiver;
27-
import org.culturegraph.mf.framework.annotations.Description;
28-
import org.culturegraph.mf.framework.annotations.In;
29-
import org.culturegraph.mf.framework.annotations.Out;
18+
import java.io.FileInputStream;
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.InputStreamReader;
22+
import java.io.Reader;
23+
24+
import org.culturegraph.mf.exceptions.MetafactureException;
25+
import org.culturegraph.mf.framework.DefaultObjectPipe;
26+
import org.culturegraph.mf.framework.ObjectReceiver;
27+
import org.culturegraph.mf.framework.annotations.Description;
28+
import org.culturegraph.mf.framework.annotations.In;
29+
import org.culturegraph.mf.framework.annotations.Out;
30+
import org.culturegraph.mf.util.FileCompression;
3031

3132

3233
/**
3334
* Opens a file and passes a reader for it to the receiver.
3435
*
35-
* @author Markus Geipel
36-
*
37-
*/
38-
/**
39-
* @author geipel
36+
* @author Christoph Böhme
4037
*
4138
*/
4239
@Description("Opens a file.")
4340
@In(String.class)
4441
@Out(java.io.Reader.class)
4542
public final class FileOpener extends DefaultObjectPipe<String, ObjectReceiver<Reader>> implements Opener {
4643

47-
private static final int DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
48-
private int bufferSize = DEFAULT_BUFFER_SIZE;
49-
private String encoding = "UTF-8";
44+
private String encoding = "UTF-8";
45+
private FileCompression compression = FileCompression.AUTO;
5046

5147
/**
5248
* Returns the encoding used to open the resource.
@@ -65,21 +61,43 @@ public String getEncoding() {
6561
*/
6662
public void setEncoding(final String encoding) {
6763
this.encoding = encoding;
64+
}
65+
66+
/**
 * Returns the compression setting used when opening files.
 *
 * @return the currently configured compression; this is
 *         {@link FileCompression#AUTO} unless it has been changed via
 *         one of the {@code setCompression} methods
 */
public FileCompression getCompression() {
67+
return compression;
68+
}
69+
70+
/**
 * Sets the compression which {@code process} uses to create the
 * decompressor for the opened file.
 *
 * @param compression
 *            the compression to use. The value is stored as given; no
 *            null check is performed here.
 */
public void setCompression(final FileCompression compression) {
71+
this.compression = compression;
6872
}
69-
70-
/**
71-
* @param bufferSize
72-
* in MB
73-
*/
74-
public void setBufferSize(final int bufferSize) {
75-
this.bufferSize = bufferSize * 1024 * 1024;
76-
}
73+
74+
/**
 * Sets the compression by the name of a {@link FileCompression}
 * constant. The name is matched case-insensitively, e.g. "gzip",
 * "Gzip" and "GZIP" all select {@link FileCompression#GZIP}.
 *
 * @param compression
 *            name of a {@link FileCompression} constant
 * @throws IllegalArgumentException
 *             if no constant with that name exists
 * @throws NullPointerException
 *             if {@code compression} is null
 */
public void setCompression(final String compression) {
75+
// Upper-case with a fixed locale: with the default locale a Turkish
// environment maps 'i' to a dotted capital I, so "gzip" would not match GZIP.
setCompression(FileCompression.valueOf(compression.toUpperCase(java.util.Locale.ROOT)));
76+
}
7777

7878
@Override
7979
public void process(final String file) {
80-
try {
81-
getReceiver().process(
82-
new BufferedReader(new InputStreamReader(new FileInputStream(file), encoding), bufferSize));
80+
try {
81+
final InputStream fileStream = new FileInputStream(file);
82+
try {
83+
final InputStream decompressor = compression.createDecompressor(fileStream);
84+
try {
85+
final Reader reader = new InputStreamReader(decompressor, encoding);
86+
getReceiver().process(reader);
87+
} catch (IOException e) {
88+
decompressor.close();
89+
throw e;
90+
} catch (MetafactureException e) {
91+
decompressor.close();
92+
throw e;
93+
}
94+
} catch (IOException e) {
95+
fileStream.close();
96+
throw e;
97+
} catch (MetafactureException e) {
98+
fileStream.close();
99+
throw e;
100+
}
83101
} catch (IOException e) {
84102
throw new MetafactureException(e);
85103
}

src/main/java/org/culturegraph/mf/stream/source/GzipOpener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,15 @@
3434
/**
3535
* Opens a gz file and passes a reader for it to the receiver.
3636
*
37+
* @deprecated Use FileOpener instead and set compression to AUTO or gzip
38+
3739
* @author Markus Geipel
3840
*
3941
*/
4042
@Description("Opens a gz file.")
4143
@In(String.class)
42-
@Out(java.io.Reader.class)
44+
@Out(java.io.Reader.class)
45+
@Deprecated
4346
public final class GzipOpener extends DefaultObjectPipe<String, ObjectReceiver<Reader>> implements Opener {
4447

4548
private static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.util;
17+
18+
import java.io.BufferedInputStream;
19+
import java.io.BufferedOutputStream;
20+
import java.io.InputStream;
21+
import java.io.OutputStream;
22+
23+
import org.apache.commons.compress.compressors.CompressorException;
24+
import org.apache.commons.compress.compressors.CompressorStreamFactory;
25+
import org.apache.commons.io.FilenameUtils;
26+
import org.apache.commons.io.input.ProxyInputStream;
27+
import org.apache.commons.io.output.ProxyOutputStream;
28+
import org.culturegraph.mf.exceptions.MetafactureException;
29+
30+
/**
31+
* Provides a convenient interface for using stream compressors
32+
* and decompressors.
33+
*
34+
* @author Christoph Böhme
35+
*
36+
*/
37+
public enum FileCompression {
38+
39+
NONE {
40+
@Override
41+
public OutputStream createCompressor(final OutputStream writeTo, final String fileName) {
42+
return new ProxyOutputStream(writeTo);
43+
}
44+
45+
@Override
46+
public InputStream createDecompressor(final InputStream readFrom) {
47+
return new ProxyInputStream(readFrom) {
48+
};
49+
}
50+
},
51+
52+
AUTO {
53+
@Override
54+
public OutputStream createCompressor(final OutputStream writeTo, final String fileName) {
55+
if (fileName == null) {
56+
throw new IllegalArgumentException("fileName is required for auto-selecting compressor");
57+
}
58+
59+
final String extension = FilenameUtils.getExtension(fileName);
60+
final FileCompression compressor;
61+
if ("gz".equalsIgnoreCase(extension)) {
62+
compressor = GZIP;
63+
} else if ("gzip".equalsIgnoreCase(extension)) {
64+
compressor = GZIP;
65+
} else if ("bz2".equalsIgnoreCase(extension)) {
66+
compressor = BZIP2;
67+
} else if ("bzip2".equalsIgnoreCase(extension)) {
68+
compressor = BZIP2;
69+
} else if ("xz".equalsIgnoreCase(extension)) {
70+
compressor = XZ;
71+
} else {
72+
compressor = NONE;
73+
}
74+
75+
return compressor.createCompressor(writeTo, fileName);
76+
}
77+
78+
@Override
79+
public InputStream createDecompressor(final InputStream readFrom) {
80+
// No format name is passed to the factory here; presumably it detects the
// format from the stream content and needs a buffered (mark/reset capable)
// stream for that -- confirm against the commons-compress documentation.
final InputStream bufferedStream = bufferStream(readFrom);
81+
try {
82+
return APACHE_COMPRESSOR_FACTORY.createCompressorInputStream(bufferedStream);
83+
} catch (CompressorException e) {
84+
// Any CompressorException is treated as "not compressed": the same
// buffered stream is handed back through NONE without a decompressor.
// NOTE(review): this also hides failures other than an unrecognised
// format if the factory reports them as CompressorException -- confirm.
return NONE.createDecompressor(bufferedStream);
85+
}
86+
}
87+
},
88+
89+
BZIP2 {
90+
@Override
91+
public OutputStream createCompressor(final OutputStream writeTo, final String fileName) {
92+
try {
93+
return APACHE_COMPRESSOR_FACTORY.createCompressorOutputStream(
94+
CompressorStreamFactory.BZIP2, bufferStream(writeTo));
95+
} catch (CompressorException e) {
96+
throw new MetafactureException(e);
97+
}
98+
}
99+
100+
@Override
101+
public InputStream createDecompressor(final InputStream readFrom) {
102+
try {
103+
return APACHE_COMPRESSOR_FACTORY.createCompressorInputStream(
104+
CompressorStreamFactory.BZIP2, bufferStream(readFrom));
105+
} catch (CompressorException e) {
106+
throw new MetafactureException(e);
107+
}
108+
}
109+
},
110+
111+
GZIP {
112+
@Override
113+
public OutputStream createCompressor(final OutputStream writeTo, final String fileName) {
114+
try {
115+
return APACHE_COMPRESSOR_FACTORY.createCompressorOutputStream(
116+
CompressorStreamFactory.GZIP, bufferStream(writeTo));
117+
} catch (CompressorException e) {
118+
throw new MetafactureException(e);
119+
}
120+
}
121+
122+
@Override
123+
public InputStream createDecompressor(final InputStream readFrom) {
124+
try {
125+
return APACHE_COMPRESSOR_FACTORY.createCompressorInputStream(
126+
CompressorStreamFactory.GZIP, bufferStream(readFrom));
127+
} catch (CompressorException e) {
128+
throw new MetafactureException(e);
129+
}
130+
}
131+
},
132+
133+
PACK200 {
134+
@Override
135+
public OutputStream createCompressor(final OutputStream writeTo, final String fileName) {
136+
try {
137+
return APACHE_COMPRESSOR_FACTORY.createCompressorOutputStream(
138+
CompressorStreamFactory.PACK200, bufferStream(writeTo));
139+
} catch (CompressorException e) {
140+
throw new MetafactureException(e);
141+
}
142+
}
143+
144+
@Override
145+
public InputStream createDecompressor(final InputStream readFrom) {
146+
try {
147+
return APACHE_COMPRESSOR_FACTORY.createCompressorInputStream(
148+
CompressorStreamFactory.PACK200, bufferStream(readFrom));
149+
} catch (CompressorException e) {
150+
throw new MetafactureException(e);
151+
}
152+
}
153+
},
154+
155+
XZ {
156+
@Override
157+
public OutputStream createCompressor(final OutputStream writeTo, final String fileName) {
158+
try {
159+
return APACHE_COMPRESSOR_FACTORY.createCompressorOutputStream(
160+
CompressorStreamFactory.XZ, bufferStream(writeTo));
161+
} catch (CompressorException e) {
162+
throw new MetafactureException(e);
163+
}
164+
}
165+
166+
@Override
167+
public InputStream createDecompressor(final InputStream readFrom) {
168+
try {
169+
return APACHE_COMPRESSOR_FACTORY.createCompressorInputStream(
170+
CompressorStreamFactory.XZ, bufferStream(readFrom));
171+
} catch (CompressorException e) {
172+
throw new MetafactureException(e);
173+
}
174+
}
175+
};
176+
177+
private static final CompressorStreamFactory APACHE_COMPRESSOR_FACTORY = new CompressorStreamFactory();
178+
private static final int BUFFER_SIZE = 8 * 1024 * 1024;
179+
180+
/**
 * Creates an output stream which compresses everything written to it
 * and passes the result on to {@code writeTo}.
 *
 * @param writeTo
 *            the stream receiving the compressed data
 * @param fileName
 *            name of the file being written. Only {@link #AUTO} reads it,
 *            to choose a compressor from the file extension (gz/gzip,
 *            bz2/bzip2, xz; anything else selects {@link #NONE}). The
 *            other constants ignore it.
 * @return a stream wrapping {@code writeTo}; for {@link #NONE} a proxy
 *         stream without any compressor
 * @throws IllegalArgumentException
 *             if {@code fileName} is null and this is {@link #AUTO}
 * @throws MetafactureException
 *             if the compressor stream factory fails with a
 *             CompressorException
 */
public abstract OutputStream createCompressor(final OutputStream writeTo, final String fileName);
181+
182+
/**
 * Creates an input stream which decompresses the data read from
 * {@code readFrom}.
 *
 * @param readFrom
 *            the stream providing the compressed data
 * @return a stream wrapping {@code readFrom}; for {@link #NONE} a proxy
 *         stream without any decompressor. {@link #AUTO} leaves the
 *         choice of format to the compressor stream factory and falls
 *         back to {@link #NONE} if the factory throws a
 *         CompressorException.
 * @throws MetafactureException
 *             if one of the explicit formats (BZIP2, GZIP, PACK200, XZ)
 *             is selected and the compressor stream factory fails with a
 *             CompressorException
 */
public abstract InputStream createDecompressor(final InputStream readFrom);
183+
184+
/**
 * Ensures that an output stream is buffered.
 *
 * @param stream
 *            the stream to buffer
 * @return {@code stream} itself if it already is a BufferedOutputStream,
 *         otherwise a new BufferedOutputStream around it with a buffer of
 *         BUFFER_SIZE bytes
 */
private static OutputStream bufferStream(final OutputStream stream) {
185+
// Avoid stacking a second buffer on a stream the caller already buffered.
if (stream instanceof BufferedOutputStream) {
186+
return stream;
187+
}
188+
return new BufferedOutputStream(stream, BUFFER_SIZE);
189+
}
190+
191+
/**
 * Ensures that an input stream is buffered.
 *
 * @param stream
 *            the stream to buffer
 * @return {@code stream} itself if it already is a BufferedInputStream,
 *         otherwise a new BufferedInputStream around it with a buffer of
 *         BUFFER_SIZE bytes
 */
private static InputStream bufferStream(final InputStream stream) {
192+
// Avoid stacking a second buffer on a stream the caller already buffered.
if (stream instanceof BufferedInputStream) {
193+
return stream;
194+
}
195+
return new BufferedInputStream(stream, BUFFER_SIZE);
196+
}
197+
198+
}

src/test/java/org/culturegraph/mf/stream/DataFilePath.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ public final class DataFilePath {
3434
public static final String CG_XML = DATA_PREFIX + "cgxml_test.xml";
3535

3636
public static final String GENERIC_XML = DATA_PREFIX + "generic_xml_test.xml";
37+
38+
public static final String COMPRESSED_NONE = DATA_PREFIX + "compressed.txt";
39+
public static final String COMPRESSED_BZ2 = DATA_PREFIX + "compressed.txt.bz2";
40+
public static final String COMPRESSED_BZIP2 = DATA_PREFIX + "compressed.txt.bzip2";
41+
public static final String COMPRESSED_GZ = DATA_PREFIX + "compressed.txt.gz";
42+
public static final String COMPRESSED_GZIP = DATA_PREFIX + "compressed.txt.gzip";
43+
public static final String COMPRESSED_XZ = DATA_PREFIX + "compressed.txt.xz";
3744

3845
private DataFilePath() {/*no instances exist*/}
3946

0 commit comments

Comments
 (0)