Skip to content

Commit 318cf96

Browse files
authored
Merge pull request #55 from RADAR-base/s3Integration
S3 integration
2 parents 48f5f99 + 81f34d8 commit 318cf96

File tree

11 files changed

+155
-47
lines changed

11 files changed

+155
-47
lines changed

README.md

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,30 @@ radar-hdfs-restructure --compression gzip --nameservice <hdfs_node> --output-di
5050

5151
### Storage
5252

53-
When using local storage, to set the output user ID and group ID, specify the `-p local-uid=123` and `-p local-gid=12` properties.
53+
There are two storage drivers implemented: `org.radarbase.hdfs.storage.LocalStorageDriver` for an output directory on the local file system or `org.radarbase.hdfs.storage.S3StorageDriver` for storage on an object store.
54+
55+
`LocalStorageDriver` takes the following properties:
56+
```yaml
57+
storage:
58+
factory: org.radarbase.hdfs.storage.LocalStorageDriver
59+
properties:
60+
# User ID to write data as
61+
localUid: 123
62+
# Group ID to write data as
63+
localGid: 123
64+
```
65+
66+
With the `S3StorageDriver`, use the following configuration instead:
67+
```yaml
68+
storage:
69+
factory: org.radarbase.hdfs.storage.S3StorageDriver
70+
properties:
71+
# Object store URL
72+
s3EndpointUrl: s3://my-region.s3.aws.amazon.com
73+
# Bucket to use
74+
s3Bucket: myBucketName
75+
```
76+
Ensure that the environment variables contain the authorized AWS keys that allow the service to list, download and upload files to the respective bucket.
5477

