Skip to content

Commit ae965ae

Browse files
committed
Misc fixes
1 parent 29b916e commit ae965ae

File tree

5 files changed

+55
-11
lines changed

5 files changed

+55
-11
lines changed

src/main/java/org/radarbase/hdfs/accounting/Accountant.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ constructor(factory: FileStoreFactory, topic: String) : Flushable, Closeable {
3939
val offsetsDirectory = factory.config.paths.output
4040
.resolve(OFFSETS_FILE_NAME)
4141

42-
Files.createDirectories(offsetsDirectory)
42+
factory.storageDriver.createDirectories(offsetsDirectory)
4343

4444
val offsetPath = offsetsDirectory.resolve("$topic.csv")
4545
this.offsetFile = OffsetRangeFile.read(factory.storageDriver, offsetPath)

src/main/java/org/radarbase/hdfs/storage/LocalStorageDriver.kt

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package org.radarbase.hdfs.storage
1818

19+
import org.slf4j.LoggerFactory
1920
import java.io.IOException
2021
import java.io.InputStream
2122
import java.nio.file.AtomicMoveNotSupportedException
2223
import java.nio.file.Files
2324
import java.nio.file.Path
2425
import java.nio.file.StandardCopyOption.ATOMIC_MOVE
2526
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
27+
import java.nio.file.attribute.FileAttribute
28+
import java.nio.file.attribute.PosixFileAttributes
2629
import java.nio.file.attribute.PosixFilePermissions
2730

2831
class LocalStorageDriver : StorageDriver {
@@ -32,6 +35,10 @@ class LocalStorageDriver : StorageDriver {
3235
override fun init(properties: Map<String, String>) {
3336
uid = properties["localUid"]?.toIntOrNull() ?: -1
3437
gid = properties["localGid"]?.toIntOrNull() ?: -1
38+
39+
logger.info("Local storage configured with user id {}:{} (-1 if not configured)",
40+
uid,
41+
gid)
3542
}
3643

3744
@Throws(IOException::class)
@@ -57,16 +64,31 @@ class LocalStorageDriver : StorageDriver {
5764

5865
@Throws(IOException::class)
5966
override fun store(localPath: Path, newPath: Path) {
67+
localPath.updateUser()
68+
Files.setPosixFilePermissions(localPath, PosixFilePermissions.fromString("rw-r--r--"))
69+
move(localPath, newPath)
70+
}
71+
72+
override fun createDirectories(directory: Path) {
73+
Files.createDirectories(directory, PosixFilePermissions.asFileAttribute(
74+
PosixFilePermissions.fromString("rwxr-xr-x")))
75+
76+
directory.updateUser()
77+
}
78+
79+
private fun Path.updateUser() {
6080
if (uid >= 0) {
61-
Files.setAttribute(localPath, "unix:uid", uid)
81+
Files.setAttribute(this, "unix:uid", uid)
6282
}
6383
if (gid >= 0) {
64-
Files.setAttribute(localPath, "unix:gid", gid)
84+
Files.setAttribute(this, "unix:gid", gid)
6585
}
66-
Files.setPosixFilePermissions(localPath, PosixFilePermissions.fromString("rw-r--r--"))
67-
move(localPath, newPath)
6886
}
6987

7088
@Throws(IOException::class)
7189
override fun delete(path: Path) = Files.delete(path)
90+
91+
companion object {
92+
private val logger = LoggerFactory.getLogger(LocalStorageDriver::class.java)
93+
}
7294
}

src/main/java/org/radarbase/hdfs/storage/S3StorageDriver.kt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.io.InputStream
2424
import java.net.URI
2525
import java.nio.file.Files
2626
import java.nio.file.Path
27+
import java.nio.file.Paths
2728

2829
class S3StorageDriver : StorageDriver {
2930
private lateinit var bucket: String
@@ -44,11 +45,15 @@ class S3StorageDriver : StorageDriver {
4445
}.build()
4546

4647
bucket = requireNotNull(properties["s3Bucket"]) { "No AWS bucket provided" }
48+
49+
logger.info("Object storage configured with endpoint {} in bucket {}",
50+
properties["s3EndpointUrl"],
51+
properties["s3Bucket"])
4752
}
4853

4954
override fun status(path: Path): StorageDriver.PathStatus? {
5055
return try {
51-
awsClient.headObject { it.bucket(bucket).key(path.toString()) }
56+
awsClient.headObject { it.bucket(bucket).key(path.toKey()) }
5257
.let { StorageDriver.PathStatus(it.contentLength()) }
5358
} catch (ex: NoSuchKeyException) {
5459
null
@@ -57,30 +62,44 @@ class S3StorageDriver : StorageDriver {
5762

5863
@Throws(IOException::class)
5964
override fun newInputStream(path: Path): InputStream = awsClient.getObject {
60-
it.bucket(bucket).key(path.toString())
65+
it.bucket(bucket).key(path.toKey())
6166
}
6267

6368
@Throws(IOException::class)
6469
override fun move(oldPath: Path, newPath: Path) {
6570
awsClient.copyObject {
66-
it.bucket(bucket).key(newPath.toString())
71+
it.bucket(bucket).key(newPath.toKey())
6772
.copySource("$bucket/$oldPath")
6873
}
6974
delete(oldPath)
7075
}
7176

7277
@Throws(IOException::class)
7378
override fun store(localPath: Path, newPath: Path) {
74-
awsClient.putObject({ it.bucket(bucket).key(newPath.toString()) }, localPath)
79+
awsClient.putObject({ it.bucket(bucket).key(newPath.toKey()) }, localPath)
7580
Files.delete(localPath)
7681
}
7782

7883
@Throws(IOException::class)
7984
override fun delete(path: Path) {
80-
awsClient.deleteObject { it.bucket(bucket).key(path.toString()) }
85+
awsClient.deleteObject { it.bucket(bucket).key(path.toKey()) }
86+
}
87+
88+
override fun createDirectories(directory: Path) {
89+
// noop
8190
}
8291

8392
companion object {
8493
private val logger = LoggerFactory.getLogger(S3StorageDriver::class.java)
94+
95+
private val rootPath = Paths.get("/")
96+
97+
private fun Path.toKey(): String {
98+
return if (this.startsWith(rootPath)) {
99+
rootPath.relativize(this).toString()
100+
} else {
101+
toString()
102+
}
103+
}
85104
}
86105
}

src/main/java/org/radarbase/hdfs/storage/StorageDriver.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,8 @@ interface StorageDriver : Plugin {
4545
return BufferedReader(reader)
4646
}
4747

48+
@Throws(IOException::class)
49+
fun createDirectories(directory: Path)
50+
4851
data class PathStatus(val size: Long)
4952
}

src/main/java/org/radarbase/hdfs/worker/FileCacheStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ constructor(private val factory: FileStoreFactory, private val accountant: Accou
6969
ensureCapacity()
7070

7171
val dir = path.parent
72-
Files.createDirectories(dir)
72+
factory.storageDriver.createDirectories(dir)
7373

7474
try {
7575
time("write.open") { FileCache(factory, transaction.topicPartition.topic, path, record, tmpDir.path, accountant) }

0 commit comments

Comments
 (0)