Skip to content

Commit 98982ce

Browse files
authored
Merge pull request #78 from RADAR-base/release-1.1.1
Release 1.1.1
2 parents adcac94 + 6771c56 commit 98982ce

File tree

14 files changed

+267
-25
lines changed

14 files changed

+267
-25
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ When upgrading to version 0.6.0 from version 0.5.x or earlier, please follow the
6161

6262
This package is available as docker image [`radarbase/radar-output-restructure`](https://hub.docker.com/r/radarbase/radar-output-restructure). The entrypoint of the image is the current application. So in all the commands listed in usage, replace `radar-output-restructure` with for example:
6363
```shell
64-
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-output-restructure:1.1.0-hdfs -n hdfs-namenode -o /output /myTopic
64+
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-output-restructure:1.1.1-hdfs -n hdfs-namenode -o /output /myTopic
6565
```
6666
if your docker cluster is running in the `hadoop` network and your output directory should be `./output`.
6767

@@ -171,7 +171,7 @@ This package requires at least Java JDK 8. Build the distribution with
171171
and install the package into `/usr/local` with for example
172172
```shell
173173
sudo mkdir -p /usr/local
174-
sudo tar -xzf build/distributions/radar-output-restructure-1.1.0.tar.gz -C /usr/local --strip-components=1
174+
sudo tar -xzf build/distributions/radar-output-restructure-1.1.1.tar.gz -C /usr/local --strip-components=1
175175
```
176176

177177
Now the `radar-output-restructure` command should be available.

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
}
1010

1111
group 'org.radarbase'
12-
version '1.1.0'
12+
version '1.1.1'
1313
mainClassName = 'org.radarbase.output.Application'
1414

1515
sourceCompatibility = '1.8'

restructure.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ worker:
7979
numThreads: 2
8080
# Maximum number of files to process in any given topic.
8181
maxFilesPerTopic: null
82+
# Minimum time in seconds since a source file was last modified before it is
83+
# considered for processing. This avoids synchronization issues when a file has just been
84+
# written.
85+
minimumFileAge: 60
8286

