From 323b5dd6664f5b16e8e51d6c599995c79ad0ca50 Mon Sep 17 00:00:00 2001 From: arnavb Date: Mon, 25 Aug 2025 06:27:43 +0000 Subject: [PATCH 1/3] update --- .../org/apache/parquet/storage/Storage.java | 56 ++++++++++++ .../parquet/storage/StorageProvider.java | 69 +++++++++++++++ .../storage/impl/NioStorageProvider.java | 85 +++++++++++++++++++ ...org.apache.parquet.storage.StorageProvider | 18 ++++ .../apache/parquet/storage/TestStorage.java | 74 ++++++++++++++++ .../impl/hadoop/HadoopStorageProvider.java | 77 +++++++++++++++++ ...org.apache.parquet.storage.StorageProvider | 18 ++++ 7 files changed, 397 insertions(+) create mode 100644 parquet-common/src/main/java/org/apache/parquet/storage/Storage.java create mode 100644 parquet-common/src/main/java/org/apache/parquet/storage/StorageProvider.java create mode 100644 parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java create mode 100644 parquet-common/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider create mode 100644 parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/storage/impl/hadoop/HadoopStorageProvider.java create mode 100644 parquet-hadoop/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider diff --git a/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java b/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java new file mode 100644 index 0000000000..2eae2d29ce --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.storage; + +import java.net.URI; + +/** + * Simple helper to select a {@link StorageProvider} implementation based on a + * Hadoop/NIO constructs. This keeps provider discovery logic outside of call-sites. + */ +public final class Storage { + private Storage() {} + + /** + * Return the appropriate provider for the given URI scheme. + * - file:// or no scheme → NIO provider + * - hdfs:// → Hadoop provider (if available) + */ + public static StorageProvider select(URI uri) { + final String scheme = uri == null ? null : uri.getScheme(); + + if (scheme == null || "file".equals(scheme)) { + return new org.apache.parquet.storage.impl.NioStorageProvider(); + } + + try { + Class confClass = Class.forName("org.apache.hadoop.conf.Configuration"); + Object conf = confClass.getDeclaredConstructor().newInstance(); + Class providerClass = Class.forName("org.apache.parquet.storage.impl.hadoop.HadoopStorageProvider"); + return (StorageProvider) providerClass.getConstructor(confClass).newInstance(conf); + } catch (ClassNotFoundException e) { + throw new IllegalStateException( + "Hadoop not on classpath for URI scheme hdfs, please ensure hadoop dependencies are available in classpath", + e); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException("Failed to initialize HadoopStorageProvider", e); + } + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/storage/StorageProvider.java b/parquet-common/src/main/java/org/apache/parquet/storage/StorageProvider.java new file mode 100644 index 0000000000..1787b0bfdb --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/storage/StorageProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Simple abstraction for file IO that allows Parquet components to work with + * different storage back-ends (NIO, Hadoop, cloud SDKs, etc.) without taking + * an explicit dependency on the back-end libraries. + * + *

The interface purposefully exposes a very small surface – just the + * operations currently needed by the Parquet code we intend to decouple from + * Hadoop. Additional methods can be added incrementally as other areas are + * ported.

+ * + *

This interface lives in the parquet-common module so that it + * can be referenced by any other module without creating additional coupling + * between them.

+ */ +public interface StorageProvider { + + /** + * Opens the given path for reading. + * + * @param path fully-qualified file path (implementation specific semantics) + * @return an InputStream that must be closed by the caller + */ + InputStream openForRead(String path) throws IOException; + + /** + * Opens the given path for writing. If the file already exists the behaviour + * depends on the overwrite flag. + * + * @param path fully-qualified file path + * @param overwrite whether an existing file should be replaced + * @return an OutputStream that must be closed by the caller + */ + OutputStream openForWrite(String path, boolean overwrite) throws IOException; + + /** + * Deletes the file at path if it exists. + */ + boolean delete(String path) throws IOException; + + /** + * Renames the file located at source to target. + */ + boolean rename(String source, String target) throws IOException; +} diff --git a/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java b/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java new file mode 100644 index 0000000000..4ba38d3daa --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.storage.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.parquet.storage.StorageProvider; + +/** + * A StorageProvider that relies solely on the Java NIO standard library. It + * can work with any FileSystem implementation available + * on the class-path. + */ +public class NioStorageProvider implements StorageProvider { + + public NioStorageProvider() {} + + private static Path toPath(String s) { + try { + java.net.URI uri = java.net.URI.create(s); + if (uri.getScheme() != null) { + return Paths.get(uri); + } + } catch (IllegalArgumentException ignore) { + } + return Paths.get(s); + } + + @Override + public InputStream openForRead(String path) throws IOException { + return Files.newInputStream(toPath(path)); + } + + @Override + public OutputStream openForWrite(String path, boolean overwrite) throws IOException { + Path p = toPath(path); + if (overwrite) { + return Files.newOutputStream( + p, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE); + } else { + return Files.newOutputStream(p, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + } + } + + @Override + public boolean delete(String path) throws IOException { + return Files.deleteIfExists(toPath(path)); + } + + @Override + public boolean rename(String source, String target) throws IOException { + Path src = toPath(source); + Path tgt = toPath(target); + if (Files.exists(tgt)) return false; + try { + Files.move(src, tgt, StandardCopyOption.ATOMIC_MOVE); + } catch (AtomicMoveNotSupportedException e) { + Files.move(src, tgt); + } + return true; + } +} diff --git a/parquet-common/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider b/parquet-common/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider new file mode 100644 index 0000000000..0c49514a24 --- /dev/null +++ b/parquet-common/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +org.apache.parquet.storage.impl.NioStorageProvider diff --git a/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java b/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java new file mode 100644 index 0000000000..ee652a4510 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.storage; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.URI; +import org.junit.Test; + +public class TestStorage { + + @Test + public void testFileSchemeReturnsNioProvider() { + StorageProvider provider = Storage.select(URI.create("file:///tmp/test.parquet")); + assertTrue( + "file:// should return NioStorageProvider", + provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + } + + @Test + public void testNullUriReturnsNioProvider() { + StorageProvider provider = Storage.select(null); + assertTrue( + "null URI should return NioStorageProvider", + provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + } + + @Test + public void testRelativePathReturnsNioProvider() { + StorageProvider provider = Storage.select(URI.create("relative/path")); + assertTrue( + "relative path should return NioStorageProvider", + provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + } + + @Test + public void testHdfsSchemeReturnsHadoopProvider() { + StorageProvider provider = Storage.select(URI.create("hdfs://namenode:8020/test.parquet")); + assertNotNull("Should return a provider", provider); + } + + @Test + public void testUnknownSchemeReturnsNioProvider() { + StorageProvider provider = Storage.select(URI.create("s3://bucket/test.parquet")); + assertTrue( + "unknown scheme should return NioStorageProvider", + provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + } + + @Test + public void testNioProviderBasicOperations() throws IOException { + StorageProvider provider = Storage.select(URI.create("file:///tmp/test.parquet")); + + assertTrue(provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/storage/impl/hadoop/HadoopStorageProvider.java b/parquet-hadoop/src/main/java/org/apache/parquet/storage/impl/hadoop/HadoopStorageProvider.java new file mode 100644 index 0000000000..a9cb493953 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/storage/impl/hadoop/HadoopStorageProvider.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.storage.impl.hadoop; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.storage.StorageProvider; + +/** + * Hadoop-backed implementation of {@link StorageProvider}. Lives in the + * parquet-hadoop module, under the same {@code org.apache.parquet.storage.impl} + * hierarchy as other providers. + */ +public class HadoopStorageProvider implements StorageProvider { + + private final FileSystem fs; + + /** + * Uses default Hadoop Configuration. + */ + public HadoopStorageProvider() throws IOException { + this(new Configuration()); + } + + public HadoopStorageProvider(Configuration conf) throws IOException { + this.fs = FileSystem.get(conf); + } + + @Override + public InputStream openForRead(String path) throws IOException { + Path p = new Path(path); + FSDataInputStream in = fs.open(p); + // Deliberately return the Hadoop stream as InputStream; callers should + // not rely on Hadoop-specific methods. + return in; + } + + @Override + public OutputStream openForWrite(String path, boolean overwrite) throws IOException { + Path p = new Path(path); + FSDataOutputStream out = fs.create(p, overwrite); + return out; + } + + @Override + public boolean delete(String path) throws IOException { + return fs.delete(new Path(path), false); + } + + @Override + public boolean rename(String source, String target) throws IOException { + return fs.rename(new Path(source), new Path(target)); + } +} diff --git a/parquet-hadoop/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider b/parquet-hadoop/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider new file mode 100644 index 0000000000..deff4a3ba4 --- /dev/null +++ b/parquet-hadoop/src/main/resources/META-INF/services/org.apache.parquet.storage.StorageProvider @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +org.apache.parquet.storage.impl.hadoop.HadoopStorageProvider From d483f895bb18899df6256c97b4098bca1d080908 Mon Sep 17 00:00:00 2001 From: arnavb Date: Mon, 25 Aug 2025 07:08:56 +0000 Subject: [PATCH 2/3] update --- .../storage/impl/NioStorageProvider.java | 1 + .../apache/parquet/storage/TestStorage.java | 36 +++++++++++++------ 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java b/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java index 4ba38d3daa..2c795930e5 100644 --- a/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java +++ b/parquet-common/src/main/java/org/apache/parquet/storage/impl/NioStorageProvider.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; diff --git a/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java b/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java index ee652a4510..a25ee5b53e 100644 --- a/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java +++ b/parquet-common/src/test/java/org/apache/parquet/storage/TestStorage.java @@ -51,18 +51,14 @@ public void testRelativePathReturnsNioProvider() { provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); } - @Test - public void testHdfsSchemeReturnsHadoopProvider() { - StorageProvider provider = Storage.select(URI.create("hdfs://namenode:8020/test.parquet")); - assertNotNull("Should return a provider", provider); + @Test(expected = IllegalStateException.class) + public void testHdfsSchemeThrowsExceptionWhenHadoopNotAvailable() { + Storage.select(URI.create("hdfs://namenode:8020/test.parquet")); } - @Test - public void testUnknownSchemeReturnsNioProvider() { - StorageProvider provider = Storage.select(URI.create("s3://bucket/test.parquet")); - assertTrue( - "unknown scheme should return NioStorageProvider", - provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + @Test(expected = IllegalStateException.class) + public void testUnknownSchemeThrowsExceptionWhenHadoopNotAvailable() { + Storage.select(URI.create("s3://bucket/test.parquet")); } @Test @@ -71,4 +67,24 @@ public void testNioProviderBasicOperations() throws IOException { assertTrue(provider instanceof org.apache.parquet.storage.impl.NioStorageProvider); } + + @Test + public void testProviderSelectionLogic() { + StorageProvider fileProvider = Storage.select(URI.create("file:///tmp/test.parquet")); + assertTrue( + "file:// should always return NIO provider", + fileProvider instanceof org.apache.parquet.storage.impl.NioStorageProvider); + + try { + Storage.select(URI.create("hdfs://namenode:8020/test.parquet")); + fail("Should throw IllegalStateException for hdfs:// when Hadoop not available"); + } catch (IllegalStateException e) { + } + + try { + Storage.select(URI.create("s3://bucket/test.parquet")); + fail("Should throw IllegalStateException for s3:// when Hadoop not available"); + } catch (IllegalStateException e) { + } + } } From 7184e2bf6171230dca92877945422819923c1a7b Mon Sep 17 00:00:00 2001 From: arnavb Date: Mon, 25 Aug 2025 07:23:28 +0000 Subject: [PATCH 3/3] update --- .../src/main/java/org/apache/parquet/storage/Storage.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java b/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java index 2eae2d29ce..b0e4709211 100644 --- a/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java +++ b/parquet-common/src/main/java/org/apache/parquet/storage/Storage.java @@ -30,8 +30,6 @@ private Storage() {} /** * Return the appropriate provider for the given URI scheme. - * - file:// or no scheme → NIO provider - * - hdfs:// → Hadoop provider (if available) */ public static StorageProvider select(URI uri) { final String scheme = uri == null ? null : uri.getScheme();