Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.storage;

import java.net.URI;

/**
* Simple helper to select a {@link StorageProvider} implementation based on a
* Hadoop/NIO constructs. This keeps provider discovery logic outside of call-sites.
*/
public final class Storage {
  private Storage() {}

  /** Fully-qualified name of the Hadoop-backed provider, loaded reflectively. */
  private static final String HADOOP_PROVIDER_CLASS =
      "org.apache.parquet.storage.impl.hadoop.HadoopStorageProvider";

  /**
   * Returns the appropriate {@link StorageProvider} for the given URI.
   *
   * <p>A {@code null} URI, a URI without a scheme, or the {@code file} scheme resolves to the
   * NIO-based provider. Any other scheme (e.g. {@code hdfs}, {@code s3a}) is delegated to the
   * Hadoop provider, which is loaded reflectively so this module keeps no compile-time
   * dependency on Hadoop.
   *
   * @param uri the target location; may be {@code null}
   * @return a provider capable of handling the URI's scheme
   * @throws IllegalStateException if a non-file scheme is requested but Hadoop is not on the
   *     classpath, or if the Hadoop provider cannot be instantiated
   */
  public static StorageProvider select(URI uri) {
    final String scheme = uri == null ? null : uri.getScheme();

    if (scheme == null || "file".equals(scheme)) {
      return new org.apache.parquet.storage.impl.NioStorageProvider();
    }

    try {
      Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
      Object conf = confClass.getDeclaredConstructor().newInstance();
      Class<?> providerClass = Class.forName(HADOOP_PROVIDER_CLASS);
      return (StorageProvider) providerClass.getConstructor(confClass).newInstance(conf);
    } catch (ClassNotFoundException e) {
      // Report the actual scheme rather than a hard-coded "hdfs": every non-file
      // scheme (s3a, abfs, ...) reaches this branch.
      throw new IllegalStateException(
          "Hadoop is not on the classpath for URI scheme '" + scheme
              + "', please ensure hadoop dependencies are available in classpath",
          e);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to initialize HadoopStorageProvider", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.storage;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Simple abstraction for file IO that allows Parquet components to work with
* different storage back-ends (NIO, Hadoop, cloud SDKs, etc.) without taking
* an explicit dependency on the back-end libraries.
*
* <p>The interface purposefully exposes a very small surface – just the
* operations currently needed by the Parquet code we intend to decouple from
* Hadoop. Additional methods can be added incrementally as other areas are
* ported.</p>
*
* <p>This interface lives in the <code>parquet-common</code> module so that it
* can be referenced by any other module without creating additional coupling
* between them.</p>
*/
public interface StorageProvider {

/**
* Opens the given path for reading.
*
* @param path fully-qualified file path (implementation specific semantics)
* @return an InputStream that must be closed by the caller
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JavaDoc should specify what specific IOException
subclasses might be thrown (e.g., FileNotFoundException,
AccessDeniedException) to help implementers and users handle
errors appropriately.

*/
InputStream openForRead(String path) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future if we want to use this abstraction to read parquet files, the code requires to create a SeekableInputStream
Can be more useful to use SeekableInputStream that already extends InputStream?


/**
* Opens the given path for writing. If the file already exists the behaviour
* depends on the overwrite flag.
*
* @param path fully-qualified file path
* @param overwrite whether an existing file should be replaced
* @return an OutputStream that must be closed by the caller
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface should clarify stream ownership and closing
responsibilities. Consider returning AutoCloseable wrappers or
documenting that callers must use try-with-resources.

OutputStream openForWrite(String path, boolean overwrite) throws IOException;

/**
* Deletes the file at path if it exists.
*/
boolean delete(String path) throws IOException;

/**
* Renames the file located at source to target.
*/
boolean rename(String source, String target) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.storage.impl;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import org.apache.parquet.storage.StorageProvider;

/**
* A StorageProvider that relies solely on the Java NIO standard library. It
* can work with any FileSystem implementation available
* on the class-path.
*/
public class NioStorageProvider implements StorageProvider {

  public NioStorageProvider() {}

  /**
   * Converts a string to a {@link Path}, accepting either a plain file-system path or a URI
   * with a registered scheme (e.g. {@code file:///tmp/x}).
   */
  private static Path toPath(String s) {
    try {
      java.net.URI uri = java.net.URI.create(s);
      // Require a scheme longer than one character: a single-letter "scheme" is almost
      // certainly a Windows drive letter ("C:/data"), which must be treated as a plain path.
      if (uri.getScheme() != null && uri.getScheme().length() > 1) {
        return Paths.get(uri);
      }
    } catch (IllegalArgumentException ignore) {
      // Not a well-formed URI; fall through and treat it as a plain path.
    }
    return Paths.get(s);
  }

  @Override
  public InputStream openForRead(String path) throws IOException {
    return Files.newInputStream(toPath(path));
  }

  @Override
  public OutputStream openForWrite(String path, boolean overwrite) throws IOException {
    Path p = toPath(path);
    if (overwrite) {
      return Files.newOutputStream(
          p, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
    } else {
      // CREATE_NEW fails with FileAlreadyExistsException if the file is already present.
      return Files.newOutputStream(p, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }
  }

  @Override
  public boolean delete(String path) throws IOException {
    return Files.deleteIfExists(toPath(path));
  }

  @Override
  public boolean rename(String source, String target) throws IOException {
    Path src = toPath(source);
    Path tgt = toPath(target);
    // NOTE: this exists-check is not atomic with the move; a concurrent writer could still
    // create the target in between. It mirrors Hadoop-style "rename returns false if the
    // target exists" semantics on a best-effort basis.
    if (Files.exists(tgt)) return false;
    try {
      Files.move(src, tgt, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Fall back to a non-atomic move on file systems without atomic-move support.
      Files.move(src, tgt);
    }
    return true;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

org.apache.parquet.storage.impl.NioStorageProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.storage;

import static org.junit.Assert.*;

import java.io.IOException;
import java.net.URI;
import org.junit.Test;

public class TestStorage {

  @Test
  public void testFileSchemeReturnsNioProvider() {
    StorageProvider provider = Storage.select(URI.create("file:///tmp/test.parquet"));
    assertTrue(
        "file:// should return NioStorageProvider",
        provider instanceof org.apache.parquet.storage.impl.NioStorageProvider);
  }

  @Test
  public void testNullUriReturnsNioProvider() {
    StorageProvider provider = Storage.select(null);
    assertTrue(
        "null URI should return NioStorageProvider",
        provider instanceof org.apache.parquet.storage.impl.NioStorageProvider);
  }

  @Test
  public void testRelativePathReturnsNioProvider() {
    StorageProvider provider = Storage.select(URI.create("relative/path"));
    assertTrue(
        "relative path should return NioStorageProvider",
        provider instanceof org.apache.parquet.storage.impl.NioStorageProvider);
  }

  @Test(expected = IllegalStateException.class)
  public void testHdfsSchemeThrowsExceptionWhenHadoopNotAvailable() {
    Storage.select(URI.create("hdfs://namenode:8020/test.parquet"));
  }

  @Test(expected = IllegalStateException.class)
  public void testUnknownSchemeThrowsExceptionWhenHadoopNotAvailable() {
    Storage.select(URI.create("s3://bucket/test.parquet"));
  }

  /**
   * Round-trips a small file through the NIO provider: write, read back, rename, delete.
   * Previously this test only repeated the instanceof check and exercised no operations.
   */
  @Test
  public void testNioProviderBasicOperations() throws IOException {
    StorageProvider provider = Storage.select(null);
    java.nio.file.Path dir = java.nio.file.Files.createTempDirectory("parquet-storage-test");
    String file = dir.resolve("data.txt").toString();
    String renamed = dir.resolve("renamed.txt").toString();
    byte[] payload = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    try {
      try (java.io.OutputStream out = provider.openForWrite(file, true)) {
        out.write(payload);
      }
      byte[] buf = new byte[payload.length];
      try (java.io.InputStream in = provider.openForRead(file)) {
        assertEquals(payload.length, in.read(buf));
      }
      assertEquals("hello", new String(buf, java.nio.charset.StandardCharsets.UTF_8));
      assertTrue("rename should succeed", provider.rename(file, renamed));
      assertTrue("delete of existing file should return true", provider.delete(renamed));
      assertFalse("delete of missing file should return false", provider.delete(renamed));
    } finally {
      // Best-effort cleanup of anything the assertions left behind.
      java.nio.file.Files.deleteIfExists(java.nio.file.Paths.get(file));
      java.nio.file.Files.deleteIfExists(java.nio.file.Paths.get(renamed));
      java.nio.file.Files.deleteIfExists(dir);
    }
  }

  @Test
  public void testProviderSelectionLogic() {
    StorageProvider fileProvider = Storage.select(URI.create("file:///tmp/test.parquet"));
    assertTrue(
        "file:// should always return NIO provider",
        fileProvider instanceof org.apache.parquet.storage.impl.NioStorageProvider);

    try {
      Storage.select(URI.create("hdfs://namenode:8020/test.parquet"));
      fail("Should throw IllegalStateException for hdfs:// when Hadoop not available");
    } catch (IllegalStateException e) {
      // expected: Hadoop is not on the test classpath
    }

    try {
      Storage.select(URI.create("s3://bucket/test.parquet"));
      fail("Should throw IllegalStateException for s3:// when Hadoop not available");
    } catch (IllegalStateException e) {
      // expected: Hadoop is not on the test classpath
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.storage.impl.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.storage.StorageProvider;

/**
* Hadoop-backed implementation of {@link StorageProvider}. Lives in the
* parquet-hadoop module, under the same {@code org.apache.parquet.storage.impl}
* hierarchy as other providers.
*/
public class HadoopStorageProvider implements StorageProvider {

  // Configuration used to resolve the FileSystem for each path. Resolving per-path (rather
  // than caching FileSystem.get(conf), which returns only the *default* filesystem) lets one
  // provider instance serve any scheme Hadoop knows about (hdfs://, s3a://, ...), not just
  // the one configured as fs.defaultFS.
  private final Configuration conf;

  /**
   * Uses default Hadoop Configuration.
   *
   * @throws IOException kept for backward compatibility with existing callers
   */
  public HadoopStorageProvider() throws IOException {
    this(new Configuration());
  }

  /**
   * @param conf the Hadoop configuration used to resolve filesystems
   * @throws IOException kept for backward compatibility with existing callers
   */
  public HadoopStorageProvider(Configuration conf) throws IOException {
    this.conf = conf;
  }

  /** Resolves the FileSystem that owns the given path, honoring the path's own scheme. */
  private FileSystem fsFor(Path p) throws IOException {
    return p.getFileSystem(conf);
  }

  @Override
  public InputStream openForRead(String path) throws IOException {
    Path p = new Path(path);
    // Deliberately return the Hadoop stream as InputStream; callers should
    // not rely on Hadoop-specific methods.
    return fsFor(p).open(p);
  }

  @Override
  public OutputStream openForWrite(String path, boolean overwrite) throws IOException {
    Path p = new Path(path);
    return fsFor(p).create(p, overwrite);
  }

  @Override
  public boolean delete(String path) throws IOException {
    Path p = new Path(path);
    // Non-recursive delete: directories with children are not removed.
    return fsFor(p).delete(p, false);
  }

  @Override
  public boolean rename(String source, String target) throws IOException {
    Path src = new Path(source);
    // NOTE(review): source and target are assumed to live on the same filesystem; Hadoop's
    // FileSystem.rename does not move data across filesystems — confirm at call-sites.
    return fsFor(src).rename(src, new Path(target));
  }
}
Loading