Skip to content

Commit c23461d

Browse files
committed
Style fixes
1 parent 3066211 commit c23461d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+592
-343
lines changed

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ class RestructureS3IntegrationTest {
2222
endpoint = "http://localhost:9000",
2323
accessToken = "minioadmin",
2424
secretKey = "minioadmin",
25-
bucket = "source")
25+
bucket = "source",
26+
)
2627
val targetConfig = S3Config(
2728
endpoint = "http://localhost:9000",
2829
accessToken = "minioadmin",
2930
secretKey = "minioadmin",
30-
bucket = "target")
31+
bucket = "target",
32+
)
3133
val config = RestructureConfig(
3234
source = ResourceConfig("s3", s3 = sourceConfig),
3335
target = ResourceConfig("s3", s3 = targetConfig),
@@ -36,10 +38,8 @@ class RestructureS3IntegrationTest {
3638
)
3739
val application = Application(config)
3840
val sourceClient = sourceConfig.createS3Client()
39-
if (!sourceClient.bucketExists(BucketExistsArgs.Builder()
40-
.bucketBuild(sourceConfig.bucket))
41-
) {
42-
sourceClient.makeBucket(MakeBucketArgs.Builder().bucketBuild(sourceConfig.bucket))
41+
if (!sourceClient.bucketExists(BucketExistsArgs.builder().bucketBuild(sourceConfig.bucket))) {
42+
sourceClient.makeBucket(MakeBucketArgs.builder().bucketBuild(sourceConfig.bucket))
4343
}
4444

4545
val resourceFiles = listOf(
@@ -52,10 +52,12 @@ class RestructureS3IntegrationTest {
5252
launch(Dispatchers.IO) {
5353
this@RestructureS3IntegrationTest.javaClass.getResourceAsStream("/$resourceFile")
5454
.useSuspended { statusFile ->
55-
sourceClient.putObject(PutObjectArgs.Builder()
56-
.objectBuild(sourceConfig.bucket, targetFiles[i]) {
57-
stream(statusFile, -1, MAX_PART_SIZE)
58-
})
55+
sourceClient.putObject(
56+
PutObjectArgs.Builder()
57+
.objectBuild(sourceConfig.bucket, targetFiles[i]) {
58+
stream(statusFile, -1, MAX_PART_SIZE)
59+
}
60+
)
5961
}
6062
}
6163
}.joinAll()
@@ -83,23 +85,25 @@ class RestructureS3IntegrationTest {
8385
8486
""".trimIndent()
8587

86-
val targetContent = targetClient.getObject(GetObjectArgs.Builder()
87-
.bucketBuild(targetConfig.bucket) {
88+
val targetContent = targetClient.getObject(
89+
GetObjectArgs.Builder().bucketBuild(targetConfig.bucket) {
8890
`object`("$firstParticipantOutput/20200128_1300.csv")
89-
}).use { response ->
91+
}
92+
).use { response ->
9093
response.readBytes()
9194
}
9295

9396
assertEquals(csvContents, targetContent.toString(UTF_8))
9497
}
9598

9699
withContext(Dispatchers.IO) {
97-
targetClient.listObjects(ListObjectsArgs.Builder()
98-
.bucketBuild(targetConfig.bucket) {
100+
targetClient.listObjects(
101+
ListObjectsArgs.Builder().bucketBuild(targetConfig.bucket) {
99102
prefix("output")
100103
recursive(true)
101104
useUrlEncodingType(false)
102-
})
105+
}
106+
)
103107
.map { it.get().objectName() }
104108
.toHashSet()
105109
}
@@ -121,30 +125,34 @@ class RestructureS3IntegrationTest {
121125
launch {
122126
targetFiles.map {
123127
launch(Dispatchers.IO) {
124-
sourceClient.removeObject(RemoveObjectArgs.Builder()
125-
.objectBuild(sourceConfig.bucket, it))
128+
sourceClient.removeObject(
129+
RemoveObjectArgs.Builder().objectBuild(sourceConfig.bucket, it)
130+
)
126131
}
127132
}.joinAll()
128133

129134
launch(Dispatchers.IO) {
130-
sourceClient.removeBucket(RemoveBucketArgs.Builder()
131-
.bucketBuild(sourceConfig.bucket))
135+
sourceClient.removeBucket(
136+
RemoveBucketArgs.Builder().bucketBuild(sourceConfig.bucket)
137+
)
132138
}
133139
}
134140

135141
// delete target files
136142
launch {
137143
files.map {
138144
launch(Dispatchers.IO) {
139-
targetClient.removeObject(RemoveObjectArgs.Builder()
140-
.bucketBuild(targetConfig.bucket) {
145+
targetClient.removeObject(
146+
RemoveObjectArgs.Builder().bucketBuild(targetConfig.bucket) {
141147
`object`(it)
142-
})
148+
}
149+
)
143150
}
144151
}.joinAll()
145152
launch(Dispatchers.IO) {
146-
targetClient.removeBucket(RemoveBucketArgs.Builder()
147-
.bucketBuild(targetConfig.bucket))
153+
targetClient.removeBucket(
154+
RemoveBucketArgs.Builder().bucketBuild(targetConfig.bucket)
155+
)
148156
}
149157
}
150158
}

src/integrationTest/java/org/radarbase/output/accounting/OffsetRangeRedisTest.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ class OffsetRangeRedisTest {
7979
}
8080

8181
redisHolder.execute { redis ->
82-
val range = redisOffsetReader.readValue<OffsetRedisPersistence.Companion.RedisOffsetRangeSet>(redis.get(testFile.toString()))
82+
val range =
83+
redisOffsetReader.readValue<OffsetRedisPersistence.Companion.RedisOffsetRangeSet>(
84+
redis.get(testFile.toString())
85+
)
8386
assertEquals(
8487
OffsetRedisPersistence.Companion.RedisOffsetRangeSet(
8588
listOf(

src/integrationTest/java/org/radarbase/output/accounting/RedisRemoteLockManagerTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ internal class RedisRemoteLockManagerTest {
5757
}
5858
}
5959

60-
6160
@Test
6261
fun testNonOverlappingLockSameManager() = runTest {
6362
lockManager1.acquireLock("t").useSuspended { l1 ->

src/main/java/org/radarbase/output/Application.kt

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ class Application(
6969

7070
override val redisHolder: RedisHolder = RedisHolder(JedisPool(config.redis.uri))
7171
override val remoteLockManager: RemoteLockManager = RedisRemoteLockManager(
72-
redisHolder, config.redis.lockPrefix)
72+
redisHolder,
73+
config.redis.lockPrefix,
74+
)
7375

7476
override val offsetPersistenceFactory: OffsetPersistenceFactory =
7577
OffsetRedisPersistence(redisHolder)
@@ -90,10 +92,14 @@ class Application(
9092
override fun newFileCacheStore(accountant: Accountant) = FileCacheStore(this, accountant)
9193

9294
fun start() {
93-
System.setProperty("kotlinx.coroutines.scheduler.max.pool.size",
94-
config.worker.numThreads.toString())
95-
System.setProperty("kotlinx.coroutines.scheduler.core.pool.size",
96-
config.worker.numThreads.toString())
95+
System.setProperty(
96+
"kotlinx.coroutines.scheduler.max.pool.size",
97+
config.worker.numThreads.toString(),
98+
)
99+
System.setProperty(
100+
"kotlinx.coroutines.scheduler.core.pool.size",
101+
config.worker.numThreads.toString(),
102+
)
97103

98104
try {
99105
config.paths.temp.createDirectories()
@@ -118,8 +124,10 @@ class Application(
118124
}
119125

120126
private fun runService() {
121-
logger.info("Running as a Service with poll interval of {} seconds",
122-
config.service.interval)
127+
logger.info(
128+
"Running as a Service with poll interval of {} seconds",
129+
config.service.interval,
130+
)
123131
logger.info("Press Ctrl+C to exit...")
124132

125133
runBlocking {
@@ -163,20 +171,24 @@ class Application(
163171
fun main(args: Array<String>) {
164172
val commandLineArgs = parseArgs(args)
165173

166-
logger.info("Starting at {}...",
167-
DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()))
174+
logger.info(
175+
"Starting at {}...",
176+
DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()),
177+
)
168178

169179
// Enable singleton timer statements in the code.
170180
Timer.isEnabled = commandLineArgs.enableTimer
171181

172182
val application = try {
173-
Application(RestructureConfig
174-
.load(commandLineArgs.configFile)
175-
.withEnv()
176-
.apply {
177-
addArgs(commandLineArgs)
178-
validate()
179-
})
183+
Application(
184+
RestructureConfig
185+
.load(commandLineArgs.configFile)
186+
.withEnv()
187+
.apply {
188+
addArgs(commandLineArgs)
189+
validate()
190+
}
191+
)
180192
} catch (ex: IllegalArgumentException) {
181193
logger.error("Illegal argument", ex)
182194
exitProcess(1)

src/main/java/org/radarbase/output/accounting/OffsetRangeSet.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,10 @@ class OffsetRangeSet {
107107
{ it.key },
108108
{ (_, intervals) ->
109109
intervals.read { factory(OffsetIntervals(it)) }
110-
}),
111-
factory)
110+
},
111+
),
112+
factory,
113+
)
112114

113115
override fun toString(): String = "OffsetRangeSet$ranges"
114116

@@ -156,13 +158,15 @@ class OffsetRangeSet {
156158
fun copyForTopic(topic: String) = OffsetRangeSet(
157159
ranges.entries
158160
.filter { it.key.topic == topic }
159-
.associateByTo(ConcurrentHashMap(),
161+
.associateByTo(
162+
ConcurrentHashMap(),
160163
{ it.key },
161164
{ (_, intervals) ->
162165
intervals.read { factory(OffsetIntervals(it)) }
163-
}),
164-
factory)
165-
166+
},
167+
),
168+
factory,
169+
)
166170

167171
data class Range(val from: Long, val to: Long?, val lastProcessed: Instant = Instant.now()) {
168172
@JsonIgnore

src/main/java/org/radarbase/output/accounting/OffsetRedisPersistence.kt

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ class OffsetRedisPersistence(
4848
}
4949
}
5050
} catch (ex: IOException) {
51-
logger.error("Error reading offsets from Redis: {}. Processing all offsets.",
52-
ex.toString())
51+
logger.error(
52+
"Error reading offsets from Redis: {}. Processing all offsets.",
53+
ex.toString(),
54+
)
5355
null
5456
}
5557
}
@@ -70,12 +72,15 @@ class OffsetRedisPersistence(
7072

7173
override suspend fun doWrite(): Unit = time("accounting.offsets") {
7274
try {
73-
val offsets = RedisOffsetRangeSet(offsets.map { topicPartition, offsetIntervals ->
74-
RedisOffsetIntervals(
75-
topicPartition.topic,
76-
topicPartition.partition,
77-
offsetIntervals.toList())
78-
})
75+
val offsets = RedisOffsetRangeSet(
76+
offsets.map { topicPartition, offsetIntervals ->
77+
RedisOffsetIntervals(
78+
topicPartition.topic,
79+
topicPartition.partition,
80+
offsetIntervals.toList(),
81+
)
82+
}
83+
)
7984

8085
redisHolder.execute { redis ->
8186
redis.set(path.toString(), redisOffsetWriter.writeValueAsString(offsets))

src/main/java/org/radarbase/output/accounting/TopicPartition.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ data class TopicPartition(val topic: String, val partition: Int) : Comparable<To
2424

2525
override fun hashCode() = hash
2626

27-
override fun compareTo(other: TopicPartition) = compareValuesBy(this, other,
28-
TopicPartition::topic, TopicPartition::partition)
27+
override fun compareTo(other: TopicPartition) = compareValuesBy(
28+
this,
29+
other,
30+
TopicPartition::topic,
31+
TopicPartition::partition
32+
)
2933
}

src/main/java/org/radarbase/output/accounting/TopicPartitionOffsetRange.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ data class TopicPartitionOffsetRange(
4040
lastModified: Instant = Instant.now(),
4141
) : this(
4242
TopicPartition(topic, partition),
43-
OffsetRangeSet.Range(offsetFrom, offsetTo, lastModified))
43+
OffsetRangeSet.Range(offsetFrom, offsetTo, lastModified),
44+
)
4445

4546
override fun toString(): String {
4647
return if (range.to == null) {
@@ -66,11 +67,12 @@ data class TopicPartitionOffsetRange(
6667
.dropLastWhile { it.isEmpty() || it == "avro" }
6768

6869
return TopicPartitionOffsetRange(
69-
fileNameParts[0],
70-
fileNameParts[1].toInt(),
71-
fileNameParts[2].toLong(),
72-
fileNameParts.getOrNull(3)?.toLong(),
73-
lastModified)
70+
topic = fileNameParts[0],
71+
partition = fileNameParts[1].toInt(),
72+
offsetFrom = fileNameParts[2].toLong(),
73+
offsetTo = fileNameParts.getOrNull(3)?.toLong(),
74+
lastModified = lastModified,
75+
)
7476
}
7577
}
7678
}

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ class SourceDataCleaner(
7979
val accountant = createResource { AccountantImpl(fileStoreFactory, topic) }
8080
.apply { initialize(this@coroutineScope) }
8181
val extractionCheck = createResource {
82-
TimestampExtractionCheck(sourceStorage,
83-
fileStoreFactory)
82+
TimestampExtractionCheck(
83+
sourceStorage,
84+
fileStoreFactory,
85+
)
8486
}
8587
deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
8688
}

src/main/java/org/radarbase/output/cleaner/TimestampExtractionCheck.kt

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,30 +62,39 @@ class TimestampExtractionCheck(
6262

6363
do {
6464
val (path) = pathFactory.getRecordOrganization(
65-
topicFile.topic, record, suffix)
65+
topicFile.topic,
66+
record,
67+
suffix
68+
)
6669

6770
try {
6871
when (cacheStore.contains(path, record)) {
6972
TimestampFileCacheStore.FindResult.FILE_NOT_FOUND -> {
70-
logger.warn("Target {} for record of {} (offset {}) has not been created yet.",
73+
logger.warn(
74+
"Target {} for record of {} (offset {}) has not been created yet.",
7175
path,
7276
topicFile.path,
73-
offset)
77+
offset,
78+
)
7479
return false
7580
}
7681
TimestampFileCacheStore.FindResult.NOT_FOUND -> {
77-
logger.warn("Target {} does not contain record of {} (offset {})",
82+
logger.warn(
83+
"Target {} does not contain record of {} (offset {})",
7884
path,
7985
topicFile.path,
80-
offset)
86+
offset,
87+
)
8188
return false
8289
}
8390
TimestampFileCacheStore.FindResult.FOUND -> return true
8491
TimestampFileCacheStore.FindResult.BAD_SCHEMA -> {
85-
logger.debug("Schema of {} does not match schema of {} (offset {})",
92+
logger.debug(
93+
"Schema of {} does not match schema of {} (offset {})",
8694
path,
8795
topicFile.path,
88-
offset)
96+
offset,
97+
)
8998
suffix += 1 // continue next suffix
9099
}
91100
}

0 commit comments

Comments
 (0)