Skip to content

Commit bcb43cb

Browse files
committed
Clean code and fix exception propagation
1 parent 05c1c6b commit bcb43cb

File tree

14 files changed: 125 additions and 147 deletions.

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -78,12 +78,15 @@ class RestructureS3IntegrationTest {
7878
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
7979
8080
""".trimIndent()
81-
assertEquals(csvContents, targetClient.getObject(GetObjectArgs.Builder()
81+
82+
val targetContent = targetClient.getObject(GetObjectArgs.Builder()
8283
.bucketBuild(targetConfig.bucket) {
8384
`object`("$firstParticipantOutput/20200128_1300.csv")
84-
})
85-
.readBytes()
86-
.toString(UTF_8))
85+
}).use { response ->
86+
response.readBytes()
87+
}
88+
89+
assertEquals(csvContents, targetContent.toString(UTF_8))
8790
}
8891

8992
withContext(Dispatchers.IO) {

src/main/java/org/radarbase/output/accounting/RedisHolder.kt

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -8,14 +8,14 @@ import redis.clients.jedis.exceptions.JedisException
88
import java.io.Closeable
99
import java.io.IOException
1010

11-
class RedisHolder(private val jedisPool: JedisPool): Closeable {
11+
class RedisHolder(
12+
private val jedisPool: JedisPool,
13+
): Closeable {
1214
@Throws(IOException::class)
13-
suspend fun <T> execute(routine: suspend (Jedis) -> T): T {
14-
return try {
15-
withContext(Dispatchers.IO) {
16-
jedisPool.resource.use {
17-
routine(it)
18-
}
15+
suspend fun <T> execute(routine: suspend (Jedis) -> T): T = withContext(Dispatchers.IO) {
16+
try {
17+
jedisPool.resource.use {
18+
routine(it)
1919
}
2020
} catch (ex: JedisException) {
2121
throw IOException(ex)

src/main/java/org/radarbase/output/accounting/RedisRemoteLockManager.kt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ class RedisRemoteLockManager(
2525
}
2626

2727
private inner class RemoteLock(
28-
private val lockKey: String
28+
private val lockKey: String
2929
) : RemoteLockManager.RemoteLock {
3030
override suspend fun closeAndJoin() {
3131
redisHolder.execute { redis ->

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 34 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@ package org.radarbase.output.cleaner
22

33
import kotlinx.coroutines.*
44
import kotlinx.coroutines.sync.Mutex
5-
import kotlinx.coroutines.sync.withLock
65
import kotlinx.coroutines.sync.withPermit
76
import org.radarbase.output.Application.Companion.format
87
import org.radarbase.output.FileStoreFactory
@@ -20,13 +19,12 @@ import java.nio.file.Path
2019
import java.nio.file.Paths
2120
import java.time.Instant
2221
import java.time.temporal.ChronoUnit
23-
import java.util.concurrent.atomic.AtomicBoolean
2422
import java.util.concurrent.atomic.LongAdder
23+
import kotlin.coroutines.coroutineContext
2524

2625
class SourceDataCleaner(
27-
private val fileStoreFactory: FileStoreFactory
26+
private val fileStoreFactory: FileStoreFactory
2827
) : Closeable {
29-
private val isClosed = AtomicBoolean(false)
3028
private val lockManager = fileStoreFactory.remoteLockManager
3129
private val sourceStorage = fileStoreFactory.sourceStorage
3230
private val excludeTopics: Set<String> = fileStoreFactory.config.topics
@@ -38,7 +36,7 @@ class SourceDataCleaner(
3836
.minus(fileStoreFactory.config.cleaner.age.toLong(), ChronoUnit.DAYS)
3937

4038
val deletedFileCount = LongAdder()
41-
private val scope = CoroutineScope(Dispatchers.Default)
39+
private val supervisor = SupervisorJob()
4240

4341
@Throws(IOException::class, InterruptedException::class)
4442
suspend fun process(directoryName: String) {
@@ -49,28 +47,26 @@ class SourceDataCleaner(
4947

5048
logger.info("{} topics found", paths.size)
5149

52-
paths.map { p ->
53-
scope.launch {
54-
try {
55-
val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
56-
mapTopic(p)
57-
}
58-
if (deleteCount > 0) {
59-
logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
60-
deletedFileCount.add(deleteCount)
50+
withContext(coroutineContext + supervisor) {
51+
paths.forEach { p ->
52+
launch {
53+
try {
54+
val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
55+
mapTopic(p)
56+
}
57+
if (deleteCount > 0) {
58+
logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
59+
deletedFileCount.add(deleteCount)
60+
}
61+
} catch (ex: Exception) {
62+
logger.warn("Failed to map topic {}", p, ex)
6163
}
62-
} catch (ex: Exception) {
63-
logger.warn("Failed to map topic", ex)
6464
}
6565
}
66-
}.joinAll()
66+
}
6767
}
6868

6969
private suspend fun mapTopic(topicPath: Path): Long {
70-
if (isClosed.get()) {
71-
return 0L
72-
}
73-
7470
val topic = topicPath.fileName.toString()
7571
return try {
7672
lockManager.tryWithLock(topic) {
@@ -104,41 +100,30 @@ class SourceDataCleaner(
104100
offsets.contains(f.range.mapRange { r -> r.incrementTo() })
105101
}
106102

107-
val accountantMutex = Mutex()
108-
109-
return coroutineScope {
110-
paths
111-
.map { file ->
112-
async {
113-
if (extractionCheck.isExtracted(file)) {
114-
logger.info("Removing {}", file.path)
115-
Timer.time("cleaner.delete") {
116-
sourceStorage.delete(file.path)
117-
}
118-
true
119-
} else {
120-
// extract the file again at a later time
121-
logger.warn("Source file was not completely extracted: {}", file.path)
122-
val fullRange = file.range.mapRange { it.ensureToOffset() }
123-
accountantMutex.withLock {
124-
accountant.remove(fullRange)
125-
}
126-
false
127-
}
103+
return paths
104+
.count { file ->
105+
if (extractionCheck.isExtracted(file)) {
106+
logger.info("Removing {}", file.path)
107+
Timer.time("cleaner.delete") {
108+
sourceStorage.delete(file.path)
128109
}
110+
true
111+
} else {
112+
// extract the file again at a later time
113+
logger.warn("Source file was not completely extracted: {}", file.path)
114+
val fullRange = file.range.mapRange { it.ensureToOffset() }
115+
accountant.remove(fullRange)
116+
false
129117
}
130-
.awaitAll()
131-
.count { it }
132-
}
118+
}
133119
}
134120

135121
private suspend fun topicPaths(path: Path): List<Path> = sourceStorage.listTopics(path, excludeTopics)
136-
.toMutableList()
137-
// different services start on different topics to decrease lock contention
138-
.also { it.shuffle() }
122+
// different services start on different topics to decrease lock contention
123+
.shuffled()
139124

140125
override fun close() {
141-
scope.cancel()
126+
supervisor.cancel()
142127
}
143128

144129
companion object {

src/main/java/org/radarbase/output/cleaner/TimestampFileCacheStore.kt

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,10 @@ package org.radarbase.output.cleaner
1919
import org.apache.avro.generic.GenericRecord
2020
import org.radarbase.output.FileStoreFactory
2121
import org.radarbase.output.util.Timer.time
22-
import org.slf4j.LoggerFactory
2322
import java.io.FileNotFoundException
2423
import java.io.IOException
2524
import java.nio.file.Path
25+
import java.util.concurrent.ConcurrentHashMap
2626

2727
/**
2828
* Caches open file handles. If more than the limit is cached, the half of the files that were used
@@ -36,7 +36,7 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
3636
init {
3737
val config = factory.config
3838
this.maxCacheSize = config.worker.cacheSize
39-
this.caches = HashMap(maxCacheSize * 4 / 3 + 1)
39+
this.caches = ConcurrentHashMap(maxCacheSize * 4 / 3 + 1)
4040
this.schemasAdded = HashMap()
4141
}
4242

@@ -97,8 +97,4 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
9797
NOT_FOUND,
9898
FOUND
9999
}
100-
101-
companion object {
102-
private val logger = LoggerFactory.getLogger(TimestampFileCacheStore::class.java)
103-
}
104100
}

src/main/java/org/radarbase/output/source/AzureSourceStorage.kt

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
package org.radarbase.output.source
22

33
import com.azure.storage.blob.BlobServiceClient
4+
import com.azure.storage.blob.models.BlobItem
45
import kotlinx.coroutines.Dispatchers
56
import kotlinx.coroutines.withContext
67
import org.apache.avro.file.SeekableFileInput
@@ -23,9 +24,19 @@ class AzureSourceStorage(
2324

2425
private fun blobClient(path: Path) = blobContainerClient.getBlobClient(path.toKey())
2526

26-
override suspend fun list(path: Path): List<SimpleFileStatus> =
27-
withContext(Dispatchers.IO) { blobContainerClient.listBlobsByHierarchy("$path/") }
28-
.map { SimpleFileStatus(Paths.get(it.name), it.isPrefix ?: false, it.properties?.lastModified?.toInstant()) }
27+
override suspend fun list(path: Path, maxKeys: Int?): List<SimpleFileStatus> =
28+
withContext(Dispatchers.IO) {
29+
var iterable: Iterable<BlobItem> = blobContainerClient.listBlobsByHierarchy("$path/")
30+
if (maxKeys != null) {
31+
iterable = iterable.take(maxKeys)
32+
}
33+
iterable.map {
34+
SimpleFileStatus(Paths.get(it.name),
35+
it.isPrefix ?: false,
36+
it.properties?.lastModified?.toInstant())
37+
}
38+
}
39+
2940

3041
override suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
3142
var topicFile = super.createTopicFile(topic, status)

src/main/java/org/radarbase/output/source/S3SourceStorage.kt

Lines changed: 18 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -31,13 +31,23 @@ class S3SourceStorage(
3131
private val bucket = config.bucket
3232
private val readEndOffset = config.endOffsetFromTags
3333

34-
override suspend fun list(path: Path): List<SimpleFileStatus> {
34+
override suspend fun list(
35+
path: Path,
36+
maxKeys: Int?,
37+
): List<SimpleFileStatus> {
3538
val listRequest = ListObjectsArgs.Builder().bucketBuild(bucket) {
39+
if (maxKeys != null) {
40+
maxKeys(maxKeys.coerceAtMost(1000))
41+
}
3642
prefix("$path/")
3743
recursive(false)
3844
useUrlEncodingType(false)
3945
}
40-
return faultTolerant { s3Client.listObjects(listRequest) }
46+
var iterable = faultTolerant { s3Client.listObjects(listRequest) }
47+
if (maxKeys != null) {
48+
iterable = iterable.take(maxKeys)
49+
}
50+
return iterable
4151
.map {
4252
val item = it.get()
4353
SimpleFileStatus(
@@ -94,9 +104,12 @@ class S3SourceStorage(
94104
try {
95105
faultTolerant {
96106
tempFile.outputStream(StandardOpenOption.TRUNCATE_EXISTING).use { out ->
97-
s3Client.getObject(GetObjectArgs.Builder()
98-
.objectBuild(bucket, file.path))
99-
.copyTo(out)
107+
s3Client.getObject(
108+
GetObjectArgs.Builder()
109+
.objectBuild(bucket, file.path)
110+
).use { input ->
111+
input.copyTo(out)
112+
}
100113
}
101114
}
102115
} catch (ex: Exception) {

src/main/java/org/radarbase/output/source/SourceStorage.kt

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -13,15 +13,18 @@ interface SourceStorage {
1313
/** Create a reader for the storage medium. It should be closed by the caller. */
1414
fun createReader(): SourceStorageReader
1515
/** List all files in the given directory. */
16-
suspend fun list(path: Path): List<SimpleFileStatus>
16+
suspend fun list(
17+
path: Path,
18+
maxKeys: Int? = null,
19+
): List<SimpleFileStatus>
1720

1821
/** Delete given file. Will not delete any directories. */
1922
suspend fun delete(path: Path)
20-
suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
21-
val lastModified = status.lastModified ?: Instant.now()
22-
val range = TopicPartitionOffsetRange.parseFilename(status.path.fileName.toString(), lastModified)
23-
return TopicFile(topic, status.path, lastModified, range)
24-
}
23+
suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile = TopicFile(
24+
topic = topic,
25+
path = status.path,
26+
lastModified = status.lastModified ?: Instant.now(),
27+
)
2528

2629
/**
2730
* Recursively returns all record files in a sequence of a given topic with path.

src/main/java/org/radarbase/output/util/AvroFileLister.kt

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,17 @@
11
package org.radarbase.output.util
22

3-
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.launch
53
import org.radarbase.output.source.SourceStorage
64
import org.radarbase.output.source.TopicFile
75

8-
class AvroFileLister(private val storage: SourceStorage) :
9-
TreeLister.LevelLister<TopicFile, TopicPath> {
10-
override fun CoroutineScope.listLevel(
6+
class AvroFileLister(
7+
private val storage: SourceStorage
8+
) : TreeLister.LevelLister<TopicFile, TopicPath> {
9+
10+
override suspend fun listLevel(
1111
context: TopicPath,
12-
descend: CoroutineScope.(TopicPath) -> Unit,
13-
emit: suspend (TopicFile) -> Unit,
14-
) = launch {
12+
descend: suspend (TopicPath) -> Unit,
13+
emit: suspend (TopicFile) -> Unit
14+
) {
1515
storage.list(context.path).forEach { status ->
1616
val filename = status.path.fileName.toString()
1717
when {

src/main/java/org/radarbase/output/util/AvroTopicLister.kt

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,17 @@
11
package org.radarbase.output.util
22

3-
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.launch
53
import org.radarbase.output.source.SourceStorage
64
import java.nio.file.Path
75

8-
class AvroTopicLister(private val storage: SourceStorage) : TreeLister.LevelLister<Path, Path> {
9-
override fun CoroutineScope.listLevel(
6+
class AvroTopicLister(
7+
private val storage: SourceStorage,
8+
) : TreeLister.LevelLister<Path, Path> {
9+
override suspend fun listLevel(
1010
context: Path,
11-
descend: CoroutineScope.(Path) -> Unit,
12-
emit: suspend (Path) -> Unit,
13-
) = launch {
14-
val fileStatuses = storage.list(context)
11+
descend: suspend (Path) -> Unit,
12+
emit: suspend (Path) -> Unit
13+
) {
14+
val fileStatuses = storage.list(context, maxKeys = 256)
1515

1616
val avroFile = fileStatuses.find { file ->
1717
!file.isDirectory

0 commit comments

Comments (0)