Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/main/java/blue/strategic/parquet/ParquetWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
import java.io.IOException;
import java.util.Collections;

import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;

public final class ParquetWriter<T> implements Closeable {

private final org.apache.parquet.hadoop.ParquetWriter<T> writer;

public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator) throws IOException {
return writeFile(schema, out, dehydrator, SNAPPY);
}

public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator, CompressionCodecName codecName) throws IOException {
OutputFile f = new OutputFile() {
@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
Expand Down Expand Up @@ -51,18 +57,18 @@ public long defaultBlockSize() {
return 1024L;
}
};
return writeOutputFile(schema, f, dehydrator);
return writeOutputFile(schema, f, dehydrator, codecName);
}

private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
return new ParquetWriter<>(file, schema, dehydrator);
private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator, CompressionCodecName codecName) throws IOException {
return new ParquetWriter<>(file, schema, dehydrator, codecName);
}

private ParquetWriter(OutputFile outputFile, MessageType schema, Dehydrator<T> dehydrator) throws IOException {
private ParquetWriter(OutputFile outputFile, MessageType schema, Dehydrator<T> dehydrator, CompressionCodecName codecName) throws IOException {
this.writer = new Builder<T>(outputFile)
.withType(schema)
.withDehydrator(dehydrator)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withCompressionCodec(codecName)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.build();
}
Expand All @@ -76,6 +82,10 @@ public void close() throws IOException {
this.writer.close();
}

public long getDataSize() {
return writer.getDataSize();
}

private static final class Builder<T> extends org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetWriter.Builder<T>> {
private MessageType schema;
private Dehydrator<T> dehydrator;
Expand Down
53 changes: 36 additions & 17 deletions src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package blue.strategic.parquet;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
Expand All @@ -16,9 +18,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;

public class ParquetReadWriteTest {
Expand Down Expand Up @@ -58,27 +64,40 @@ public Map<String, Object> finish(Map<String, Object> target) {
}
};

try(ParquetWriter<Object[]> writer = ParquetWriter.writeFile(schema, parquet, dehydrator)) {
writer.write(new Object[]{1L, "hello1"});
writer.write(new Object[]{2L, "hello2"});
}
for (CompressionCodecName codecName : List.of(SNAPPY, ZSTD)) {

try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator))) {
List<Map<String, Object>> result = s.collect(Collectors.toList());
try(ParquetWriter<Object[]> writer = ParquetWriter.writeFile(schema, parquet, dehydrator, codecName)) {
writer.write(new Object[]{1L, "hello1"});
writer.write(new Object[]{2L, "hello2"});

//noinspection unchecked
assertThat(result, hasItems(
Map.of("id", 1L, "email", "hello1"),
Map.of("id", 2L, "email", "hello2")));
}
// TODO add better matchers to the classpath
assertThat("data size should be gt 10", writer.getDataSize() > 10, is(true));
}

try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator), Collections.singleton("id"))) {
List<Map<String, Object>> result = s.collect(Collectors.toList());
ParquetMetadata metadata = ParquetReader.readMetadata(parquet);

//noinspection unchecked
assertThat(result, hasItems(
Map.of("id", 1L),
Map.of("id", 2L)));
assertThat(metadata.getBlocks().stream().findFirst().orElseThrow().toString(),
containsString(codecName.name()));

try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator))) {
List<Map<String, Object>> result = s.collect(Collectors.toList());

//noinspection unchecked
assertThat(result, hasItems(
Map.of("id", 1L, "email", "hello1"),
Map.of("id", 2L, "email", "hello2")));
}

try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator), Collections.singleton("id"))) {
List<Map<String, Object>> result = s.collect(Collectors.toList());

//noinspection unchecked
assertThat(result, hasItems(
Map.of("id", 1L),
Map.of("id", 2L)));
}
}


}
}