Commit d19f505

sayedkeika authored and mboehm7 committed

[SYSTEMDS-2229] Extended I/O Framework: Readers/Writers for Parquet

Closes #2229.

1 parent 4b2d83e commit d19f505

File tree

7 files changed: +828 −0 lines changed

src/main/java/org/apache/sysds/common/Types.java

Lines changed: 1 addition & 0 deletions

@@ -868,6 +868,7 @@ public enum FileFormat {
 	PROTO,   // protocol buffer representation
 	HDF5,    // Hierarchical Data Format (HDF)
 	COG,     // Cloud-optimized GeoTIFF
+	PARQUET, // Parquet format for columnar data storage
 	UNKNOWN;

 	public boolean isIJV() {
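
For orientation, a minimal sketch of resolving the new constant through standard enum machinery; the script-level format-string plumbing is outside this hunk, so treat the lower-case mapping as an assumption:

	import org.apache.sysds.common.Types.FileFormat;

	public class ParquetFormatCheck {
		public static void main(String[] args) {
			// Upper-case the script-level format string to match the enum constant
			// (assumption: read()/write() metadata uses "parquet" in lower case).
			FileFormat fmt = FileFormat.valueOf("parquet".toUpperCase());
			System.out.println(fmt);         // PARQUET
			System.out.println(fmt.isIJV()); // false: not a text IJV format
		}
	}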
src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java

Lines changed: 157 additions & 0 deletions

@@ -0,0 +1,157 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.util.HDFSTool;

/**
 * Single-threaded frame Parquet reader.
 */
public class FrameReaderParquet extends FrameReader {

	/**
	 * Reads a Parquet file from HDFS and converts it into a FrameBlock.
	 *
	 * @param fname  The HDFS file path to the Parquet file.
	 * @param schema The expected data types of the columns.
	 * @param names  The names of the columns.
	 * @param rlen   The expected number of rows.
	 * @param clen   The expected number of columns.
	 * @return A FrameBlock containing the data read from the Parquet file.
	 */
	@Override
	public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
		throws IOException, DMLRuntimeException
	{
		// Prepare file access
		Configuration conf = ConfigurationManager.getCachedJobConf();
		Path path = new Path(fname);

		// Check file existence on HDFS
		if (!HDFSTool.existsFileOnHDFS(path.toString())) {
			throw new IOException("File does not exist on HDFS: " + fname);
		}

		// Allocate output frame block
		ValueType[] lschema = createOutputSchema(schema, clen);
		String[] lnames = createOutputNames(names, clen);
		FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);

		// Read Parquet file
		readParquetFrameFromHDFS(path, conf, ret, lschema, rlen, clen);

		return ret;
	}

	/**
	 * Reads data from a Parquet file on HDFS and fills the provided FrameBlock.
	 * The method retrieves the Parquet schema from the file footer, maps the required
	 * column names to their corresponding indices, and then uses a ParquetReader to
	 * iterate over each row. Data is extracted based on the column type and set into
	 * the output FrameBlock.
	 *
	 * @param path   The HDFS path to the Parquet file.
	 * @param conf   The Hadoop configuration.
	 * @param dest   The FrameBlock to populate with data.
	 * @param schema The expected value types for the output columns.
	 * @param rlen   The expected number of rows.
	 * @param clen   The expected number of columns.
	 */
	protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen)
		throws IOException
	{
		// Retrieve schema from Parquet footer
		ParquetMetadata metadata = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf)).getFooter();
		MessageType parquetSchema = metadata.getFileMetaData().getSchema();

		// Map column names to Parquet schema indices
		String[] columnNames = dest.getColumnNames();
		int[] columnIndices = new int[columnNames.length];
		for (int i = 0; i < columnNames.length; i++) {
			columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]);
		}

		// Read data using ParquetReader
		try (ParquetReader<Group> rowReader = ParquetReader.builder(new GroupReadSupport(), path)
			.withConf(conf)
			.build())
		{
			Group group;
			int row = 0;
			while ((group = rowReader.read()) != null) {
				for (int col = 0; col < clen; col++) {
					int colIndex = columnIndices[col];
					if (group.getFieldRepetitionCount(colIndex) > 0) {
						PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(columnNames[col])
							.asPrimitiveType().getPrimitiveTypeName();
						switch (type) {
							case INT32:
								dest.set(row, col, group.getInteger(colIndex, 0));
								break;
							case INT64:
								dest.set(row, col, group.getLong(colIndex, 0));
								break;
							case FLOAT:
								dest.set(row, col, group.getFloat(colIndex, 0));
								break;
							case DOUBLE:
								dest.set(row, col, group.getDouble(colIndex, 0));
								break;
							case BOOLEAN:
								dest.set(row, col, group.getBoolean(colIndex, 0));
								break;
							case BINARY:
								dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8());
								break;
							default:
								throw new IOException("Unsupported data type: " + type);
						}
					}
					else {
						dest.set(row, col, null);
					}
				}
				row++;
			}

			// Check frame dimensions
			if (row != rlen) {
				throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row);
			}
		}
	}

	// not implemented
	@Override
	public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
		throws IOException, DMLRuntimeException {
		throw new UnsupportedOperationException("Unimplemented method 'readFrameFromInputStream'");
	}
}
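
For context, a minimal usage sketch of the reader above; the file path, schema, and dimensions are illustrative assumptions (in SystemDS, readers are normally obtained through the I/O factory rather than instantiated directly):

	import org.apache.sysds.common.Types.ValueType;
	import org.apache.sysds.runtime.frame.data.FrameBlock;
	import org.apache.sysds.runtime.io.FrameReaderParquet;

	public class ParquetReadExample {
		public static void main(String[] args) throws Exception {
			// Hypothetical Parquet file with two columns (id INT64, name BINARY/UTF8)
			ValueType[] schema = new ValueType[] {ValueType.INT64, ValueType.STRING};
			String[] names = new String[] {"id", "name"};

			FrameReaderParquet reader = new FrameReaderParquet();
			// rlen must match the actual row count; the reader validates it after reading
			FrameBlock fb = reader.readFrameFromHDFS("hdfs:/tmp/people.parquet", schema, names, 100, 2);
			System.out.println(fb.getNumRows() + " x " + fb.getNumColumns());
		}
	}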
src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java

Lines changed: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;

/**
 * Multi-threaded frame Parquet reader.
 */
public class FrameReaderParquetParallel extends FrameReaderParquet {

	/**
	 * Reads a Parquet frame in parallel and populates the provided FrameBlock with the data.
	 * The method retrieves all part-file paths at the given location, determines the number
	 * of threads to use from the number of available files and the configured read
	 * parallelism, and creates a thread pool that runs one reading task per file concurrently.
	 *
	 * @param path   The HDFS path to the Parquet file or the directory containing part files.
	 * @param conf   The Hadoop configuration.
	 * @param dest   The FrameBlock to be updated with the data read from the files.
	 * @param schema The expected value types for the frame columns.
	 * @param rlen   The expected number of rows.
	 * @param clen   The expected number of columns.
	 */
	@Override
	protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen)
		throws IOException, DMLRuntimeException
	{
		FileSystem fs = IOUtilFunctions.getFileSystem(path);
		Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path);
		int numThreads = Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length);

		// Create and execute read tasks
		ExecutorService pool = CommonThreadPool.get(numThreads);
		try {
			List<ReadFileTask> tasks = new ArrayList<>();
			for (Path file : files) {
				tasks.add(new ReadFileTask(file, conf, dest, schema, clen));
			}

			for (Future<Object> task : pool.invokeAll(tasks)) {
				task.get();
			}
		}
		catch (Exception e) {
			throw new IOException("Failed parallel read of Parquet frame.", e);
		}
		finally {
			pool.shutdown();
		}
	}

	private class ReadFileTask implements Callable<Object> {
		private Path path;
		private Configuration conf;
		private FrameBlock dest;
		@SuppressWarnings("unused")
		private ValueType[] schema;
		private long clen;

		public ReadFileTask(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long clen) {
			this.path = path;
			this.conf = conf;
			this.dest = dest;
			this.schema = schema;
			this.clen = clen;
		}

		// When executed, opens a ParquetReader for the assigned file and iterates
		// over its rows, processing every column; note that row indices restart
		// at 0 within each file.
		@Override
		public Object call() throws Exception {
			try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) {
				Group group;
				int row = 0;
				while ((group = reader.read()) != null) {
					for (int col = 0; col < clen; col++) {
						if (group.getFieldRepetitionCount(col) > 0) {
							dest.set(row, col, group.getValueToString(col, 0));
						}
						else {
							dest.set(row, col, null);
						}
					}
					row++;
				}
			}
			return null;
		}
	}
}
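
The factory wiring that chooses between the two readers is not part of the files shown here; a sketch of the conventional selection, assuming the same pattern as the other SystemDS binary readers:

	import org.apache.sysds.hops.OptimizerUtils;
	import org.apache.sysds.runtime.io.FrameReaderParquet;
	import org.apache.sysds.runtime.io.FrameReaderParquetParallel;

	public class ParquetReaderSelection {
		// Pick the multi-threaded reader when parallel read is configured
		// (assumption: the actual wiring lives in FrameReaderFactory, not this diff).
		public static FrameReaderParquet create() {
			return OptimizerUtils.getParallelBinaryReadParallelism() > 1
				? new FrameReaderParquetParallel()
				: new FrameReaderParquet();
		}
	}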
