Skip to content

Commit ae97c63

Browse files
committed
Add geometry and geography support for iceberg-parquet and iceberg-data
1 parent 77c5146 commit ae97c63

File tree

13 files changed

+659
-9
lines changed

13 files changed

+659
-9
lines changed

api/src/main/java/org/apache/iceberg/expressions/Evaluator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,22 @@ public <T> Boolean notStartsWith(Bound<T> valueExpr, Literal<T> lit) {
160160

161161
@Override
public <T> Boolean stIntersects(Bound<T> valueExpr, Literal<BoundingBox> literal) {
  // Evaluating stIntersects against an individual geometry/geography value is intentionally not
  // supported: spatial predicates in iceberg-api only support data skipping (file/row-group
  // pruning), not filtering of individual records. Returning true means readers must expect
  // false positives and apply the actual spatial filter themselves.
  return true;
}
166170

167171
@Override
public <T> Boolean stDisjoint(Bound<T> valueExpr, Literal<BoundingBox> literal) {
  // NOTE: the original comment here said "stIntersects" — copy-paste error; this is stDisjoint.
  // Evaluating stDisjoint against an individual geometry/geography value is intentionally not
  // supported: spatial predicates in iceberg-api only support data skipping (file/row-group
  // pruning), not filtering of individual records. Returning true means readers must expect
  // false positives and apply the actual spatial filter themselves.
  return true;
}
172180
}
173181
}

api/src/test/java/org/apache/iceberg/expressions/TestEvaluator.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -843,9 +843,6 @@ public void testGeospatialPredicates(Expression.Operation operation, String colu
843843

844844
Evaluator evaluator =
845845
new Evaluator(geoStruct, Expressions.geospatialPredicate(operation, columnName, bbox));
846-
assertThatThrownBy(() -> evaluator.eval(TestHelpers.Row.of(wkb, wkb)))
847-
.isInstanceOf(UnsupportedOperationException.class)
848-
.hasMessageMatching(
849-
"Evaluation of \\w+ against geometry/geography value is not implemented.");
846+
assertThat(evaluator.eval(TestHelpers.Row.of(wkb, wkb))).isTrue();
850847
}
851848
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.data.parquet;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.nio.ByteBuffer;
26+
import java.nio.file.Path;
27+
import java.util.List;
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.iceberg.DataFile;
30+
import org.apache.iceberg.DataFiles;
31+
import org.apache.iceberg.FileFormat;
32+
import org.apache.iceberg.Metrics;
33+
import org.apache.iceberg.PartitionSpec;
34+
import org.apache.iceberg.Schema;
35+
import org.apache.iceberg.Table;
36+
import org.apache.iceberg.TableProperties;
37+
import org.apache.iceberg.Tables;
38+
import org.apache.iceberg.data.GenericAppenderFactory;
39+
import org.apache.iceberg.data.GenericRecord;
40+
import org.apache.iceberg.data.IcebergGenerics;
41+
import org.apache.iceberg.data.Record;
42+
import org.apache.iceberg.hadoop.HadoopInputFile;
43+
import org.apache.iceberg.hadoop.HadoopOutputFile;
44+
import org.apache.iceberg.hadoop.HadoopTables;
45+
import org.apache.iceberg.io.CloseableIterable;
46+
import org.apache.iceberg.io.OutputFile;
47+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
48+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
49+
import org.apache.iceberg.types.Types;
50+
import org.apache.iceberg.util.ByteBuffers;
51+
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.api.io.TempDir;
53+
import org.locationtech.jts.geom.Coordinate;
54+
import org.locationtech.jts.geom.Geometry;
55+
import org.locationtech.jts.geom.GeometryFactory;
56+
import org.locationtech.jts.geom.Point;
57+
import org.locationtech.jts.io.ParseException;
58+
import org.locationtech.jts.io.WKBReader;
59+
import org.locationtech.jts.io.WKBWriter;
60+
61+
public class TestGeographyReadersAndWriters {
62+
private final Schema schema;
63+
private static final Configuration CONF = new Configuration();
64+
private static final Tables TABLES = new HadoopTables(CONF);
65+
66+
@TempDir Path tempDir;
67+
68+
private final List<Record> testData;
69+
70+
public TestGeographyReadersAndWriters() {
71+
this.schema =
72+
new Schema(
73+
Types.NestedField.required(1, "id", Types.LongType.get()),
74+
Types.NestedField.optional(3, "geog", Types.GeographyType.crs84()));
75+
testData = prepareTestData();
76+
}
77+
78+
private List<Record> prepareTestData() {
79+
List<Record> recordList = Lists.newArrayList();
80+
GeometryFactory factory = new GeometryFactory();
81+
WKBWriter wkbWriter = new WKBWriter();
82+
for (long id = 0; id < 1000; id++) {
83+
// lng: -100 to 100, lat: -50 to 50
84+
double lng = id * 0.2 - 100;
85+
double lat = id * 0.1 - 50;
86+
Coordinate center = new Coordinate(lng, lat);
87+
byte[] wkb = wkbWriter.write(factory.createPoint(center));
88+
ByteBuffer geog = ByteBuffer.wrap(wkb);
89+
Record record = GenericRecord.create(schema);
90+
record.setField("id", id);
91+
record.setField("geog", geog);
92+
recordList.add(record);
93+
}
94+
return recordList;
95+
}
96+
97+
@Test
98+
public void testWriteAndReadGeometryValues() throws IOException, ParseException {
99+
// Create a table
100+
File location = tempDir.resolve("geog-table-1").toFile();
101+
Table table =
102+
TABLES.create(
103+
schema,
104+
PartitionSpec.unpartitioned(),
105+
ImmutableMap.of(
106+
TableProperties.FORMAT_VERSION, "3",
107+
TableProperties.DEFAULT_FILE_FORMAT, "parquet"),
108+
location.toString());
109+
110+
// Write some data
111+
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());
112+
Path path = tempDir.resolve("data.parquet");
113+
OutputFile outputFile =
114+
HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(path.toString()), CONF);
115+
try (var fileAppender = appenderFactory.newAppender(outputFile, FileFormat.PARQUET)) {
116+
fileAppender.addAll(testData);
117+
fileAppender.close();
118+
Metrics metrics = fileAppender.metrics();
119+
120+
// Commit the data file to the table
121+
DataFile dataFile =
122+
DataFiles.builder(PartitionSpec.unpartitioned())
123+
.withInputFile(
124+
HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toString()), CONF))
125+
.withFormat(FileFormat.PARQUET)
126+
.withMetrics(metrics)
127+
.build();
128+
table.newAppend().appendFile(dataFile).commit();
129+
130+
// Read the data
131+
WKBReader wkbReader = new WKBReader();
132+
try (CloseableIterable<Record> reader = IcebergGenerics.read(table).build()) {
133+
int numRecords = 0;
134+
for (Record record : reader) {
135+
ByteBuffer geogWkb = (ByteBuffer) record.getField("geog");
136+
Geometry geometry = wkbReader.read(ByteBuffers.toByteArray(geogWkb));
137+
assertThat(geometry).isInstanceOf(Point.class);
138+
numRecords++;
139+
}
140+
assertThat(numRecords).as("Record count must match").isEqualTo(testData.size());
141+
}
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)