Skip to content

Commit 4433acd

Browse files
committed
Make ParquetSchemaUtil an internal library
Also update relevant tests to check against full MessageType schema instead of pulling out the ColumnDescriptors
1 parent e1f6cb0 commit 4433acd

File tree

15 files changed

+215
-135
lines changed

15 files changed

+215
-135
lines changed

Util/src/main/java/io/deephaven/util/annotations/InternalUseOnly.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
import java.lang.annotation.Target;
1010

1111
/**
12-
* Indicates that a particular method is for internal use only and should not be used by client code. It is subject to
13-
* change/removal at any time.
12+
* Indicates that a particular {@link ElementType#METHOD method}, {@link ElementType#CONSTRUCTOR constructor},
13+
* {@link ElementType#TYPE type}, or {@link ElementType#PACKAGE package} is for internal use only and should not be used
14+
* by client code. It is subject to change/removal at any time.
1415
*/
15-
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
16+
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE, ElementType.PACKAGE})
1617
@Inherited
1718
@Documented
1819
public @interface InternalUseOnly {

extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
import org.apache.iceberg.catalog.Namespace;
3535
import org.apache.iceberg.catalog.TableIdentifier;
3636
import org.apache.iceberg.types.Types;
37-
import org.apache.parquet.column.ColumnDescriptor;
3837
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
38+
import org.apache.parquet.schema.LogicalTypeAnnotation;
39+
import org.apache.parquet.schema.MessageType;
3940
import org.jetbrains.annotations.Nullable;
4041
import org.junit.jupiter.api.AfterEach;
4142
import org.junit.jupiter.api.BeforeEach;
@@ -53,6 +54,11 @@
5354
import java.util.List;
5455
import java.util.stream.Collectors;
5556
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
57+
import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
58+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
59+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
60+
import static org.apache.parquet.schema.Types.buildMessage;
61+
import static org.apache.parquet.schema.Types.optional;
5662
import static org.assertj.core.api.Assertions.assertThat;
5763
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
5864

@@ -416,8 +422,12 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
416422
{
417423
final List<String> parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier);
418424
assertThat(parquetFiles).hasSize(1);
419-
verifyFieldIdsFromParquetFile(parquetFiles.get(0), originalDefinition.getColumnNames(),
420-
nameToFieldIdFromSchema);
425+
final MessageType expectedSchema = buildMessage()
426+
.addFields(
427+
optional(INT32).id(1).as(intType(32, true)).named("intCol"),
428+
optional(DOUBLE).id(2).named("doubleCol"))
429+
.named("root");
430+
verifySchema(parquetFiles.get(0), expectedSchema);
421431
}
422432

423433
final Table moreData = TableTools.emptyTable(5)
@@ -442,10 +452,18 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
442452

443453
final List<String> parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier);
444454
assertThat(parquetFiles).hasSize(2);
445-
verifyFieldIdsFromParquetFile(parquetFiles.get(0), moreData.getDefinition().getColumnNames(),
446-
newNameToFieldId);
447-
verifyFieldIdsFromParquetFile(parquetFiles.get(1), originalDefinition.getColumnNames(),
448-
nameToFieldIdFromSchema);
455+
final MessageType expectedSchema0 = buildMessage()
456+
.addFields(
457+
optional(INT32).id(1).as(intType(32, true)).named("newIntCol"),
458+
optional(DOUBLE).id(2).named("newDoubleCol"))
459+
.named("root");
460+
final MessageType expectedSchema1 = buildMessage()
461+
.addFields(
462+
optional(INT32).id(1).as(intType(32, true)).named("intCol"),
463+
optional(DOUBLE).id(2).named("doubleCol"))
464+
.named("root");
465+
verifySchema(parquetFiles.get(0), expectedSchema0);
466+
verifySchema(parquetFiles.get(1), expectedSchema1);
449467
}
450468

