Skip to content

Commit 7d140cd

Browse files
nanxiuzikanata163
authored andcommitted
add lzo conpress for hdfs wtire with text type
1 parent 81ae641 commit 7d140cd

File tree

3 files changed

+25
-0
lines changed

3 files changed

+25
-0
lines changed

flinkx-hdfs/flinkx-hdfs-core/src/main/java/com/dtstack/flinkx/hdfs/ECompressType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public enum ECompressType {
3232
*/
3333
TEXT_GZIP("GZIP", "text", ".gz", 0.331F),
3434
TEXT_BZIP2("BZIP2", "text", ".bz2", 0.259F),
35+
TEXT_LZO("LZO", "text", ".lzo", 1.0F),
3536
TEXT_NONE("NONE", "text", "", 0.637F),
3637

3738
/**

flinkx-hdfs/flinkx-hdfs-writer/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@ under the License.
3434
</properties>
3535

3636
<dependencies>
37+
<!--lzop-->
38+
<dependency>
39+
<groupId>org.anarres.lzo</groupId>
40+
<artifactId>lzo-core</artifactId>
41+
<version>1.0.2</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.anarres.lzo</groupId>
45+
<artifactId>lzo-hadoop</artifactId>
46+
<version>1.0.5</version>
47+
<exclusions>
48+
<exclusion>
49+
<artifactId>hadoop-core</artifactId>
50+
<groupId>org.apache.hadoop</groupId>
51+
</exclusion>
52+
</exclusions>
53+
</dependency>
54+
<!--lzop-->
3755
<dependency>
3856
<groupId>com.dtstack.flinkx</groupId>
3957
<artifactId>flinkx-hdfs-core</artifactId>

flinkx-hdfs/flinkx-hdfs-writer/src/main/java/com/dtstack/flinkx/hdfs/writer/HdfsTextOutputFormat.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
2727
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
2828
import org.apache.flink.types.Row;
29+
import org.apache.hadoop.conf.Configuration;
2930
import org.apache.hadoop.fs.Path;
3031
import org.apache.hadoop.hive.common.type.HiveDecimal;
32+
import org.apache.hadoop.io.compress.CompressionCodecFactory;
33+
3134
import java.io.IOException;
3235
import java.io.OutputStream;
3336
import java.math.BigDecimal;
@@ -93,6 +96,9 @@ protected void nextBlock(){
9396
stream = new GzipCompressorOutputStream(fs.create(p));
9497
} else if(compressType == ECompressType.TEXT_BZIP2){
9598
stream = new BZip2CompressorOutputStream(fs.create(p));
99+
} else if (compressType == ECompressType.TEXT_LZO) {
100+
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
101+
stream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(fs.create(p));
96102
}
97103
}
98104

0 commit comments

Comments
 (0)