
Commit a470a34

Initial pass at adding ORC to Iceberg.
Known problems:

* Doesn't do schema evolution.
* Doesn't include column size metrics.
* Doesn't properly handle timestamp with timezone.
* Doesn't do the schema mangling for partitions.
1 parent: b82956a

20 files changed (+2821, -169)


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -3,4 +3,5 @@
 *.iml
 # gradle build
 .gradle
-build
+build
+out

api/src/main/java/com/netflix/iceberg/FileFormat.java

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@
  * Enum of supported file formats.
  */
 public enum FileFormat {
+  ORC("orc"),
   PARQUET("parquet"),
   AVRO("avro");

api/src/main/java/com/netflix/iceberg/UpdateProperties.java

Lines changed: 6 additions & 0 deletions
@@ -47,4 +47,10 @@ public interface UpdateProperties extends PendingUpdate<Map<String, String>> {
    */
   UpdateProperties remove(String key);
 
+  /**
+   * Set the default file format for the table.
+   * @param format the file format to use for new data files
+   * @return this for method chaining
+   */
+  UpdateProperties format(FileFormat format);
 }

build.gradle

Lines changed: 17 additions & 0 deletions
@@ -60,6 +60,7 @@ subprojects {
 
   ext {
     avroVersion = '1.8.2'
+    orcVersion = '1.4.2'
     parquetVersion = '1.9.1-SNAPSHOT'
 
     jacksonVersion = '2.6.7'
@@ -114,6 +115,19 @@ project(':iceberg-core') {
   }
 }
 
+project(':iceberg-orc') {
+  dependencies {
+    compile project(':iceberg-api')
+    compile project(':iceberg-core')
+
+    compile "org.apache.orc:orc-core:$orcVersion:nohive"
+
+    compileOnly('org.apache.hadoop:hadoop-client:2.7.3') {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
+  }
+}
+
 project(':iceberg-parquet') {
   dependencies {
     compile project(':iceberg-api')
@@ -137,6 +151,7 @@ project(':iceberg-spark') {
     compile project(':iceberg-common')
     compile project(':iceberg-avro')
     compile project(':iceberg-core')
+    compile project(':iceberg-orc')
     compile project(':iceberg-parquet')
 
     compileOnly "org.apache.avro:avro:$avroVersion"
@@ -174,10 +189,12 @@ project(':iceberg-runtime') {
     shadow project(':iceberg-common')
     shadow project(':iceberg-avro')
     shadow project(':iceberg-core')
+    shadow project(':iceberg-orc')
     shadow project(':iceberg-parquet')
     shadow project(':iceberg-spark')
 
     shadow "org.apache.avro:avro:$avroVersion"
+    shadow "org.apache.orc:orc-core:$orcVersion:nohive"
     shadow "org.apache.parquet:parquet-avro:$parquetVersion"
   }

core/src/main/java/com/netflix/iceberg/PropertiesUpdate.java

Lines changed: 6 additions & 0 deletions
@@ -68,6 +68,12 @@ public UpdateProperties remove(String key) {
     return this;
   }
 
+  @Override
+  public UpdateProperties format(FileFormat format) {
+    set(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    return this;
+  }
+
   @Override
   public Map<String, String> apply() {
     this.base = ops.refresh();
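
Together with the new FileFormat.ORC constant, this lets a writer record and recover a table's default format. A minimal sketch of the round trip, assuming a Table handle named `table`; the AVRO fallback below is an illustrative assumption, not something this commit specifies:

// set the default: format() stores format.name() under DEFAULT_FILE_FORMAT
table.updateProperties()
    .format(FileFormat.ORC)
    .commit();

// read it back; the fallback value here is assumed for illustration
String name = table.properties()
    .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.AVRO.name());
FileFormat format = FileFormat.valueOf(name);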
orc/src/main/java/com/netflix/iceberg/orc/ORC.java

Lines changed: 154 additions & 0 deletions

/*
 * Copyright 2018 Hortonworks
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netflix.iceberg.orc;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ORC {
  private ORC() {
  }

  public static WriteBuilder write(OutputFile file) {
    return new WriteBuilder(file);
  }

  public static class WriteBuilder {
    private final OutputFile file;
    private Schema schema = null;
    private PartitionSpec spec = null;
    private Configuration conf = null;
    private final Properties tableProperties = new Properties();
    private Map<String, byte[]> metadata = new HashMap<>();

    private WriteBuilder(OutputFile file) {
      this.file = file;
    }

    public WriteBuilder partitionSpec(PartitionSpec spec) {
      this.spec = spec;
      return this;
    }

    public WriteBuilder metadata(String property, String value) {
      metadata.put(property, value.getBytes(StandardCharsets.UTF_8));
      return this;
    }

    public WriteBuilder tableProperties(Properties properties) {
      tableProperties.putAll(properties);
      return this;
    }

    public WriteBuilder schema(Schema schema) {
      this.schema = schema;
      return this;
    }

    public WriteBuilder conf(Configuration conf) {
      this.conf = conf;
      return this;
    }

    public OrcFileAppender build() {
      Preconditions.checkNotNull(schema, "Schema is required");
      if (conf == null) {
        conf = new Configuration();
      }
      OrcFile.WriterOptions options =
          OrcFile.writerOptions(tableProperties, conf);
      return new OrcFileAppender(schema, spec, file, options, metadata);
    }
  }

  public static ReadBuilder read(InputFile file) {
    return new ReadBuilder(file);
  }

  public static class ReadBuilder {
    private final InputFile file;
    private com.netflix.iceberg.Schema schema = null;
    private Long start = null;
    private Long length = null;
    private Configuration conf = null;

    private ReadBuilder(InputFile file) {
      Preconditions.checkNotNull(file, "Input file cannot be null");
      this.file = file;
    }

    /**
     * Restricts the read to the given range: [start, start + length).
     *
     * @param start the start position for this read
     * @param length the length of the range this read should scan
     * @return this builder for method chaining
     */
    public ReadBuilder split(long start, long length) {
      this.start = start;
      this.length = length;
      return this;
    }

    public ReadBuilder schema(com.netflix.iceberg.Schema schema) {
      this.schema = schema;
      return this;
    }

    public ReadBuilder conf(Configuration conf) {
      this.conf = conf;
      return this;
    }

    public OrcIterator build() {
      Preconditions.checkNotNull(schema, "Schema is required");
      try {
        Path path = new Path(file.location());
        if (conf == null) {
          conf = new Configuration();
        }
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        List<Integer> columnIds = new ArrayList<>();
        TypeDescription orcSchema = TypeConversion.toOrc(schema, columnIds);
        Reader.Options options = reader.options();
        if (start != null) {
          options.range(start, length);
        }
        options.schema(orcSchema);
        return new OrcIterator(path, orcSchema, reader.rows(options));
      } catch (IOException e) {
        throw new RuntimeException("Can't open " + file.location(), e);
      }
    }
  }
}
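
A minimal usage sketch of the two builders above, assuming an OutputFile/InputFile pair, an Iceberg Schema, and a populated VectorizedRowBatch are already in hand; all names here are illustrative:

import com.netflix.iceberg.Schema;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;

class OrcRoundTrip {
  static void roundTrip(OutputFile outFile, InputFile inFile,
                        Schema schema, VectorizedRowBatch batch) throws IOException {
    // write side: the builder wires the schema and user metadata into the appender
    OrcFileAppender appender = ORC.write(outFile)
        .schema(schema)
        .metadata("written.by", "example")  // stored as ORC user metadata
        .build();
    appender.add(batch);
    appender.close();

    // read side: OrcIterator streams row batches back from the file
    OrcIterator rows = ORC.read(inFile)
        .schema(schema)
        .split(0, Long.MAX_VALUE)  // optional: restrict the read to a byte range
        .build();
  }
}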
orc/src/main/java/com/netflix/iceberg/orc/OrcFileAppender.java

Lines changed: 117 additions & 0 deletions

/*
 * Copyright 2018 Hortonworks
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netflix.iceberg.orc;

import com.netflix.iceberg.Metrics;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.Schema;
import com.netflix.iceberg.io.FileAppender;
import com.netflix.iceberg.io.OutputFile;
import org.apache.hadoop.fs.Path;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A file appender that writes VectorizedRowBatch values to an ORC file.
 */
public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
  private final Writer writer;
  private final TypeDescription orcSchema;
  private final List<Integer> columnIds = new ArrayList<>();
  private final Path path;

  public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";

  static ByteBuffer buildIdString(List<Integer> list) {
    StringBuilder buffer = new StringBuilder();
    for (int i = 0; i < list.size(); ++i) {
      if (i != 0) {
        buffer.append(',');
      }
      buffer.append(list.get(i));
    }
    return ByteBuffer.wrap(buffer.toString().getBytes(StandardCharsets.UTF_8));
  }

  OrcFileAppender(Schema schema,
                  PartitionSpec spec,
                  OutputFile file,
                  OrcFile.WriterOptions options,
                  Map<String, byte[]> metadata) {
    orcSchema = TypeConversion.toOrc(schema, columnIds);
    options.setSchema(orcSchema);
    path = new Path(file.location());
    try {
      writer = OrcFile.createWriter(path, options);
    } catch (IOException e) {
      throw new RuntimeException("Can't create file " + path, e);
    }
    writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, buildIdString(columnIds));
    metadata.forEach(
        (key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
  }

  @Override
  public void add(VectorizedRowBatch datum) {
    try {
      writer.addRowBatch(datum);
    } catch (IOException e) {
      throw new RuntimeException("Problem writing to ORC file " + path, e);
    }
  }

  @Override
  public Metrics metrics() {
    try {
      long rows = writer.getNumberOfRows();
      ColumnStatistics[] stats = writer.getStatistics();
      // we don't currently have columnSizes or distinct counts.
      Map<Integer, Long> valueCounts = new HashMap<>();
      Map<Integer, Long> nullCounts = new HashMap<>();
      for (int c = 1; c < stats.length; ++c) {
        int fieldId = columnIds.get(c);
        valueCounts.put(fieldId, stats[c].getNumberOfValues());
      }
      for (TypeDescription child : orcSchema.getChildren()) {
        int c = child.getId();
        int fieldId = columnIds.get(c);
        nullCounts.put(fieldId, rows - stats[c].getNumberOfValues());
      }
      return new Metrics(rows, null, valueCounts, nullCounts);
    } catch (IOException e) {
      throw new RuntimeException("Can't get statistics " + path, e);
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }

  public TypeDescription getSchema() {
    return orcSchema;
  }
}
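
Because the appender stores the Iceberg field ids as ORC user metadata under COLUMN_NUMBERS_ATTRIBUTE, a reader can map ORC column statistics back to Iceberg fields. A sketch of the recovery side, assuming an open org.apache.orc.Reader; the helper itself is illustrative and not part of this commit:

import org.apache.orc.Reader;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ColumnIdReader {
  // parse the comma-separated id list written by buildIdString;
  // index i holds the Iceberg field id for ORC column id i
  static List<Integer> readColumnIds(Reader reader) {
    ByteBuffer raw = reader.getMetadataValue(OrcFileAppender.COLUMN_NUMBERS_ATTRIBUTE);
    String csv = StandardCharsets.UTF_8.decode(raw).toString();
    List<Integer> ids = new ArrayList<>();
    for (String part : csv.split(",")) {
      ids.add(Integer.parseInt(part));
    }
    return ids;
  }
}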
