Skip to content

Commit f35fc92

Browse files
Merge remote-tracking branch 'delta-io/master' into spark-4.0-upgrade-merge
2 parents c089ca6 + 3cdcdd1 commit f35fc92

36 files changed

+2014
-279
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
282282

283283
lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
284284
.dependsOn(kernelApi)
285+
.dependsOn(storage)
285286
.dependsOn(spark % "test->test")
286287
.dependsOn(goldenTables % "test")
287288
.settings(

kernel/examples/run-kernel-examples.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ def clear_artifact_cache():
7171
print("Clearing Delta Kernel artifacts from ivy2 and mvn cache")
7272
delete_if_exists(os.path.expanduser("~/.ivy2/cache/io.delta.kernel"))
7373
delete_if_exists(os.path.expanduser("~/.ivy2/local/io.delta.kernel"))
74-
delete_if_exists(os.path.expanduser("~/.m2/repository/io/delta/kernel/"))
74+
delete_if_exists(os.path.expanduser("~/.ivy2/cache/io.delta"))
75+
delete_if_exists(os.path.expanduser("~/.ivy2/local/io.delta"))
76+
delete_if_exists(os.path.expanduser("~/.m2/repository/io/delta/"))
7577

7678

7779
def delete_if_exists(path):
@@ -163,7 +165,7 @@ def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
163165

164166
if args.use_local:
165167
with WorkingDirectory(project_root_dir):
166-
run_cmd(["build/sbt", "kernelGroup/publishM2"], stream_output=True)
168+
run_cmd(["build/sbt", "kernelGroup/publishM2", "storage/publishM2"], stream_output=True)
167169

168170
golden_file_dir = path.join(
169171
examples_root_dir,

kernel/examples/table-reader/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ limitations under the License.-->
5151
<version>${delta-kernel.version}</version>
5252
</dependency>
5353

54+
<dependency>
55+
<groupId>io.delta</groupId>
56+
<artifactId>delta-storage</artifactId>
57+
<version>${delta-kernel.version}</version>
58+
</dependency>
59+
5460
<dependency>
5561
<groupId>org.apache.hadoop</groupId>
5662
<artifactId>hadoop-client-runtime</artifactId>

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,9 @@
1515
*/
1616
package io.delta.kernel.defaults.client;
1717

18-
import java.io.ByteArrayInputStream;
19-
import java.io.DataInputStream;
20-
import java.io.FileNotFoundException;
21-
import java.io.IOException;
22-
import java.util.Arrays;
23-
import java.util.Comparator;
24-
import java.util.Iterator;
18+
import java.io.*;
2519

20+
import io.delta.storage.LogStore;
2621
import org.apache.hadoop.conf.Configuration;
2722
import org.apache.hadoop.fs.FileSystem;
2823
import org.apache.hadoop.fs.Path;
@@ -34,41 +29,58 @@
3429

3530
import io.delta.kernel.internal.util.Utils;
3631

32+
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
33+
3734
/**
38-
* Default implementation of {@link FileSystemClient} based on Hadoop APIs.
35+
* Default implementation of {@link FileSystemClient} based on Hadoop APIs. It takes a Hadoop
36+
* {@link Configuration} object to interact with the file system. The following optional
37+
* configurations can be set to customize the behavior of the client:
38+
* <ul>
39+
* <li>{@code io.delta.kernel.logStore.<scheme>.impl} - The class name of the custom
40+
* {@link LogStore} implementation to use for operations on storage systems with the
41+
* specified {@code scheme}. For example, to use a custom {@link LogStore} for S3 storage
42+
* objects:
43+
* <pre>{@code
44+
* <property>
45+
* <name>io.delta.kernel.logStore.s3.impl</name>
46+
* <value>com.example.S3LogStore</value>
47+
* </property>
48+
* }</pre>
49+
* If not set, the default LogStore implementation for the scheme will be used.
50+
* </li>
51+
* <li>{@code delta.enableFastS3AListFrom} - Set to {@code true} to enable fast listing
52+
* functionality when using a {@link LogStore} created for S3 storage objects.
53+
* </li>
54+
* </ul>
55+
*
56+
* The above list of options is not exhaustive. For a complete list of options, refer to the
57+
* specific implementation of {@link FileSystem}.
3958
*/
4059
public class DefaultFileSystemClient
41-
implements FileSystemClient {
60+
implements FileSystemClient {
4261
private final Configuration hadoopConf;
4362

63+
/**
 * Creates a client backed by the given Hadoop configuration.
 * <p>
 * See the class-level documentation for the optional configuration keys that
 * customize this client's behavior.
 *
 * @param hadoopConf Hadoop {@link Configuration} used for all file-system interactions.
 */
public DefaultFileSystemClient(Configuration hadoopConf) {
    this.hadoopConf = hadoopConf;
}
4772

4873
@Override
4974
public CloseableIterator<FileStatus> listFrom(String filePath) throws IOException {
50-
Iterator<org.apache.hadoop.fs.FileStatus> iter;
51-
5275
Path path = new Path(filePath);
53-
FileSystem fs = path.getFileSystem(hadoopConf);
54-
if (!fs.exists(path.getParent())) {
55-
throw new FileNotFoundException(
56-
String.format("No such file or directory: %s", path.getParent())
57-
);
58-
}
59-
org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(path.getParent());
60-
iter = Arrays.stream(files)
61-
.filter(f -> f.getPath().getName().compareTo(path.getName()) >= 0)
62-
.sorted(Comparator.comparing(o -> o.getPath().getName()))
63-
.iterator();
76+
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
6477

65-
return Utils.toCloseableIterator(iter)
66-
.map(hadoopFileStatus ->
67-
FileStatus.of(
68-
hadoopFileStatus.getPath().toString(),
69-
hadoopFileStatus.getLen(),
70-
hadoopFileStatus.getModificationTime())
71-
);
78+
return Utils.toCloseableIterator(logStore.listFrom(path, hadoopConf))
79+
.map(hadoopFileStatus ->
80+
FileStatus.of(
81+
hadoopFileStatus.getPath().toString(),
82+
hadoopFileStatus.getLen(),
83+
hadoopFileStatus.getModificationTime()));
7284
}
7385

7486
@Override
@@ -81,7 +93,7 @@ public String resolvePath(String path) throws IOException {
8193

8294
@Override
8395
public CloseableIterator<ByteArrayInputStream> readFiles(
84-
CloseableIterator<FileReadRequest> readRequests) {
96+
CloseableIterator<FileReadRequest> readRequests) {
8597
return readRequests.map(elem ->
8698
getStream(elem.getPath(), elem.getStartOffset(), elem.getReadLength()));
8799
}
@@ -97,12 +109,12 @@ private ByteArrayInputStream getStream(String filePath, int offset, int size) {
97109
return new ByteArrayInputStream(buff);
98110
} catch (IOException ex) {
99111
throw new RuntimeException(String.format(
100-
"IOException reading from file %s at offset %s size %s",
101-
filePath, offset, size), ex);
112+
"IOException reading from file %s at offset %s size %s",
113+
filePath, offset, size), ex);
102114
}
103115
} catch (IOException ex) {
104116
throw new RuntimeException(String.format(
105-
"Could not resolve the FileSystem for path %s", filePath), ex);
117+
"Could not resolve the FileSystem for path %s", filePath), ex);
106118
}
107119
}
108120
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.defaults.internal;
17+
18+
import static java.lang.String.format;
19+
20+
/**
 * Factory methods for exceptions thrown by the default table client implementation.
 */
