Skip to content

Commit 8f33e3c

Browse files
authored
Merge pull request #83 from RADAR-base/release-1.1.4
Release 1.1.4
2 parents e128322 + 305eaf4 commit 8f33e3c

File tree

3 files changed

+25
-17
lines changed

3 files changed

+25
-17
lines changed

build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
}
1010

1111
group 'org.radarbase'
12-
version '1.1.3'
12+
version '1.1.4'
1313
mainClassName = 'org.radarbase.output.Application'
1414

1515
sourceCompatibility = '1.8'
@@ -30,6 +30,7 @@ ext {
3030
junitVersion = '5.6.1'
3131
minioVersion = '7.1.0'
3232
jedisVersion = '3.2.0'
33+
slf4jVersion = '1.7.30'
3334
azureStorageVersion = '12.6.0'
3435
}
3536

@@ -69,7 +70,9 @@ dependencies {
6970
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: hadoopVersion
7071

7172
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
73+
implementation group: 'org.slf4j', name: 'slf4j-api', version: slf4jVersion
7274

75+
runtimeOnly group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion
7376
runtimeOnly group: 'org.apache.hadoop', name: 'hadoop-hdfs-client', version: hadoopVersion
7477
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: junitVersion
7578
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params', version: junitVersion

src/main/java/org/radarbase/output/cleaner/TimestampExtractionCheck.kt

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ class TimestampExtractionCheck(
2828
return false
2929
}
3030
RestructureWorker.extractRecords(input) { records ->
31-
records.all { record ->
32-
cachedRecords += 1
33-
containsRecord(file.topic, record)
34-
}
31+
records
32+
.mapIndexed { i, record ->
33+
cachedRecords += 1L
34+
containsRecord(file, file.range.range.from + i.toLong(), record)
35+
}
36+
.all { it }
3537
}
3638
}
3739
if (cachedRecords > batchSize) {
@@ -46,19 +48,28 @@ class TimestampExtractionCheck(
4648
}
4749

4850

49-
private fun containsRecord(topic: String, record: GenericRecord): Boolean {
51+
private fun containsRecord(topicFile: TopicFile, offset: Long, record: GenericRecord): Boolean {
5052
var suffix = 0
5153

5254
do {
5355
val (path) = pathFactory.getRecordOrganization(
54-
topic, record, suffix)
56+
topicFile.topic, record, suffix)
5557

5658
try {
5759
when (cacheStore.contains(path, record)) {
58-
TimestampFileCacheStore.FindResult.FILE_NOT_FOUND -> return false
59-
TimestampFileCacheStore.FindResult.NOT_FOUND -> return false
60+
TimestampFileCacheStore.FindResult.FILE_NOT_FOUND -> {
61+
logger.warn("Target {} for record of {} (offset {}) has not been created yet.", path, topicFile.path, offset)
62+
return false
63+
}
64+
TimestampFileCacheStore.FindResult.NOT_FOUND -> {
65+
logger.warn("Target {} does not contain record of {} (offset {})", path, topicFile.path, offset)
66+
return false
67+
}
6068
TimestampFileCacheStore.FindResult.FOUND -> return true
61-
TimestampFileCacheStore.FindResult.BAD_SCHEMA -> suffix += 1 // continue next suffix
69+
TimestampFileCacheStore.FindResult.BAD_SCHEMA -> {
70+
logger.debug("Schema of {} does not match schema of {} (offset {})", path, topicFile.path, offset)
71+
suffix += 1 // continue next suffix
72+
}
6273
}
6374
} catch (ex: IOException) {
6475
logger.error("Failed to read target file {} for checking data integrity", path, ex)

src/main/java/org/radarbase/output/cleaner/TimestampFileCacheStore.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,13 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
6161
}
6262

6363
time("cleaner.contains") {
64-
if (fileCache.contains(record)) FindResult.FOUND else {
65-
logger.warn("Target {} does not contain record {}", path, record)
66-
FindResult.NOT_FOUND
67-
}
64+
if (fileCache.contains(record)) FindResult.FOUND else FindResult.NOT_FOUND
6865
}
6966
} catch (ex: FileNotFoundException) {
70-
logger.warn("Target {} for {} has not been created yet.", path, record)
7167
FindResult.FILE_NOT_FOUND
7268
} catch (ex: IllegalArgumentException) {
73-
logger.warn("Schema of {} does not match schema of record {}: {}", path, record, ex.message)
7469
FindResult.BAD_SCHEMA
7570
} catch (ex: IndexOutOfBoundsException) {
76-
logger.warn("Schema of {} does not match schema of record {} (wrong number of columns)", path, record)
7771
FindResult.BAD_SCHEMA
7872
}
7973
}

0 commit comments

Comments
 (0)