
Commit c7307ac
[SPARK-15689][SQL] data source v2 read path
## What changes were proposed in this pull request?

This PR adds the infrastructure for data source v2 and implements the features Spark already has in data source v1: column pruning, filter push-down, catalyst expression filter push-down, InternalRow scan, schema inference, and data size reporting. The write path is excluded to keep this PR from growing too big; it will be added in a follow-up PR.

## How was this patch tested?

New tests.

Author: Wenchen Fan <[email protected]>

Closes apache#19136 from cloud-fan/data-source-v2.
1 parent 79a4dab commit c7307ac

25 files changed: +1518 −7 lines
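Before the per-file diff, a usage sketch: with the v2 read path in place, a source is still selected by class name through the existing DataFrameReader API. In this hedged example, the source class com.example.MySource and its "path" option are hypothetical stand-ins; the SparkSession and DataFrameReader calls are the standard Spark SQL Java API.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadFromV2Source {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("data-source-v2-demo")
        .master("local[*]")
        .getOrCreate();

    // format() takes the fully qualified class name of the data source; for a v2 source,
    // the options below end up in the case-insensitive DataSourceV2Options map added by this PR.
    Dataset<Row> df = spark.read()
        .format("com.example.MySource")  // hypothetical DataSourceV2 implementation
        .option("path", "/tmp/data")     // hypothetical option
        .load();

    df.show();
    spark.stop();
  }
}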
DataSourceV2.java (31 additions & 0 deletions)
@@ -0,0 +1,31 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;

/**
 * The base interface for data source v2. Implementations must have a public, no-argument
 * constructor.
 *
 * Note that this is an empty interface. Data source implementations should mix in at least one of
 * the plug-in interfaces like {@link ReadSupport}; otherwise the data source is a dummy that can
 * neither be read from nor written to.
 */
@InterfaceStability.Evolving
public interface DataSourceV2 {}
DataSourceV2Options.java (52 additions & 0 deletions)
@@ -0,0 +1,52 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;

/**
 * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
 * data source options.
 */
@InterfaceStability.Evolving
public class DataSourceV2Options {
  private final Map<String, String> keyLowerCasedMap;

  private String toLowerCase(String key) {
    return key.toLowerCase(Locale.ROOT);
  }

  public DataSourceV2Options(Map<String, String> originalMap) {
    keyLowerCasedMap = new HashMap<>(originalMap.size());
    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
      keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
    }
  }

  /**
   * Returns the option value to which the specified key is mapped, case-insensitively.
   */
  public Optional<String> get(String key) {
    return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
  }
}
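A quick sketch of the case-insensitive behavior above, assuming the class is used exactly as shown (the option names here are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;

public class OptionsDemo {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("PATH", "/tmp/data");

    DataSourceV2Options options = new DataSourceV2Options(raw);

    // Keys are lower-cased on construction, so lookups ignore case.
    System.out.println(options.get("path").get());       // /tmp/data
    System.out.println(options.get("Path").isPresent());  // true
    System.out.println(options.get("missing"));           // Optional.empty
  }
}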
ReadSupport.java (38 additions & 0 deletions)
@@ -0,0 +1,38 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data reading ability and scan the data from the data source.
 */
@InterfaceStability.Evolving
public interface ReadSupport {

  /**
   * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
   *
   * @param options the options for this data source reader, which is an immutable
   *                case-insensitive string-to-string map.
   * @return a reader that implements the actual read logic.
   */
  DataSourceV2Reader createReader(DataSourceV2Options options);
}
ReadSupportWithSchema.java (47 additions & 0 deletions)
@@ -0,0 +1,47 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data reading ability and scan the data from the data source.
 *
 * This is a variant of {@link ReadSupport} that accepts a user-specified schema when reading
 * data. A data source can implement both {@link ReadSupport} and {@link ReadSupportWithSchema}
 * if it supports both schema inference and user-specified schemas.
 */
@InterfaceStability.Evolving
public interface ReadSupportWithSchema {

  /**
   * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
   *
   * @param schema the full schema of this data source reader. The full schema usually maps to
   *               the physical schema of the underlying storage of this data source, e.g. CSV
   *               files, JSON files, etc. The reader may still not read data with the full
   *               schema, as column pruning or other optimizations may happen.
   * @param options the options for this data source reader, which is an immutable
   *                case-insensitive string-to-string map.
   * @return a reader that implements the actual read logic.
   */
  DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
}
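A source that supports both schema inference and user-specified schemas implements both interfaces, and the two overloads typically share one code path. Below is a shape-only sketch; FlexibleSource, StubReader, and the inference logic are all illustrative assumptions, not part of this commit:

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

public class FlexibleSource implements DataSourceV2, ReadSupport, ReadSupportWithSchema {

  @Override
  public DataSourceV2Reader createReader(DataSourceV2Options options) {
    // No schema supplied by the user: fall back to inference.
    return createReader(inferSchema(options), options);
  }

  @Override
  public DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options) {
    return new StubReader(schema);
  }

  private StructType inferSchema(DataSourceV2Options options) {
    // Hypothetical: a real source would e.g. sample the files under options.get("path").
    return new StructType();
  }

  static class StubReader implements DataSourceV2Reader {
    private final StructType schema;

    StubReader(StructType schema) {
      this.schema = schema;
    }

    @Override
    public StructType readSchema() {
      return schema;
    }

    @Override
    public List<ReadTask<Row>> createReadTasks() {
      return Collections.emptyList();  // stub: produces no partitions
    }
  }
}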
DataReader.java (40 additions & 0 deletions)
@@ -0,0 +1,40 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2.reader;

import java.io.Closeable;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A data reader returned by {@link ReadTask#createReader()}, responsible for outputting data
 * for an RDD partition.
 */
@InterfaceStability.Evolving
public interface DataReader<T> extends Closeable {

  /**
   * Proceeds to the next record. Returns false if there are no more records.
   */
  boolean next();

  /**
   * Returns the current record. This method should return the same value until `next` is called.
   */
  T get();
}
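The next()/get() contract mirrors an iterator, and extending Closeable means the reader should be closed once the partition is exhausted. A consumption sketch for one partition (the caller and the printing are illustrative; in Spark this loop would live inside the executor-side RDD compute):

import java.io.IOException;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;

public class ConsumeOnePartition {

  static void consume(ReadTask<Row> task) throws IOException {
    // try-with-resources works because DataReader extends Closeable.
    try (DataReader<Row> reader = task.createReader()) {
      while (reader.next()) {    // advance; false means no more records
        Row row = reader.get();  // stable until the next call to next()
        System.out.println(row);
      }
    }
  }
}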
DataSourceV2Reader.java (67 additions & 0 deletions)
@@ -0,0 +1,67 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2.reader;

import java.util.List;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.types.StructType;

/**
 * A data source reader that is returned by
 * {@link ReadSupport#createReader(DataSourceV2Options)} or
 * {@link ReadSupportWithSchema#createReader(StructType, DataSourceV2Options)}.
 * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
 * logic should be delegated to {@link ReadTask}s that are returned by {@link #createReadTasks()}.
 *
 * There are mainly three kinds of query optimizations:
 *   1. Operator push-down, e.g. filter push-down and required-column push-down (a.k.a. column
 *      pruning). These push-down interfaces are named like `SupportsPushDownXXX`.
 *   2. Information reporting, e.g. statistics reporting and ordering reporting. These reporting
 *      interfaces are named like `SupportsReportingXXX`.
 *   3. Special scans, e.g. columnar scan and unsafe-row scan. These scan interfaces are named
 *      like `SupportsScanXXX`.
 *
 * Spark first applies all operator push-down optimizations that this data source supports. Then
 * Spark collects the information this data source reports for further optimizations. Finally,
 * Spark issues the scan request and does the actual data reading.
 */
@InterfaceStability.Evolving
public interface DataSourceV2Reader {

  /**
   * Returns the actual schema of this data source reader, which may be different from the
   * physical schema of the underlying storage, as column pruning or other optimizations may
   * happen.
   */
  StructType readSchema();

  /**
   * Returns a list of read tasks. Each task is responsible for outputting data for one RDD
   * partition, so the number of tasks returned here is the same as the number of RDD partitions
   * this scan outputs.
   *
   * Note that this may not be a full scan if the data source reader mixes in other optimization
   * interfaces like column pruning and filter push-down. These optimizations are applied before
   * Spark issues the scan request.
   */
  List<ReadTask<Row>> createReadTasks();
}
ReadTask.java (48 additions & 0 deletions)
@@ -0,0 +1,48 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2.reader;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A read task returned by {@link DataSourceV2Reader#createReadTasks()}, responsible for creating
 * the actual data reader. The relationship between {@link ReadTask} and {@link DataReader} is
 * similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 *
 * Note that the read task will be serialized and sent to executors, and the data reader will be
 * created on executors to do the actual reading.
 */
@InterfaceStability.Evolving
public interface ReadTask<T> extends Serializable {

  /**
   * The preferred locations where this read task can run faster. Spark does not guarantee that
   * the task will always run on these locations, so implementations should make sure the task
   * can run on any location. Each location is a string representing the host name of an
   * executor.
   */
  default String[] preferredLocations() {
    return new String[0];
  }

  /**
   * Returns a data reader to do the actual reading work for this read task.
   */
  DataReader<T> createReader();
}
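Putting the read-side pieces together, here is a minimal end-to-end sketch. All names (RangeSource and its nested classes) are hypothetical; it serves the integers 0 through 9 as one int column split across two partitions:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RangeSource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceV2Reader createReader(DataSourceV2Options options) {
    return new RangeReader();
  }

  static class RangeReader implements DataSourceV2Reader {
    @Override
    public StructType readSchema() {
      return new StructType().add("i", DataTypes.IntegerType);
    }

    @Override
    public List<ReadTask<Row>> createReadTasks() {
      // Two tasks, so the scan produces an RDD with two partitions.
      return Arrays.asList(new RangeReadTask(0, 5), new RangeReadTask(5, 10));
    }
  }

  static class RangeReadTask implements ReadTask<Row> {
    private final int start;
    private final int end;

    RangeReadTask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public DataReader<Row> createReader() {
      // Called on an executor after the task has been serialized and shipped.
      return new RangeDataReader(start, end);
    }
  }

  static class RangeDataReader implements DataReader<Row> {
    private int current;
    private final int end;

    RangeDataReader(int start, int end) {
      this.current = start - 1;
      this.end = end;
    }

    @Override
    public boolean next() {
      current += 1;
      return current < end;
    }

    @Override
    public Row get() {
      return RowFactory.create(current);
    }

    @Override
    public void close() {
      // Nothing to release in this sketch.
    }
  }
}

A real source would additionally honor its options (e.g. a path) and could mix in the SupportsPushDownXXX / SupportsReportingXXX interfaces described in DataSourceV2Reader's Javadoc.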
Statistics.java (32 additions & 0 deletions)
@@ -0,0 +1,32 @@

/* Apache License 2.0 header, identical to the one in DataSourceV2.java above. */

package org.apache.spark.sql.sources.v2.reader;

import java.util.OptionalLong;

import org.apache.spark.annotation.InterfaceStability;

/**
 * An interface to represent statistics for a data source, which is returned by
 * {@link SupportsReportStatistics#getStatistics()}.
 */
@InterfaceStability.Evolving
public interface Statistics {
  OptionalLong sizeInBytes();
  OptionalLong numRows();
}
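A trivial implementation sketch, using OptionalLong.empty() for a statistic the source cannot provide (the class name and figures are illustrative):

import java.util.OptionalLong;

import org.apache.spark.sql.sources.v2.reader.Statistics;

public class KnownSizeStatistics implements Statistics {

  // Hypothetical: the source knows its total byte size but not its row count.
  @Override
  public OptionalLong sizeInBytes() {
    return OptionalLong.of(1024L * 1024L);
  }

  @Override
  public OptionalLong numRows() {
    return OptionalLong.empty();
  }
}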
