
Commit c69508d

Merge pull request #76 from RADAR-base/cleaner-messages
More data cleaner logging
2 parents 0994358 + 231f212 commit c69508d

5 files changed: +218 −3 lines changed

src/main/java/org/radarbase/output/cleaner/TimestampExtractionCheck.kt

Lines changed: 1 addition & 1 deletion

@@ -61,7 +61,7 @@ class TimestampExtractionCheck(
                    TimestampFileCacheStore.FindResult.BAD_SCHEMA -> suffix += 1 // continue next suffix
                }
            } catch (ex: IOException) {
-               logger.error("Failed to read target file for checking data integrity", ex)
+               logger.error("Failed to read target file {} for checking data integrity", path, ex)
                return false
            }
        } while (true)

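Side note, not part of the commit: the rewritten call relies on SLF4J's handling of a trailing Throwable, so `path` fills the `{}` placeholder while the exception's stack trace is still printed. A minimal standalone Kotlin sketch of that pattern; the logger name, path, and exception below are made up for illustration only.

import org.slf4j.LoggerFactory
import java.io.IOException
import java.nio.file.Paths

fun main() {
    // Hypothetical logger and path, purely to demonstrate the logging call used above.
    val logger = LoggerFactory.getLogger("TimestampExtractionCheckExample")
    val path = Paths.get("output/project/user/topic/20200101_0000.csv")
    val ex = IOException("simulated read failure")
    // The {} is filled by `path`; the trailing exception is logged with its full stack trace.
    logger.error("Failed to read target file {} for checking data integrity", path, ex)
}
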
src/main/java/org/radarbase/output/cleaner/TimestampFileCache.kt

Lines changed: 3 additions & 1 deletion

@@ -53,7 +53,9 @@ class TimestampFileCache(
        if (header != null) {
            val recordHeader = converterFactory.headerFor(record)
            if (!recordHeader.contentEquals(header)) {
-               throw IllegalArgumentException("Header mismatch")
+               throw IllegalArgumentException(
+                   "Header mismatch: record header ${recordHeader.contentToString()}" +
+                       " does not match target header ${header.contentToString()}")
            }
        }

src/main/java/org/radarbase/output/cleaner/TimestampFileCacheStore.kt

Lines changed: 12 additions & 1 deletion

@@ -19,6 +19,7 @@ package org.radarbase.output.cleaner
import org.apache.avro.generic.GenericRecord
import org.radarbase.output.FileStoreFactory
import org.radarbase.output.util.Timer.time
+import org.slf4j.LoggerFactory
import java.io.FileNotFoundException
import java.io.IOException
import java.nio.file.Path

@@ -60,13 +61,19 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
            }

            time("cleaner.contains") {
-               if (fileCache.contains(record)) FindResult.FOUND else FindResult.NOT_FOUND
+               if (fileCache.contains(record)) FindResult.FOUND else {
+                   logger.warn("Target {} does not contain record {}", path, record)
+                   FindResult.NOT_FOUND
+               }
            }
        } catch (ex: FileNotFoundException) {
+           logger.warn("Target {} for {} has not been created yet.", path, record)
            FindResult.FILE_NOT_FOUND
        } catch (ex: IllegalArgumentException) {
+           logger.warn("Schema of {} does not match schema of record {}: {}", path, record, ex.message)
            FindResult.BAD_SCHEMA
        } catch (ex: IndexOutOfBoundsException) {
+           logger.warn("Schema of {} does not match schema of record {} (wrong number of columns)", path, record)
            FindResult.BAD_SCHEMA
        }
    }

@@ -95,4 +102,8 @@ class TimestampFileCacheStore(private val factory: FileStoreFactory) {
        NOT_FOUND,
        FOUND
    }
+
+   companion object {
+       private val logger = LoggerFactory.getLogger(TimestampFileCacheStore::class.java)
+   }
}
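The full caller of these FindResult values is not shown in this diff; as a rough sketch of how they are typically acted on (the real logic lives in TimestampExtractionCheck.kt and may differ), BAD_SCHEMA sends the check on to the next file suffix, as seen in the first diff above, while the other results settle whether the record was already written:

package org.radarbase.output.cleaner

// Rough sketch, not code from this repository; it assumes the four FindResult values
// visible in this diff. The `else` branch only covers values not shown here.
fun shouldTryNextSuffix(result: TimestampFileCacheStore.FindResult): Boolean = when (result) {
    TimestampFileCacheStore.FindResult.BAD_SCHEMA -> true       // check the next file suffix
    TimestampFileCacheStore.FindResult.FOUND,
    TimestampFileCacheStore.FindResult.NOT_FOUND,
    TimestampFileCacheStore.FindResult.FILE_NOT_FOUND -> false  // result is final for this record
    else -> false
}
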
src/test/java/org/radarbase/output/cleaner/TimestampFileCacheTest.kt

Lines changed: 108 additions & 0 deletions

@@ -0,0 +1,108 @@
package org.radarbase.output.cleaner

import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecordBuilder
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.`is`
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.io.TempDir
import org.radarbase.output.FileStoreFactory
import org.radarbase.output.compression.IdentityCompression
import org.radarbase.output.config.LocalConfig
import org.radarbase.output.format.CsvAvroConverterFactory
import org.radarbase.output.target.LocalTargetStorage
import java.io.ByteArrayInputStream
import java.io.FileNotFoundException
import java.io.InputStreamReader
import java.nio.file.Files
import java.nio.file.Path

internal class TimestampFileCacheTest {
    private lateinit var record: GenericData.Record
    private var now: Double = 0.0
    private lateinit var schema: Schema
    private lateinit var factory: FileStoreFactory
    private lateinit var csvConverter: CsvAvroConverterFactory

    @BeforeEach
    fun setUp() {
        csvConverter = CsvAvroConverterFactory()
        factory = mock {
            on { recordConverter } doReturn csvConverter
            on { targetStorage } doReturn LocalTargetStorage(LocalConfig())
            on { compression } doReturn IdentityCompression()
        }
        schema = Schema.Parser().parse(javaClass.getResourceAsStream("android_phone_light.avsc"))
        now = System.currentTimeMillis() / 1000.0
        record = GenericRecordBuilder(schema)
            .set("key", GenericRecordBuilder(schema.getField("key")!!.schema())
                .set("projectId", "p")
                .set("userId", "u")
                .set("sourceId", "s")
                .build())
            .set("value", GenericRecordBuilder(schema.getField("value")!!.schema())
                .set("time", now)
                .set("timeReceived", now + 1.0)
                .set("light", 1.0f)
                .build())
            .build()
    }

    @Test
    fun testFileCacheFound(@TempDir path: Path) {
        val targetPath = path.resolve("test.avro")
        writeRecord(targetPath, record)
        val timestampFileCache = TimestampFileCache(factory, targetPath)
        assertThat(timestampFileCache.contains(record), `is`(true))
    }

    private fun writeRecord(path: Path, record: GenericRecord) {
        Files.newBufferedWriter(path).use { wr ->
            ByteArrayInputStream(ByteArray(0)).use { emptyInput ->
                InputStreamReader(emptyInput).use { emptyReader ->
                    csvConverter.converterFor(wr, record, true, emptyReader).use { converter ->
                        converter.writeRecord(record)
                    }
                }
            }
        }
    }

    @Test
    fun testFileCacheNotFound(@TempDir path: Path) {
        val targetPath = path.resolve("test.avro")
        assertThrows<FileNotFoundException> { TimestampFileCache(factory, targetPath) }
    }

    @Test
    fun testHeaderMismatch(@TempDir path: Path) {
        val targetPath = path.resolve("test.avro")
        Files.newBufferedWriter(targetPath).use { writer ->
            writer.write("key.projectId,key.userId,key.sourceId,value.time,value.timeReceived,value.luminance")
        }
        val cache = TimestampFileCache(factory, targetPath)
        assertThrows<IllegalArgumentException> { cache.contains(record) }
    }

    @Test
    fun testNotFound(@TempDir path: Path) {
        val targetPath = path.resolve("test.avro")

        val otherRecord = GenericRecordBuilder(record)
            .set("value", GenericRecordBuilder(record.get("value") as GenericData.Record)
                .set("time", now + 1.0)
                .set("timeReceived", now + 2.0)
                .build())
            .build()

        writeRecord(targetPath, otherRecord)
        val cache = TimestampFileCache(factory, targetPath)
        assertThat(cache.contains(record), `is`(false))
    }
}
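Assuming the project's standard Gradle test setup (not shown in this commit), the new test class can be run on its own with Gradle's test filter:

./gradlew test --tests "org.radarbase.output.cleaner.TimestampFileCacheTest"
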
src/test/resources/org/radarbase/output/cleaner/android_phone_light.avsc

Lines changed: 94 additions & 0 deletions

@@ -0,0 +1,94 @@
{
  "type" : "record",
  "name" : "PhoneLight",
  "namespace" : "org.radarcns.kafka.ObservationKey_org.radarcns.passive.phone",
  "doc" : "combined key-value record",
  "fields" : [ {
    "name" : "key",
    "type" : {
      "type" : "record",
      "name" : "ObservationKey",
      "namespace" : "org.radarcns.kafka",
      "doc" : "Key of an observation.",
      "fields" : [ {
        "name" : "projectId",
        "type" : [ "null", {
          "type" : "string",
          "connect.doc" : "Project identifier. Null if unknown or the user is not enrolled in a project.",
          "connect.parameters" : {
            "avro.java.string" : "String"
          },
          "avro.java.string" : "String"
        } ],
        "doc" : "Project identifier. Null if unknown or the user is not enrolled in a project.",
        "default" : null
      }, {
        "name" : "userId",
        "type" : {
          "type" : "string",
          "connect.doc" : "User Identifier created during the enrolment.",
          "connect.parameters" : {
            "avro.java.string" : "String"
          },
          "avro.java.string" : "String"
        },
        "doc" : "User Identifier created during the enrolment."
      }, {
        "name" : "sourceId",
        "type" : {
          "type" : "string",
          "connect.doc" : "Unique identifier associated with the source.",
          "connect.parameters" : {
            "avro.java.string" : "String"
          },
          "avro.java.string" : "String"
        },
        "doc" : "Unique identifier associated with the source."
      } ],
      "connect.doc" : "Key of an observation.",
      "connect.version" : 1,
      "connect.parameters" : {
        "connect.record.doc" : "Key of an observation."
      },
      "connect.name" : "org.radarcns.kafka.ObservationKey"
    },
    "doc" : "Key of a Kafka SinkRecord"
  }, {
    "name" : "value",
    "type" : {
      "type" : "record",
      "name" : "PhoneLight",
      "namespace" : "org.radarcns.passive.phone",
      "doc" : "Data from the light sensor in luminous flux per unit area.",
      "fields" : [ {
        "name" : "time",
        "type" : {
          "type" : "double",
          "connect.doc" : "Device timestamp in UTC (s)."
        },
        "doc" : "Device timestamp in UTC (s)."
      }, {
        "name" : "timeReceived",
        "type" : {
          "type" : "double",
          "connect.doc" : "Device receiver timestamp in UTC (s)."
        },
        "doc" : "Device receiver timestamp in UTC (s)."
      }, {
        "name" : "light",
        "type" : {
          "type" : "float",
          "connect.doc" : "Illuminance (lx)."
        },
        "doc" : "Illuminance (lx)."
      } ],
      "connect.doc" : "Data from the light sensor in luminous flux per unit area.",
      "connect.version" : 1,
      "connect.parameters" : {
        "connect.record.doc" : "Data from the light sensor in luminous flux per unit area."
      },
      "connect.name" : "org.radarcns.passive.phone.PhoneLight"
    },
    "doc" : "Value of a Kafka SinkRecord"
  } ]
}