5578
### Service
5679

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dependencies {
4343
implementation group: 'com.beust', name: 'jcommander', version: jCommanderVersion
4444
implementation group: 'com.almworks.integers', name: 'integers', version: almworksVersion
4545

46+
implementation 'software.amazon.awssdk:s3:2.10.3'
4647
implementation 'com.opencsv:opencsv:5.0'
4748

4849
implementation group: 'org.apache.avro', name: 'avro-mapred', version: avroVersion
@@ -106,8 +107,8 @@ test {
106107
useJUnitPlatform()
107108
testLogging {
108109
events "passed", "skipped", "failed"
109-
setExceptionFormat("full")
110110
showStandardStreams = true
111+
setExceptionFormat("full")
111112
}
112113
}
113114

src/main/java/org/radarbase/hdfs/Plugin.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ package org.radarbase.hdfs
1919
import java.io.IOException
2020

2121
interface Plugin {
22+
23+
/**
24+
* Initialize plugin. Throws IllegalArgumentException if required properties are not provided
25+
* or if they are of the wrong format.
26+
*/
2227
@Throws(IOException::class)
2328
fun init(properties: Map<String, String>) {
2429
// do nothing

src/main/java/org/radarbase/hdfs/accounting/OffsetRangeFile.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class OffsetRangeFile(private val storage: StorageDriver, private val path: Path
6969

7070
fun read(storage: StorageDriver, path: Path): OffsetRangeFile {
7171
try {
72-
if (storage.exists(path)) {
72+
if (storage.status(path) != null) {
7373
val set = OffsetRangeSet()
7474
storage.newBufferedReader(path).use { br ->
7575
// ignore header

src/main/java/org/radarbase/hdfs/storage/LocalStorageDriver.kt

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,34 @@ package org.radarbase.hdfs.storage
1818

1919
import java.io.IOException
2020
import java.io.InputStream
21-
import java.io.OutputStream
2221
import java.nio.file.AtomicMoveNotSupportedException
2322
import java.nio.file.Files
2423
import java.nio.file.Path
2524
import java.nio.file.StandardCopyOption.ATOMIC_MOVE
2625
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
27-
import java.nio.file.StandardOpenOption.APPEND
28-
import java.nio.file.StandardOpenOption.CREATE
2926
import java.nio.file.attribute.PosixFilePermissions
3027

3128
class LocalStorageDriver : StorageDriver {
3229
private var uid = -1
3330
private var gid = -1
3431

3532
override fun init(properties: Map<String, String>) {
36-
uid = properties["local-uid"]?.toIntOrNull() ?: -1
37-
gid = properties["local-gid"]?.toIntOrNull() ?: -1
33+
uid = properties["localUid"]?.toIntOrNull() ?: -1
34+
gid = properties["localGid"]?.toIntOrNull() ?: -1
3835
}
3936

40-
override fun exists(path: Path): Boolean = Files.exists(path)
41-
4237
@Throws(IOException::class)
43-
override fun newInputStream(path: Path): InputStream = Files.newInputStream(path)
44-
45-
@Throws(IOException::class)
46-
override fun newOutputStream(path: Path, append: Boolean): OutputStream {
47-
return if (append) {
48-
Files.newOutputStream(path, APPEND, CREATE)
38+
override fun status(path: Path): StorageDriver.PathStatus? {
39+
return if (Files.exists(path)) {
40+
StorageDriver.PathStatus(Files.size(path))
4941
} else {
50-
Files.newOutputStream(path)
42+
null
5143
}
5244
}
5345

46+
@Throws(IOException::class)
47+
override fun newInputStream(path: Path): InputStream = Files.newInputStream(path)
48+
5449
@Throws(IOException::class)
5550
override fun move(oldPath: Path, newPath: Path) {
5651
try {
@@ -72,9 +67,6 @@ class LocalStorageDriver : StorageDriver {
7267
move(localPath, newPath)
7368
}
7469

75-
@Throws(IOException::class)
76-
override fun size(path: Path): Long = Files.size(path)
77-
7870
@Throws(IOException::class)
7971
override fun delete(path: Path) = Files.delete(path)
8072
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.radarbase.hdfs.storage
18+
19+
import org.slf4j.LoggerFactory
20+
import software.amazon.awssdk.services.s3.S3Client
21+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException
22+
import java.io.IOException
23+
import java.io.InputStream
24+
import java.net.URI
25+
import java.nio.file.Files
26+
import java.nio.file.Path
27+
28+
class S3StorageDriver : StorageDriver {
29+
private lateinit var bucket: String
30+
private lateinit var awsClient: S3Client
31+
32+
override fun init(properties: Map<String, String>) {
33+
awsClient = S3Client.builder().also { s3Builder ->
34+
properties["s3EndpointUrl"]?.let {
35+
val endpoint = try {
36+
URI.create(it)
37+
} catch (ex: IllegalArgumentException) {
38+
logger.warn("Invalid S3 URL", ex)
39+
throw ex
40+
}
41+
42+
s3Builder.endpointOverride(endpoint)
43+
}
44+
}.build()
45+
46+
bucket = requireNotNull(properties["s3Bucket"]) { "No AWS bucket provided" }
47+
}
48+
49+
override fun status(path: Path): StorageDriver.PathStatus? {
50+
return try {
51+
awsClient.headObject { it.bucket(bucket).key(path.toString()) }
52+
.let { StorageDriver.PathStatus(it.contentLength()) }
53+
} catch (ex: NoSuchKeyException) {
54+
null
55+
}
56+
}
57+
58+
@Throws(IOException::class)
59+
override fun newInputStream(path: Path): InputStream = awsClient.getObject {
60+
it.bucket(bucket).key(path.toString())
61+
}
62+
63+
@Throws(IOException::class)
64+
override fun move(oldPath: Path, newPath: Path) {
65+
awsClient.copyObject {
66+
it.bucket(bucket).key(newPath.toString())
67+
.copySource("$bucket/$oldPath")
68+
}
69+
delete(oldPath)
70+
}
71+
72+
@Throws(IOException::class)
73+
override fun store(localPath: Path, newPath: Path) {
74+
awsClient.putObject({ it.bucket(bucket).key(newPath.toString()) }, localPath)
75+
Files.delete(localPath)
76+
}
77+
78+
@Throws(IOException::class)
79+
override fun delete(path: Path) {
80+
awsClient.deleteObject { it.bucket(bucket).key(path.toString()) }
81+
}
82+
83+
companion object {
84+
private val logger = LoggerFactory.getLogger(S3StorageDriver::class.java)
85+
}
86+
}

src/main/java/org/radarbase/hdfs/storage/StorageDriver.kt

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,22 @@ import java.io.BufferedReader
2020
import java.io.IOException
2121
import java.io.InputStream
2222
import java.io.InputStreamReader
23-
import java.io.OutputStream
24-
import java.io.Reader
2523
import java.nio.file.Path
2624
import org.radarbase.hdfs.Plugin
2725

2826
interface StorageDriver : Plugin {
29-
fun exists(path: Path): Boolean
30-
@Throws(IOException::class)
31-
fun newInputStream(path: Path): InputStream
27+
/** Query the path status. Returns null if the file does not exist. */
28+
fun status(path: Path): PathStatus?
3229

3330
@Throws(IOException::class)
34-
fun newOutputStream(path: Path, append: Boolean): OutputStream
31+
fun newInputStream(path: Path): InputStream
3532

3633
@Throws(IOException::class)
3734
fun move(oldPath: Path, newPath: Path)
3835

3936
@Throws(IOException::class)
4037
fun store(localPath: Path, newPath: Path)
4138

42-
@Throws(IOException::class)
43-
fun size(path: Path): Long
44-
4539
@Throws(IOException::class)
4640
fun delete(path: Path)
4741

@@ -50,4 +44,6 @@ interface StorageDriver : Plugin {
5044
val reader = InputStreamReader(newInputStream(path))
5145
return BufferedReader(reader)
5246
}
47+
48+
data class PathStatus(val size: Long)
5349
}

src/main/java/org/radarbase/hdfs/util/PostponedWriter.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,18 @@ abstract class PostponedWriter
5858
* not yet taken place, the write will occur earlier.
5959
*/
6060
fun triggerWrite() {
61-
var localWriteFuture: Future<*>? = writeFuture.get()
62-
if (localWriteFuture == null) {
63-
localWriteFuture = executor.schedule({ this.startWrite() }, timeout, timeoutUnit)
64-
if (!writeFuture.compareAndSet(null, localWriteFuture)) {
65-
localWriteFuture!!.cancel(false)
66-
}
61+
if (writeFuture.get() == null) {
62+
executor.schedule(::startWrite, timeout, timeoutUnit)
63+
.also { newWriteFuture ->
64+
if (!writeFuture.compareAndSet(null, newWriteFuture)) {
65+
newWriteFuture.cancel(false)
66+
}
67+
}
6768
}
6869
}
6970

7071
/** Start the write in the writer thread. */
71-
protected fun startWrite() {
72+
private fun startWrite() {
7273
writeFuture.set(null)
7374
doWrite()
7475
}
@@ -89,8 +90,9 @@ abstract class PostponedWriter
8990

9091
@Throws(IOException::class)
9192
private fun doFlush(shutdown: Boolean) {
92-
val localFuture = executor.submit { this.startWrite() }
93-
writeFuture.set(localFuture)
93+
val localFuture = executor.submit(::startWrite)
94+
.also { writeFuture.set(it) }
95+
9496
if (shutdown) {
9597
executor.shutdown()
9698
}

src/main/java/org/radarbase/hdfs/worker/FileCache.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ class FileCache(
6363
val defaultDeduplicate = factory.config.format.deduplication
6464
deduplicate = topicConfig?.deduplication(defaultDeduplicate) ?: defaultDeduplicate
6565

66-
val fileIsNew = !storageDriver.exists(path) || storageDriver.size(path) == 0L
66+
val fileIsNew = storageDriver.status(path)?.takeIf { it.size > 0L } == null
67+
6768
this.tmpPath = Files.createTempFile(tmpDir, fileName, ".tmp" + compression.extension)
6869

6970
var outStream = compression.compress(fileName,
@@ -127,7 +128,7 @@ class FileCache(
127128
writer.close()
128129

129130
if (!hasError.get()) {
130-
if (deduplicate.enable!!) {
131+
if (deduplicate.enable == true) {
131132
time("close.deduplicate") {
132133
val dedupTmp = tmpPath.resolveSibling("${tmpPath.fileName}.dedup")
133134
converterFactory.deduplicate(fileName, tmpPath, dedupTmp, compression, deduplicate.distinctFields ?: emptySet(), deduplicate.ignoreFields ?: emptySet())
@@ -174,7 +175,7 @@ class FileCache(
174175
var i = 0
175176
while (corruptPath == null && i < 100) {
176177
val path = source.resolveSibling(source.fileName.toString() + ".corrupted" + suffix)
177-
if (!storageDriver.exists(path)) {
178+
if (storageDriver.status(path) == null) {
178179
corruptPath = path
179180
}
180181
suffix = "-$i"

src/main/java/org/radarbase/hdfs/worker/FileCacheStore.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ constructor(private val factory: FileStoreFactory, private val accountant: Accou
110110
if (schemasAdded.putIfAbsent(schemaPath, schemaPath) == null) {
111111
val storage = factory.storageDriver
112112

113-
if (!storage.exists(schemaPath)) {
114-
storage.newOutputStream(schemaPath, false).use {
115-
out -> OutputStreamWriter(out).use {
116-
writer -> writer.write(schema.toString(true)) } }
113+
if (storage.status(schemaPath) == null) {
114+
val tmpSchemaPath = Files.createTempFile(tmpDir.path, "schema-$topic", ".json")
115+
Files.newOutputStream(tmpSchemaPath).use { out ->
116+
out.write(schema.toString(true).toByteArray())
117+
}
118+
storage.store(tmpSchemaPath, schemaPath)
117119
}
118120
}
119121
}

0 commit comments

Comments
 (0)