Skip to content

Commit b8bca7b

Browse files
committed
Added S3 storage driver
1 parent 75063ce commit b8bca7b

File tree

8 files changed

+117
-45
lines changed

8 files changed

+117
-45
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ dependencies {
4040
implementation group: 'com.beust', name: 'jcommander', version: jCommanderVersion
4141
implementation group: 'com.almworks.integers', name: 'integers', version: almworksVersion
4242

43+
implementation 'software.amazon.awssdk:s3:2.10.3'
44+
4345
implementation group: 'org.apache.avro', name: 'avro-mapred', version: avroVersion
4446
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: hadoopVersion
4547

src/main/java/org/radarbase/hdfs/Plugin.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ package org.radarbase.hdfs
1919
import java.io.IOException
2020

2121
interface Plugin {
22+
23+
/**
24+
* Initialize plugin. Throws IllegalArgumentException if required properties are not provided
25+
* or if they are of the wrong format.
26+
*/
2227
@Throws(IOException::class)
2328
fun init(properties: Map<String, String>) {
2429
// do nothing

src/main/java/org/radarbase/hdfs/accounting/OffsetRangeFile.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class OffsetRangeFile(private val storage: StorageDriver, private val path: Path
6969

7070
fun read(storage: StorageDriver, path: Path): OffsetRangeFile {
7171
try {
72-
if (storage.exists(path)) {
72+
if (storage.status(path) != null) {
7373
val set = OffsetRangeSet()
7474
storage.newBufferedReader(path).use { br ->
7575
// ignore header

src/main/java/org/radarbase/hdfs/data/FileCache.kt

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,15 @@
1616

1717
package org.radarbase.hdfs.data
1818

19-
import java.io.BufferedOutputStream
20-
import java.io.ByteArrayInputStream
21-
import java.io.Closeable
22-
import java.io.Flushable
23-
import java.io.IOException
24-
import java.io.InputStream
25-
import java.io.InputStreamReader
26-
import java.io.OutputStream
27-
import java.io.OutputStreamWriter
28-
import java.io.Writer
29-
import java.nio.file.Files
30-
import java.nio.file.Path
31-
import java.util.concurrent.atomic.AtomicBoolean
3219
import org.apache.avro.generic.GenericRecord
3320
import org.radarbase.hdfs.FileStoreFactory
3421
import org.radarbase.hdfs.accounting.Accountant
35-
import org.radarbase.hdfs.util.Timer
3622
import org.radarbase.hdfs.util.Timer.time
3723
import org.slf4j.LoggerFactory
24+
import java.io.*
25+
import java.nio.file.Files
26+
import java.nio.file.Path
27+
import java.util.concurrent.atomic.AtomicBoolean
3828

3929
/** Keeps path handles of a path. */
4030
class FileCache(
@@ -60,7 +50,7 @@ class FileCache(
6050
private val hasError: AtomicBoolean = AtomicBoolean(false)
6151

6252
init {
63-
val fileIsNew = !storageDriver.exists(path) || storageDriver.size(path) == 0L
53+
val fileIsNew = storageDriver.status(path)?.takeIf { it.size > 0L } == null
6454
this.tmpPath = Files.createTempFile(tmpDir, fileName, ".tmp" + compression.extension)
6555

6656
var outStream = compression.compress(fileName,
@@ -165,7 +155,7 @@ class FileCache(
165155
var i = 0
166156
while (corruptPath == null && i < 100) {
167157
val path = source.resolveSibling(source.fileName.toString() + ".corrupted" + suffix)
168-
if (!storageDriver.exists(path)) {
158+
if (storageDriver.status(path) == null) {
169159
corruptPath = path
170160
}
171161
suffix = "-$i"

src/main/java/org/radarbase/hdfs/data/FileCacheStore.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,13 @@ constructor(private val factory: FileStoreFactory, private val accountant: Accou
111111
if (schemasAdded.putIfAbsent(schemaPath, schemaPath) == null) {
112112
val storage = factory.storageDriver
113113

114-
if (!storage.exists(schemaPath)) {
115-
storage.newOutputStream(schemaPath, false).use {
116-
out -> OutputStreamWriter(out).use {
117-
writer -> writer.write(schema.toString(true)) } }
114+
if (storage.status(schemaPath) == null) {
115+
val tmpSchemaPath = Files.createTempFile(tmpDir.path, "schema-$topic", ".json")
116+
Files.newOutputStream(tmpSchemaPath).use { out ->
117+
out.write(schema.toString(true).toByteArray())
118+
}
119+
storage.store(tmpSchemaPath, schemaPath)
120+
Files.delete(tmpSchemaPath)
118121
}
119122
}
120123
}

src/main/java/org/radarbase/hdfs/data/LocalStorageDriver.kt

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,11 @@ package org.radarbase.hdfs.data
1818

1919
import java.io.IOException
2020
import java.io.InputStream
21-
import java.io.OutputStream
2221
import java.nio.file.AtomicMoveNotSupportedException
2322
import java.nio.file.Files
2423
import java.nio.file.Path
2524
import java.nio.file.StandardCopyOption.ATOMIC_MOVE
2625
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
27-
import java.nio.file.StandardOpenOption.APPEND
28-
import java.nio.file.StandardOpenOption.CREATE
2926
import java.nio.file.attribute.PosixFilePermissions
3027

3128
class LocalStorageDriver : StorageDriver {
@@ -37,20 +34,18 @@ class LocalStorageDriver : StorageDriver {
3734
gid = properties["local-gid"]?.toIntOrNull() ?: -1
3835
}
3936

40-
override fun exists(path: Path): Boolean = Files.exists(path)
41-
4237
@Throws(IOException::class)
43-
override fun newInputStream(path: Path): InputStream = Files.newInputStream(path)
44-
45-
@Throws(IOException::class)
46-
override fun newOutputStream(path: Path, append: Boolean): OutputStream {
47-
return if (append) {
48-
Files.newOutputStream(path, APPEND, CREATE)
38+
override fun status(path: Path): StorageDriver.PathStatus? {
39+
return if (Files.exists(path)) {
40+
StorageDriver.PathStatus(Files.size(path))
4941
} else {
50-
Files.newOutputStream(path)
42+
null
5143
}
5244
}
5345

46+
@Throws(IOException::class)
47+
override fun newInputStream(path: Path): InputStream = Files.newInputStream(path)
48+
5449
@Throws(IOException::class)
5550
override fun move(oldPath: Path, newPath: Path) {
5651
try {
@@ -72,9 +67,6 @@ class LocalStorageDriver : StorageDriver {
7267
move(localPath, newPath)
7368
}
7469

75-
@Throws(IOException::class)
76-
override fun size(path: Path): Long = Files.size(path)
77-
7870
@Throws(IOException::class)
7971
override fun delete(path: Path) = Files.delete(path)
8072
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.radarbase.hdfs.data
18+
19+
import org.slf4j.LoggerFactory
20+
import software.amazon.awssdk.services.s3.S3Client
21+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException
22+
import java.io.IOException
23+
import java.io.InputStream
24+
import java.net.URI
25+
import java.nio.file.Path
26+
27+
class S3StorageDriver : StorageDriver {
28+
private lateinit var bucket: String
29+
private lateinit var awsClient: S3Client
30+
31+
override fun init(properties: Map<String, String>) {
32+
awsClient = S3Client.builder().also { s3Builder ->
33+
properties["s3EndpointUrl"]?.let {
34+
val endpoint = try {
35+
URI.create(it)
36+
} catch (ex: IllegalArgumentException) {
37+
logger.warn("Invalid S3 URL", ex)
38+
throw ex
39+
}
40+
41+
s3Builder.endpointOverride(endpoint)
42+
}
43+
}.build()
44+
45+
bucket = requireNotNull(properties["s3Bucket"]) { "No AWS bucket provided" }
46+
}
47+
48+
override fun status(path: Path): StorageDriver.PathStatus? {
49+
return try {
50+
awsClient.headObject { it.bucket(bucket).key(path.toString()) }
51+
.let { StorageDriver.PathStatus(it.contentLength()) }
52+
} catch (ex: NoSuchKeyException) {
53+
null
54+
}
55+
}
56+
57+
@Throws(IOException::class)
58+
override fun newInputStream(path: Path): InputStream = awsClient.getObject {
59+
it.bucket(bucket).key(path.toString())
60+
}
61+
62+
@Throws(IOException::class)
63+
override fun move(oldPath: Path, newPath: Path) {
64+
awsClient.copyObject {
65+
it.bucket(bucket).key(newPath.toString())
66+
.copySource("$bucket/$oldPath")
67+
}
68+
delete(oldPath)
69+
}
70+
71+
@Throws(IOException::class)
72+
override fun store(localPath: Path, newPath: Path) {
73+
awsClient.putObject({ it.bucket(bucket).key(newPath.toString()) }, localPath)
74+
}
75+
76+
@Throws(IOException::class)
77+
override fun delete(path: Path) {
78+
awsClient.deleteObject { it.bucket(bucket).key(path.toString()) }
79+
}
80+
81+
companion object {
82+
private val logger = LoggerFactory.getLogger(S3StorageDriver::class.java)
83+
}
84+
}

src/main/java/org/radarbase/hdfs/data/StorageDriver.kt

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,22 @@ import java.io.BufferedReader
2020
import java.io.IOException
2121
import java.io.InputStream
2222
import java.io.InputStreamReader
23-
import java.io.OutputStream
24-
import java.io.Reader
2523
import java.nio.file.Path
2624
import org.radarbase.hdfs.Plugin
2725

2826
interface StorageDriver : Plugin {
29-
fun exists(path: Path): Boolean
30-
@Throws(IOException::class)
31-
fun newInputStream(path: Path): InputStream
27+
/** Query the path status. Returns null if the file does not exist. */
28+
fun status(path: Path): PathStatus?
3229

3330
@Throws(IOException::class)
34-
fun newOutputStream(path: Path, append: Boolean): OutputStream
31+
fun newInputStream(path: Path): InputStream
3532

3633
@Throws(IOException::class)
3734
fun move(oldPath: Path, newPath: Path)
3835

3936
@Throws(IOException::class)
4037
fun store(localPath: Path, newPath: Path)
4138

42-
@Throws(IOException::class)
43-
fun size(path: Path): Long
44-
4539
@Throws(IOException::class)
4640
fun delete(path: Path)
4741

@@ -50,4 +44,6 @@ interface StorageDriver : Plugin {
5044
val reader = InputStreamReader(newInputStream(path))
5145
return BufferedReader(reader)
5246
}
47+
48+
data class PathStatus(val size: Long)
5349
}

0 commit comments

Comments
 (0)