451469
// TODO: This is failing because we don't map columns based on the column ID when reading. Uncomment this
@@ -455,31 +473,13 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
455473
// moreData.renameColumns("intCol = newIntCol", "doubleCol = newDoubleCol")), fromIceberg);
456474
}
457475

458-
/**
459-
* Verify that the schema of the parquet file read from the provided path has the provided column and corresponding
460-
* field IDs.
461-
*/
462-
private void verifyFieldIdsFromParquetFile(
463-
final String path,
464-
final List<String> columnNames,
465-
final Map<String, Integer> nameToFieldId) throws URISyntaxException {
476+
private void verifySchema(String path, MessageType expectedSchema) throws URISyntaxException {
466477
final ParquetMetadata metadata =
467478
new ParquetTableLocationKey(new URI(path), 0, null, ParquetInstructions.builder()
468479
.setSpecialInstructions(dataInstructions())
469480
.build())
470481
.getMetadata();
471-
final List<ColumnDescriptor> columnsMetadata = metadata.getFileMetaData().getSchema().getColumns();
472-
473-
final int numColumns = columnNames.size();
474-
for (int colIdx = 0; colIdx < numColumns; colIdx++) {
475-
final String columnName = columnNames.get(colIdx);
476-
final String columnNameFromParquetFile = columnsMetadata.get(colIdx).getPath()[0];
477-
assertThat(columnName).isEqualTo(columnNameFromParquetFile);
478-
479-
final int expectedFieldId = nameToFieldId.get(columnName);
480-
final int fieldIdFromParquetFile = columnsMetadata.get(colIdx).getPrimitiveType().getId().intValue();
481-
assertThat(fieldIdFromParquetFile).isEqualTo(expectedFieldId);
482-
}
482+
assertThat(metadata.getFileMetaData().getSchema()).isEqualTo(expectedSchema);
483483
}
484484

485485
/**

extensions/parquet/base/build.gradle

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,16 @@ dependencies {
2222
implementation libs.guava
2323

2424
compileOnly libs.jetbrains.annotations
25-
testImplementation libs.junit4
25+
26+
testImplementation libs.assertj
27+
28+
testImplementation platform(libs.junit.bom)
29+
testImplementation libs.junit.jupiter
30+
testRuntimeOnly libs.junit.jupiter.engine
31+
testRuntimeOnly libs.junit.platform.launcher
32+
}
33+
34+
tasks.withType(Test).configureEach {
35+
useJUnitPlatform {
36+
}
2637
}

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.deephaven.parquet.base;
55

66
import io.deephaven.UncheckedDeephavenException;
7+
import io.deephaven.parquet.impl.ParquetSchemaUtil;
78
import io.deephaven.util.channel.SeekableChannelContext;
89
import io.deephaven.util.channel.SeekableChannelsProvider;
910
import io.deephaven.parquet.compress.CompressorAdapter;
@@ -68,8 +69,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
6869
this.columnName = columnName;
6970
this.channelsProvider = channelsProvider;
7071
this.columnChunk = columnChunk;
71-
this.path = type
72-
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
72+
this.path = ParquetSchemaUtil.getColumnDescriptor(type, columnChunk.meta_data.getPath_in_schema());
7373
if (columnChunk.getMeta_data().isSetCodec()) {
7474
decompressor = DeephavenCompressorAdapterFactory.getInstance()
7575
.getByName(columnChunk.getMeta_data().getCodec().name());

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.nio.channels.Channels;
3333
import java.nio.channels.WritableByteChannel;
3434
import java.util.EnumSet;
35+
import java.util.Objects;
3536
import java.util.Set;
3637

3738
import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -76,11 +77,11 @@ final class ColumnWriterImpl implements ColumnWriter {
7677
final CompressorAdapter compressorAdapter,
7778
final int targetPageSize,
7879
final ByteBufferAllocator allocator) {
79-
this.countingOutput = countingOutput;
80-
this.column = column;
81-
this.compressorAdapter = compressorAdapter;
80+
this.countingOutput = Objects.requireNonNull(countingOutput);
81+
this.column = Objects.requireNonNull(column);
82+
this.compressorAdapter = Objects.requireNonNull(compressorAdapter);
8283
this.targetPageSize = targetPageSize;
83-
this.allocator = allocator;
84+
this.allocator = Objects.requireNonNull(allocator);
8485
dlEncoder = column.getMaxDefinitionLevel() == 0 ? null
8586
: new RunLengthBitPackingHybridEncoder(
8687
getWidthFromMaxInt(column.getMaxDefinitionLevel()), MIN_SLAB_SIZE, targetPageSize, allocator);

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.google.common.io.CountingOutputStream;
77
import io.deephaven.parquet.compress.CompressorAdapter;
8+
import io.deephaven.parquet.impl.ParquetSchemaUtil;
89
import org.apache.parquet.bytes.ByteBufferAllocator;
910
import org.apache.parquet.hadoop.metadata.BlockMetaData;
1011
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -16,6 +17,7 @@
1617
import java.util.ArrayList;
1718
import java.util.Arrays;
1819
import java.util.List;
20+
import java.util.Objects;
1921

2022
final class RowGroupWriterImpl implements RowGroupWriter {
2123
private final CountingOutputStream countingOutput;
@@ -42,12 +44,12 @@ private RowGroupWriterImpl(CountingOutputStream countingOutput,
4244
ByteBufferAllocator allocator,
4345
BlockMetaData blockMetaData,
4446
CompressorAdapter compressorAdapter) {
45-
this.countingOutput = countingOutput;
46-
this.type = type;
47+
this.countingOutput = Objects.requireNonNull(countingOutput);
48+
this.type = Objects.requireNonNull(type);
4749
this.targetPageSize = targetPageSize;
48-
this.allocator = allocator;
49-
this.blockMetaData = blockMetaData;
50-
this.compressorAdapter = compressorAdapter;
50+
this.allocator = Objects.requireNonNull(allocator);
51+
this.blockMetaData = Objects.requireNonNull(blockMetaData);
52+
this.compressorAdapter = Objects.requireNonNull(compressorAdapter);
5153
}
5254

5355
String[] getPrimitivePath(String columnName) {
@@ -74,7 +76,7 @@ public ColumnWriter addColumn(String columnName) {
7476
}
7577
activeWriter = new ColumnWriterImpl(this,
7678
countingOutput,
77-
type.getColumnDescription(getPrimitivePath(columnName)),
79+
ParquetSchemaUtil.getColumnDescriptor(type, getPrimitivePath(columnName)),
7880
compressorAdapter,
7981
targetPageSize,
8082
allocator);

extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetSchemaUtil.java renamed to extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ParquetSchemaUtil.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
33
//
4-
package io.deephaven.parquet.table.location;
4+
package io.deephaven.parquet.impl;
55

66
import io.deephaven.base.verify.Assert;
77
import org.apache.parquet.column.ColumnDescriptor;
@@ -20,9 +20,12 @@
2020
import java.util.function.Consumer;
2121
import java.util.function.Predicate;
2222

23-
final class ParquetSchemaUtil {
23+
/**
24+
* Various improved ways of traversing {@link MessageType}.
25+
*/
26+
public final class ParquetSchemaUtil {
2427

25-
interface Visitor {
28+
public interface Visitor {
2629

2730
/**
2831
* Accept a Parquet column.
@@ -43,27 +46,36 @@ interface Visitor {
4346
/**
4447
* A more efficient implementation of {@link MessageType#getColumns()}.
4548
*/
46-
static List<ColumnDescriptor> getColumns(MessageType schema) {
49+
public static List<ColumnDescriptor> getColumns(MessageType schema) {
4750
final List<ColumnDescriptor> out = new ArrayList<>();
4851
walkColumnDescriptors(schema, out::add);
4952
return out;
5053
}
5154

52-
static void walkColumnDescriptors(MessageType type, Consumer<ColumnDescriptor> consumer) {
55+
/**
56+
* A more efficient implementation of {@link MessageType#getPaths()}.
57+
*/
58+
public static List<String[]> getPaths(MessageType schema) {
59+
final List<String[]> out = new ArrayList<>();
60+
walk(schema, (path, primitiveType) -> out.add(makeNamePath(path)));
61+
return out;
62+
}
63+
64+
public static void walkColumnDescriptors(MessageType type, Consumer<ColumnDescriptor> consumer) {
5365
walk(type, new ColumnDescriptorVisitor(consumer));
5466
}
5567

5668
/**
5769
* An alternative interface for traversing the leaf fields of a Parquet schema.
5870
*/
59-
static void walk(MessageType type, Visitor visitor) {
71+
public static void walk(MessageType type, Visitor visitor) {
6072
walk(type, visitor, new ArrayDeque<>());
6173
}
6274

6375
/**
64-
* A more efficient implementation of {@link MessageType#getColumnDescription(String[])}
76+
* A more efficient implementation of {@link MessageType#getColumnDescription(String[])}.
6577
*/
66-
static ColumnDescriptor getColumnDescriptor(MessageType schema, String[] path) {
78+
public static ColumnDescriptor getColumnDescriptor(MessageType schema, String[] path) {
6779
if (path.length == 0) {
6880
return null;
6981
}
@@ -106,28 +118,39 @@ static ColumnDescriptor getColumnDescriptor(MessageType schema, String[] path) {
106118
return new ColumnDescriptor(path, primitiveType, repeatedCount, notRequiredCount);
107119
}
108120

109-
static ColumnDescriptor makeColumnDescriptor(Collection<Type> path, PrimitiveType primitiveType) {
110-
final String[] namePath = path.stream().map(Type::getName).toArray(String[]::new);
121+
/**
122+
* A more efficient implementation of {@link MessageType#getColumnDescription(String[])}.
123+
*/
124+
public static ColumnDescriptor getColumnDescriptor(MessageType schema, List<String> path) {
125+
return getColumnDescriptor(schema, path.toArray(new String[0]));
126+
}
127+
128+
public static ColumnDescriptor makeColumnDescriptor(Collection<Type> path, PrimitiveType primitiveType) {
129+
final String[] namePath = makeNamePath(path);
111130
final int maxRep = (int) path.stream().filter(ParquetSchemaUtil::isRepeated).count();
112131
final int maxDef = (int) path.stream().filter(Predicate.not(ParquetSchemaUtil::isRequired)).count();
113132
return new ColumnDescriptor(namePath, primitiveType, maxRep, maxDef);
114133
}
115134

116-
static boolean columnDescriptorEquals(ColumnDescriptor a, ColumnDescriptor b) {
135+
public static boolean columnDescriptorEquals(ColumnDescriptor a, ColumnDescriptor b) {
117136
return a.equals(b)
118137
&& a.getPrimitiveType().equals(b.getPrimitiveType())
119138
&& a.getMaxRepetitionLevel() == b.getMaxRepetitionLevel()
120139
&& a.getMaxDefinitionLevel() == b.getMaxDefinitionLevel();
121140
}
122141

123-
static boolean contains(MessageType schema, ColumnDescriptor descriptor) {
142+
public static boolean contains(MessageType schema, ColumnDescriptor descriptor) {
124143
final ColumnDescriptor cd = getColumnDescriptor(schema, descriptor.getPath());
125144
if (cd == null) {
126145
return false;
127146
}
128147
return columnDescriptorEquals(descriptor, cd);
129148
}
130149

150+
private static String[] makeNamePath(Collection<Type> path) {
151+
return path.stream().map(Type::getName).toArray(String[]::new);
152+
}
153+
131154
private static void walk(Type type, Visitor visitor, Deque<Type> stack) {
132155
if (type.isPrimitive()) {
133156
visitor.accept(stack, type.asPrimitiveType());
extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/package-info.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@InternalUseOnly
2+
package io.deephaven.parquet.impl;
3+
4+
import io.deephaven.util.annotations.InternalUseOnly;

extensions/parquet/base/src/test/java/io/deephaven/parquet/base/TestParquetTimeUtils.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,52 +5,53 @@
55

66
import io.deephaven.time.DateTimeUtils;
77
import io.deephaven.util.QueryConstants;
8-
import junit.framework.TestCase;
9-
import org.junit.Test;
8+
import org.junit.jupiter.api.Test;
109

1110
import java.time.Instant;
1211
import java.time.LocalDateTime;
1312
import java.time.ZoneId;
1413

15-
public class TestParquetTimeUtils {
14+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
15+
16+
class TestParquetTimeUtils {
1617

1718
@Test
18-
public void testEpochNanosUTC() {
19+
void testEpochNanosUTC() {
1920
final long nanos = 123456789123456789L;
2021
final Instant dt2 = Instant.ofEpochSecond(0, nanos);
2122
final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
22-
TestCase.assertEquals(nanos, ParquetTimeUtils.epochNanosUTC(ldt));
23-
TestCase.assertEquals(QueryConstants.NULL_LONG, ParquetTimeUtils.epochNanosUTC(null));
23+
assertThat(ParquetTimeUtils.epochNanosUTC(ldt)).isEqualTo(nanos);
24+
assertThat(ParquetTimeUtils.epochNanosUTC(null)).isEqualTo(QueryConstants.NULL_LONG);
2425
}
2526

2627
@Test
27-
public void testEpochNanosTo() {
28+
void testEpochNanosTo() {
2829
final long nanos = 123456789123456789L;
2930
final Instant dt2 = Instant.ofEpochSecond(0, nanos);
3031
final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
31-
TestCase.assertEquals(ldt, ParquetTimeUtils.epochNanosToLocalDateTimeUTC(nanos));
32-
TestCase.assertNull(ParquetTimeUtils.epochNanosToLocalDateTimeUTC(QueryConstants.NULL_LONG));
32+
assertThat(ParquetTimeUtils.epochNanosToLocalDateTimeUTC(nanos)).isEqualTo(ldt);
33+
assertThat(ParquetTimeUtils.epochNanosToLocalDateTimeUTC(QueryConstants.NULL_LONG)).isNull();
3334
}
3435

3536
@Test
36-
public void testEpochMicrosTo() {
37+
void testEpochMicrosTo() {
3738
long nanos = 123456789123456789L;
3839
final long micros = DateTimeUtils.nanosToMicros(nanos);
3940
nanos = DateTimeUtils.microsToNanos(micros);
4041
final Instant dt2 = Instant.ofEpochSecond(0, nanos);
4142
final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
42-
TestCase.assertEquals(ldt, ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(micros));
43-
TestCase.assertNull(ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(QueryConstants.NULL_LONG));
43+
assertThat(ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(micros)).isEqualTo(ldt);
44+
assertThat(ParquetTimeUtils.epochMicrosToLocalDateTimeUTC(QueryConstants.NULL_LONG)).isNull();
4445
}
4546

4647
@Test
47-
public void testEpochMillisTo() {
48+
void testEpochMillisTo() {
4849
long nanos = 123456789123456789L;
4950
final long millis = DateTimeUtils.nanosToMillis(nanos);
5051
nanos = DateTimeUtils.millisToNanos(millis);
5152
final Instant dt2 = Instant.ofEpochSecond(0, nanos);
5253
final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
53-
TestCase.assertEquals(ldt, ParquetTimeUtils.epochMillisToLocalDateTimeUTC(millis));
54-
TestCase.assertNull(ParquetTimeUtils.epochMillisToLocalDateTimeUTC(QueryConstants.NULL_LONG));
54+
assertThat(ParquetTimeUtils.epochMillisToLocalDateTimeUTC(millis)).isEqualTo(ldt);
55+
assertThat(ParquetTimeUtils.epochMillisToLocalDateTimeUTC(QueryConstants.NULL_LONG)).isNull();
5556
}
5657
}

0 commit comments

Comments (0)