8387
cleaner:
8488
# Enable cleaning up old source files

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ import io.minio.PutObjectOptions
44
import io.minio.PutObjectOptions.MAX_PART_SIZE
55
import org.junit.jupiter.api.Assertions.assertEquals
66
import org.junit.jupiter.api.Test
7-
import org.radarbase.output.config.PathConfig
8-
import org.radarbase.output.config.ResourceConfig
9-
import org.radarbase.output.config.RestructureConfig
10-
import org.radarbase.output.config.S3Config
7+
import org.radarbase.output.config.*
118
import org.radarbase.output.util.Timer
129
import java.nio.charset.StandardCharsets.UTF_8
1310
import java.nio.file.Paths
@@ -29,7 +26,8 @@ class RestructureS3IntegrationTest {
2926
val config = RestructureConfig(
3027
source = ResourceConfig("s3", s3 = sourceConfig),
3128
target = ResourceConfig("s3", s3 = targetConfig),
32-
paths = PathConfig(inputs = listOf(Paths.get("in")))
29+
paths = PathConfig(inputs = listOf(Paths.get("in"))),
30+
worker = WorkerConfig(minimumFileAge = 0L)
3331
)
3432
val application = Application(config)
3533
val sourceClient = sourceConfig.createS3Client()

src/main/java/org/radarbase/output/Application.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.radarbase.output.util.ProgressBar.Companion.format
3232
import org.radarbase.output.util.Timer
3333
import org.radarbase.output.worker.FileCacheStore
3434
import org.radarbase.output.cleaner.SourceDataCleaner
35+
import org.radarbase.output.util.TimeUtil.durationSince
3536
import org.radarbase.output.worker.RadarKafkaRestructure
3637
import org.slf4j.LoggerFactory
3738
import redis.clients.jedis.JedisPool
@@ -187,8 +188,6 @@ class Application(
187188
private val logger = LoggerFactory.getLogger(Application::class.java)
188189
const val CACHE_SIZE_DEFAULT = 100
189190

190-
private fun Temporal.durationSince() = Duration.between(this, Instant.now())
191-
192191
private fun parseArgs(args: Array<String>): CommandLineArgs {
193192
val commandLineArgs = CommandLineArgs()
194193
JCommander.newBuilder()

src/main/java/org/radarbase/output/cleaner/TimestampExtractionCheck.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TimestampExtractionCheck(
6161
TimestampFileCacheStore.FindResult.BAD_SCHEMA -> suffix += 1 // continue next suffix
6262
}
6363
} catch (ex: IOException) {
64-
logger.error("Failed to read target file for checking data integrity", ex)
64+
logger.error("Failed to read target file {} for checking data integrity", path, ex)
6565
return false
6666
}
6767
} while (true)

src/main/java/org/radarbase/output/cleaner/TimestampFileCache.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ class TimestampFileCache(
5353
if (header != null) {
5454
val recordHeader = converterFactory.headerFor(record)
5555
if (!recordHeader.contentEquals(header)) {
56-
throw IllegalArgumentException("Header mismatch")
56+
throw IllegalArgumentException(
57+
"Header mismatch: record header ${recordHeader.contentToString()}" +
58+
" does not match target header ${header.contentToString()}")
5759
}
5860
}
5961

src/main/java/org/radarbase/output/cleaner/TimestampFileCacheStore.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.radarbase.output.cleaner
1919
import org.apache.avro.generic.GenericRecord
2020
import org.radarbase.output.FileStoreFactory
2121
import org.radarbase.output.util.Timer.time
22+
import org.slf4j.LoggerFactory
2223
import java.io.FileNotFoundException
2324
import java.io.IOException
2425
import java.nio.file.Path
@@ -60,13 +61,19 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
6061
}
6162

6263
time("cleaner.contains") {
63-
if (fileCache.contains(record)) FindResult.FOUND else FindResult.NOT_FOUND
64+
if (fileCache.contains(record)) FindResult.FOUND else {
65+
logger.warn("Target {} does not contain record {}", path, record)
66+
FindResult.NOT_FOUND
67+
}
6468
}
6569
} catch (ex: FileNotFoundException) {
70+
logger.warn("Target {} for {} has not been created yet.", path, record)
6671
FindResult.FILE_NOT_FOUND
6772
} catch (ex: IllegalArgumentException) {
73+
logger.warn("Schema of {} does not match schema of record {}: {}", path, record, ex.message)
6874
FindResult.BAD_SCHEMA
6975
} catch (ex: IndexOutOfBoundsException) {
76+
logger.warn("Schema of {} does not match schema of record {} (wrong number of columns)", path, record)
7077
FindResult.BAD_SCHEMA
7178
}
7279
}
@@ -95,4 +102,8 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
95102
NOT_FOUND,
96103
FOUND
97104
}
105+
106+
companion object {
107+
private val logger = LoggerFactory.getLogger(TimestampFileCacheStore::class.java)
108+
}
98109
}

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,14 @@ data class WorkerConfig(
143143
* Number of offsets to simultaneously keep in cache. A higher size will
144144
* decrease overhead but increase memory usage.
145145
*/
146-
val cacheOffsetsSize: Long = 500_000) {
146+
val cacheOffsetsSize: Long = 500_000,
147+
/**
148+
* Minimum time since the file was last modified in seconds. Avoids
149+
* synchronization issues that may occur in a source file that is being
150+
* appended to.
151+
*/
152+
val minimumFileAge: Long = 60
153+
) {
147154
init {
148155
check(cacheSize >= 1) { "Maximum files per topic must be strictly positive" }
149156
maxFilesPerTopic?.let { check(it >= 1) { "Maximum files per topic must be strictly positive" } }

src/main/java/org/radarbase/output/util/TimeUtil.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode
44
import org.apache.avro.Schema
55
import org.apache.avro.generic.GenericRecord
66
import java.math.RoundingMode
7-
import java.time.Instant
8-
import java.time.LocalDate
9-
import java.time.LocalDateTime
10-
import java.time.ZoneOffset
7+
import java.time.*
118
import java.time.format.DateTimeParseException
9+
import java.time.temporal.Temporal
1210

1311
object TimeUtil {
1412
private val NANO_MULTIPLIER = 1_000_000_000.toBigDecimal()
@@ -143,4 +141,7 @@ object TimeUtil {
143141
fun Instant.toDouble() = (epochSecond.toBigDecimal()
144142
+ (nano.toBigDecimal().divide(NANO_MULTIPLIER, 9, RoundingMode.HALF_UP))
145143
).toDouble()
144+
145+
146+
fun Temporal.durationSince(): Duration = Duration.between(this, Instant.now())
146147
}

0 commit comments

Comments
 (0)