public class DefaultTableClientErrors {

    // TODO update to be table client exception with future exception framework
    // (see delta-io/delta#2231)

    /**
     * Error for when a configured or default {@code LogStore} implementation cannot be
     * reflectively created.
     *
     * @param logStoreClassName fully-qualified name of the {@code LogStore} class that failed
     *                          to instantiate; included in the exception message.
     * @return an {@link IllegalArgumentException} describing the failure.
     */
    public static IllegalArgumentException canNotInstantiateLogStore(String logStoreClassName) {
        return new IllegalArgumentException(
            format("Can not instantiate `LogStore` class: %s", logStoreClassName));
    }

    // Utility class: prevent instantiation.
    private DefaultTableClientErrors() {}
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.defaults.internal.logstore;
17+
18+
import java.util.*;
19+
20+
import io.delta.storage.*;
21+
import org.apache.hadoop.conf.Configuration;
22+
23+
import io.delta.kernel.defaults.internal.DefaultTableClientErrors;
24+
25+
/**
26+
* Utility class to provide the correct {@link LogStore} based on the scheme of the path.
27+
*/
28+
public class LogStoreProvider {
29+
30+
// Supported schemes per storage system.
31+
private static final Set<String> S3_SCHEMES = unmodifiableSet("s3", "s3a", "s3n");
32+
private static final Set<String> AZURE_SCHEMES =
33+
unmodifiableSet("abfs", "abfss", "adl", "wasb", "wasbs");
34+
private static final Set<String> GCS_SCHEMES = unmodifiableSet("gs");
35+
36+
/**
37+
* Get the {@link LogStore} instance for the given scheme and configuration. Callers can set
38+
* {@code io.delta.kernel.logStore.<scheme>.impl} to specify the {@link LogStore}
39+
* implementation to use for {@code scheme}.
40+
* <p>
41+
* If not set, the default {@link LogStore} implementation (given below) for the scheme will
42+
* be used.
43+
* <ul>
44+
* <li>{@code s3, s3a, s3n}: {@link S3SingleDriverLogStore}</li>
45+
* <li>{@code abfs, abfss, adl, wasb, wasbs}: {@link AzureLogStore}</li>
46+
* <li>{@code gs}: {@link GCSLogStore}</li>
47+
* <li>{@code hdfs, file}: {@link HDFSLogStore}</li>
48+
* <li>remaining: {@link HDFSLogStore}</li>
49+
* </ul>
50+
*
51+
* @param hadoopConf {@link Configuration} to use for creating the LogStore.
52+
* @param scheme Scheme of the path.
53+
* @return {@link LogStore} instance.
54+
* @throws IllegalArgumentException if the LogStore implementation is not found or can not be
55+
* instantiated.
56+
*/
57+
public static LogStore getLogStore(Configuration hadoopConf, String scheme) {
58+
String schemeLower = Optional.ofNullable(scheme)
59+
.map(String::toLowerCase).orElse(null);
60+
61+
// Check if the LogStore implementation is set in the configuration.
62+
String classNameFromConfig = hadoopConf.get(getLogStoreSchemeConfKey(schemeLower));
63+
if (classNameFromConfig != null) {
64+
try {
65+
return getLogStoreClass(classNameFromConfig)
66+
.getConstructor(Configuration.class)
67+
.newInstance(hadoopConf);
68+
} catch (Exception e) {
69+
throw DefaultTableClientErrors.canNotInstantiateLogStore(classNameFromConfig);
70+
}
71+
}
72+
73+
// Create default LogStore based on the scheme.
74+
String defaultClassName = HDFSLogStore.class.getName();
75+
if (S3_SCHEMES.contains(schemeLower)) {
76+
defaultClassName = S3SingleDriverLogStore.class.getName();
77+
} else if (AZURE_SCHEMES.contains(schemeLower)) {
78+
defaultClassName = AzureLogStore.class.getName();
79+
} else if (GCS_SCHEMES.contains(schemeLower)) {
80+
defaultClassName = GCSLogStore.class.getName();
81+
}
82+
83+
try {
84+
return getLogStoreClass(defaultClassName)
85+
.getConstructor(Configuration.class)
86+
.newInstance(hadoopConf);
87+
} catch (Exception e) {
88+
throw DefaultTableClientErrors.canNotInstantiateLogStore(defaultClassName);
89+
}
90+
}
91+
92+
/**
93+
* Configuration key for setting the LogStore implementation for a scheme.
94+
* ex: `io.delta.kernel.logStore.s3.impl` -> `io.delta.storage.S3SingleDriverLogStore`
95+
*/
96+
static String getLogStoreSchemeConfKey(String scheme) {
97+
return "io.delta.kernel.logStore." + scheme + ".impl";
98+
}
99+
100+
/**
101+
* Utility method to get the LogStore class from the class name.
102+
*/
103+
private static Class<? extends LogStore> getLogStoreClass(String logStoreClassName)
104+
throws ClassNotFoundException {
105+
return Class.forName(
106+
logStoreClassName,
107+
true /* initialize */,
108+
Thread.currentThread().getContextClassLoader())
109+
.asSubclass(LogStore.class);
110+
}
111+
112+
/**
113+
* Remove this method once we start supporting JDK9+
114+
*/
115+
private static Set<String> unmodifiableSet(String... elements) {
116+
return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(elements)));
117+
}
118+
}

0 commit comments

Comments
 (0)