diff --git a/src/main/java/blue/strategic/parquet/ParquetWriter.java b/src/main/java/blue/strategic/parquet/ParquetWriter.java
index 7d75b05..eeb07d9 100644
--- a/src/main/java/blue/strategic/parquet/ParquetWriter.java
+++ b/src/main/java/blue/strategic/parquet/ParquetWriter.java
@@ -19,11 +19,17 @@
 import java.io.IOException;
 import java.util.Collections;
 
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
+
 public final class ParquetWriter<T> implements Closeable {
 
     private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
 
     public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator) throws IOException {
+        return writeFile(schema, out, dehydrator, SNAPPY);
+    }
+
+    public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator, CompressionCodecName codecName) throws IOException {
         OutputFile f = new OutputFile() {
             @Override
             public PositionOutputStream create(long blockSizeHint) throws IOException {
@@ -51,18 +57,18 @@ public long defaultBlockSize() {
                 return 1024L;
             }
         };
-        return writeOutputFile(schema, f, dehydrator);
+        return writeOutputFile(schema, f, dehydrator, codecName);
     }
 
-    private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
-        return new ParquetWriter<>(file, schema, dehydrator);
+    private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator, CompressionCodecName codecName) throws IOException {
+        return new ParquetWriter<>(file, schema, dehydrator, codecName);
     }
 
-    private ParquetWriter(OutputFile outputFile, MessageType schema, Dehydrator<T> dehydrator) throws IOException {
+    private ParquetWriter(OutputFile outputFile, MessageType schema, Dehydrator<T> dehydrator, CompressionCodecName codecName) throws IOException {
         this.writer = new Builder<T>(outputFile)
                 .withType(schema)
                 .withDehydrator(dehydrator)
-                .withCompressionCodec(CompressionCodecName.SNAPPY)
+                .withCompressionCodec(codecName)
                 .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                 .build();
     }
@@ -76,6 +82,10 @@ public void close() throws IOException {
         this.writer.close();
     }
 
+    public long getDataSize() {
+        return writer.getDataSize();
+    }
+
     private static final class Builder<T> extends org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetWriter.Builder<T>> {
         private MessageType schema;
         private Dehydrator<T> dehydrator;
diff --git a/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java b/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
index 26db983..58c0b23 100644
--- a/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
+++ b/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
@@ -1,5 +1,7 @@
 package blue.strategic.parquet;
 
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
@@ -16,9 +18,13 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 public class ParquetReadWriteTest {
@@ -58,27 +64,40 @@ public Map<String, Object> finish(Map<String, Object> target) {
             }
         };
 
-        try(ParquetWriter<Object[]> writer = ParquetWriter.writeFile(schema, parquet, dehydrator)) {
-            writer.write(new Object[]{1L, "hello1"});
-            writer.write(new Object[]{2L, "hello2"});
-        }
+        for (CompressionCodecName codecName : List.of(SNAPPY, ZSTD)) {
 
-        try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator))) {
-            List<Map<String, Object>> result = s.collect(Collectors.toList());
+            try(ParquetWriter<Object[]> writer = ParquetWriter.writeFile(schema, parquet, dehydrator, codecName)) {
+                writer.write(new Object[]{1L, "hello1"});
+                writer.write(new Object[]{2L, "hello2"});
 
-            //noinspection unchecked
-            assertThat(result, hasItems(
-                    Map.of("id", 1L, "email", "hello1"),
-                    Map.of("id", 2L, "email", "hello2")));
-        }
+                // TODO add better matchers to the classpath
+                assertThat("data size should be gt 10", writer.getDataSize() > 10, is(true));
+            }
 
-        try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator), Collections.singleton("id"))) {
-            List<Map<String, Object>> result = s.collect(Collectors.toList());
+            ParquetMetadata metadata = ParquetReader.readMetadata(parquet);
 
-            //noinspection unchecked
-            assertThat(result, hasItems(
-                    Map.of("id", 1L),
-                    Map.of("id", 2L)));
+            assertThat(metadata.getBlocks().stream().findFirst().orElseThrow().toString(),
+                    containsString(codecName.name()));
+
+            try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator))) {
+                List<Map<String, Object>> result = s.collect(Collectors.toList());
+
+                //noinspection unchecked
+                assertThat(result, hasItems(
+                        Map.of("id", 1L, "email", "hello1"),
+                        Map.of("id", 2L, "email", "hello2")));
+            }
+
+            try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator), Collections.singleton("id"))) {
+                List<Map<String, Object>> result = s.collect(Collectors.toList());
+
+                //noinspection unchecked
+                assertThat(result, hasItems(
+                        Map.of("id", 1L),
+                        Map.of("id", 2L)));
+            }
         }
+
+
     }
 }
\ No newline at end of file
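Usage sketch (not part of the patch): the following standalone example shows how the new four-argument `writeFile` overload, the `getDataSize()` accessor, and the footer metadata exercised by the updated test fit together. The class name, output path, and `main` wrapper are illustrative assumptions; the schema and dehydrator mirror `ParquetReadWriteTest`.

```java
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.ParquetReader;
import blue.strategic.parquet.ParquetWriter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import java.io.File;
import java.io.IOException;

import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

// Illustrative class name; not part of the patch.
public class ZstdWriteExample {
    public static void main(String[] args) throws IOException {
        // Same two-column shape as the test: a required INT64 id and a UTF-8 email.
        MessageType schema = new MessageType("example",
                Types.required(INT64).named("id"),
                Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("email"));

        // Dehydrator maps one Object[] row onto the two columns by name.
        Dehydrator<Object[]> dehydrator = (record, valueWriter) -> {
            valueWriter.write("id", record[0]);
            valueWriter.write("email", record[1]);
        };

        File out = new File("example.parquet"); // illustrative path

        // New overload: the same call as before plus an explicit codec.
        // The existing three-argument writeFile keeps SNAPPY as its default.
        try (ParquetWriter<Object[]> writer = ParquetWriter.writeFile(schema, out, dehydrator, ZSTD)) {
            writer.write(new Object[]{1L, "hello1"});
            writer.write(new Object[]{2L, "hello2"});
            System.out.println("buffered data size: " + writer.getDataSize());
        }

        // The codec is recorded in the file footer, which is why the updated
        // test can assert on the string form of the block metadata.
        ParquetMetadata metadata = ParquetReader.readMetadata(out);
        System.out.println(metadata.getBlocks().get(0));
    }
}
```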