Skip to content

Commit fbf24d8

Browse files
committed
Fix GCS Tink for hadoop3
1 parent 6e3dcac commit fbf24d8

File tree

2 files changed

+163
-80
lines changed

2 files changed

+163
-80
lines changed

pom.xml

Lines changed: 80 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,86 @@
379379
</exclusion>
380380
</exclusions>
381381
</dependency>
382+
<dependency>
383+
<groupId>org.apache.hadoop</groupId>
384+
<artifactId>hadoop-common</artifactId>
385+
<version>${hadoop.version}</version>
386+
<scope>provided</scope>
387+
<exclusions>
388+
<exclusion>
389+
<groupId>com.google.protobuf</groupId>
390+
<artifactId>protobuf-java</artifactId>
391+
</exclusion>
392+
<exclusion>
393+
<groupId>commons-logging</groupId>
394+
<artifactId>commons-logging</artifactId>
395+
</exclusion>
396+
<exclusion>
397+
<groupId>log4j</groupId>
398+
<artifactId>log4j</artifactId>
399+
</exclusion>
400+
<exclusion>
401+
<groupId>org.slf4j</groupId>
402+
<artifactId>slf4j-log4j12</artifactId>
403+
</exclusion>
404+
<exclusion>
405+
<groupId>org.apache.avro</groupId>
406+
<artifactId>avro</artifactId>
407+
</exclusion>
408+
<exclusion>
409+
<groupId>org.apache.zookeeper</groupId>
410+
<artifactId>zookeeper</artifactId>
411+
</exclusion>
412+
<exclusion>
413+
<artifactId>guava</artifactId>
414+
<groupId>com.google.guava</groupId>
415+
</exclusion>
416+
<exclusion>
417+
<artifactId>jersey-core</artifactId>
418+
<groupId>com.sun.jersey</groupId>
419+
</exclusion>
420+
<exclusion>
421+
<artifactId>jersey-json</artifactId>
422+
<groupId>com.sun.jersey</groupId>
423+
</exclusion>
424+
<exclusion>
425+
<artifactId>jersey-server</artifactId>
426+
<groupId>com.sun.jersey</groupId>
427+
</exclusion>
428+
<exclusion>
429+
<artifactId>servlet-api</artifactId>
430+
<groupId>javax.servlet</groupId>
431+
</exclusion>
432+
<exclusion>
433+
<groupId>org.mortbay.jetty</groupId>
434+
<artifactId>jetty</artifactId>
435+
</exclusion>
436+
<exclusion>
437+
<groupId>org.mortbay.jetty</groupId>
438+
<artifactId>jetty-util</artifactId>
439+
</exclusion>
440+
<exclusion>
441+
<artifactId>jasper-compiler</artifactId>
442+
<groupId>tomcat</groupId>
443+
</exclusion>
444+
<exclusion>
445+
<artifactId>jasper-runtime</artifactId>
446+
<groupId>tomcat</groupId>
447+
</exclusion>
448+
<exclusion>
449+
<artifactId>jsp-api</artifactId>
450+
<groupId>javax.servlet.jsp</groupId>
451+
</exclusion>
452+
<exclusion>
453+
<artifactId>slf4j-api</artifactId>
454+
<groupId>org.slf4j</groupId>
455+
</exclusion>
456+
<exclusion>
457+
<groupId>org.slf4j</groupId>
458+
<artifactId>slf4j-reload4j</artifactId>
459+
</exclusion>
460+
</exclusions>
461+
</dependency>
382462
<dependency>
383463
<!--
384464
Required by bigtable-hbase-1.x-mapreduce instead of excluded non-shaded version.
@@ -461,86 +541,6 @@
461541
<artifactId>google-cloud-datastore</artifactId>
462542
<version>${google.cloud.datastore.version}</version>
463543
</dependency>
464-
<dependency>
465-
<groupId>org.apache.hadoop</groupId>
466-
<artifactId>hadoop-common</artifactId>
467-
<version>${hadoop.version}</version>
468-
<scope>provided</scope>
469-
<exclusions>
470-
<exclusion>
471-
<groupId>com.google.protobuf</groupId>
472-
<artifactId>protobuf-java</artifactId>
473-
</exclusion>
474-
<exclusion>
475-
<groupId>commons-logging</groupId>
476-
<artifactId>commons-logging</artifactId>
477-
</exclusion>
478-
<exclusion>
479-
<groupId>log4j</groupId>
480-
<artifactId>log4j</artifactId>
481-
</exclusion>
482-
<exclusion>
483-
<groupId>org.slf4j</groupId>
484-
<artifactId>slf4j-log4j12</artifactId>
485-
</exclusion>
486-
<exclusion>
487-
<groupId>org.apache.avro</groupId>
488-
<artifactId>avro</artifactId>
489-
</exclusion>
490-
<exclusion>
491-
<groupId>org.apache.zookeeper</groupId>
492-
<artifactId>zookeeper</artifactId>
493-
</exclusion>
494-
<exclusion>
495-
<artifactId>guava</artifactId>
496-
<groupId>com.google.guava</groupId>
497-
</exclusion>
498-
<exclusion>
499-
<artifactId>jersey-core</artifactId>
500-
<groupId>com.sun.jersey</groupId>
501-
</exclusion>
502-
<exclusion>
503-
<artifactId>jersey-json</artifactId>
504-
<groupId>com.sun.jersey</groupId>
505-
</exclusion>
506-
<exclusion>
507-
<artifactId>jersey-server</artifactId>
508-
<groupId>com.sun.jersey</groupId>
509-
</exclusion>
510-
<exclusion>
511-
<artifactId>servlet-api</artifactId>
512-
<groupId>javax.servlet</groupId>
513-
</exclusion>
514-
<exclusion>
515-
<groupId>org.mortbay.jetty</groupId>
516-
<artifactId>jetty</artifactId>
517-
</exclusion>
518-
<exclusion>
519-
<groupId>org.mortbay.jetty</groupId>
520-
<artifactId>jetty-util</artifactId>
521-
</exclusion>
522-
<exclusion>
523-
<artifactId>jasper-compiler</artifactId>
524-
<groupId>tomcat</groupId>
525-
</exclusion>
526-
<exclusion>
527-
<artifactId>jasper-runtime</artifactId>
528-
<groupId>tomcat</groupId>
529-
</exclusion>
530-
<exclusion>
531-
<artifactId>jsp-api</artifactId>
532-
<groupId>javax.servlet.jsp</groupId>
533-
</exclusion>
534-
<exclusion>
535-
<artifactId>slf4j-api</artifactId>
536-
<groupId>org.slf4j</groupId>
537-
</exclusion>
538-
<exclusion>
539-
<groupId>org.slf4j</groupId>
540-
<artifactId>slf4j-reload4j</artifactId>
541-
</exclusion>
542-
</exclusions>
543-
</dependency>
544544
<dependency>
545545
<groupId>org.apache.hadoop</groupId>
546546
<artifactId>hadoop-mapreduce-client-core</artifactId>

src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import org.apache.hadoop.conf.Configurable;
2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
2122
import org.apache.hadoop.fs.FSDataInputStream;
2223
import org.apache.hadoop.fs.FSInputStream;
2324
import org.apache.hadoop.fs.FileSystem;
2425
import org.apache.hadoop.fs.FilterFileSystem;
26+
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
2527
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.fs.impl.OpenFileParameters;
29+
import org.jetbrains.annotations.NotNull;
2630
import org.slf4j.Logger;
2731
import org.slf4j.LoggerFactory;
2832

@@ -32,6 +36,8 @@
3236
import java.nio.channels.Channels;
3337
import java.nio.channels.SeekableByteChannel;
3438
import java.util.Map;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.CompletionException;
3541

3642
/**
3743
* A hadoop {@link FileSystem} that support files decryption (encryption is currently not supported).
@@ -103,6 +109,83 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
103109
return new FSDataInputStream(new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize)));
104110
}
105111

112+
/**
113+
* Opens a file asynchronously and returns a {@link FutureDataInputStreamBuilder}
114+
* to build a {@link FSDataInputStream} for the specified {@link Path}.
115+
*
116+
* <p>This implementation returns a builder that constructs an input stream by using a decryptor
117+
* to open the file through a {@link SeekableByteChannelFSInputStream}. The file is read
118+
* with a buffer size of 4096 bytes.</p>
119+
*
120+
* @param path the {@link Path} of the file to open
121+
* @return a {@link FutureDataInputStreamBuilder} that asynchronously builds a {@link FSDataInputStream}
122+
* @throws UnsupportedOperationException if the operation is not supported
123+
*/
124+
@Override
125+
public FutureDataInputStreamBuilder openFile(Path path) throws UnsupportedOperationException {
126+
return new FutureDataInputStreamBuilder() {
127+
@Override
128+
public CompletableFuture<FSDataInputStream> build()
129+
throws IllegalArgumentException, UnsupportedOperationException {
130+
return CompletableFuture.supplyAsync(() -> {
131+
try {
132+
return new FSDataInputStream(new SeekableByteChannelFSInputStream(
133+
decryptor.open(fs, path, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT)));
134+
} catch (Exception e) {
135+
throw new CompletionException(e);
136+
}
137+
});
138+
}
139+
140+
@Override
141+
public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String s1) {
142+
return this;
143+
}
144+
145+
@Override
146+
public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String... strings) {
147+
return this;
148+
}
149+
150+
@Override
151+
public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String s1) {
152+
return this;
153+
}
154+
155+
@Override
156+
public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String... strings) {
157+
return this;
158+
}
159+
};
160+
}
161+
162+
/**
163+
* Opens a file asynchronously using the provided {@link Path}, and returns
164+
* a {@link CompletableFuture} that supplies a {@link FSDataInputStream}.
165+
*
166+
* <p>This method uses a decryptor to open the file and wraps it in a {@link SeekableByteChannelFSInputStream}.
167+
* It uses the buffer size specified in the {@code parameters}; if the buffer size is not greater than zero,
168+
* a default of 4096 bytes is used.</p>
169+
*
170+
* @param path the {@link Path} to the file to open
171+
* @param parameters the {@link OpenFileParameters} containing optional configuration, such as buffer size
172+
* @return a {@link CompletableFuture} that will complete with the {@link FSDataInputStream}
173+
* @throws CompletionException if an exception occurs during file opening
174+
*/
175+
@Override
176+
protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path, OpenFileParameters parameters) {
177+
return CompletableFuture.supplyAsync(() -> {
178+
try {
179+
int bufferSize = parameters.getBufferSize() > 0 ? parameters.getBufferSize()
180+
: CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
181+
return new FSDataInputStream(
182+
new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize)));
183+
} catch (Exception e) {
184+
throw new CompletionException(e);
185+
}
186+
});
187+
}
188+
106189
/**
107190
* A {@link FSInputStream} implementation backed by a {@link SeekableByteChannel}.
108191
*/

0 commit comments

Comments
 (0)