@@ -3,6 +3,8 @@ package org.radarbase.output.util
3
3
import com.fasterxml.jackson.databind.JsonNode
4
4
import org.apache.avro.Schema
5
5
import org.apache.avro.generic.GenericRecord
6
+ import org.radarbase.output.path.RecordPathFactory.Companion.getFieldOrNull
7
+ import org.radarbase.output.path.RecordPathFactory.Companion.getOrNull
6
8
import java.math.RoundingMode
7
9
import java.time.*
8
10
import java.time.format.DateTimeParseException
@@ -21,73 +23,74 @@ object TimeUtil {
21
23
fun getDate (key : GenericRecord ? ,
22
24
value : GenericRecord ? ): Instant ? {
23
25
value?.timeOrNull(" time" )
24
- ?.let { return it }
26
+ ?.let { return it }
25
27
26
28
key?.run {
27
29
timeOrNull(" timeStart" )
28
- ?.let { return it }
29
-
30
+ ?.let { return it }
30
31
31
- schema.getField(" start" )
32
- ?.takeIf { it.schema().type == Schema .Type .LONG }
33
- ?.let { return Instant .ofEpochMilli(get(it.pos()) as Long ) }
32
+ getFieldOrNull(" start" )
33
+ ?.takeIf { it.hasType(Schema .Type .LONG ) }
34
+ ?.let { get(it.pos()) as ? Long }
35
+ ?.let { return Instant .ofEpochMilli(it) }
34
36
}
35
37
36
38
value?.run {
37
39
dateTimeOrNull(" dateTime" )
38
- ?.let { return it }
40
+ ?.let { return it }
39
41
40
42
dateOrNull(" date" )
41
- ?.let { return it }
43
+ ?.let { return it }
42
44
43
45
timeOrNull(" timeReceived" )
44
- ?.let { return it }
46
+ ?.let { return it }
45
47
timeOrNull(" timeCompleted" )
46
- ?.let { return it }
48
+ ?.let { return it }
47
49
}
48
50
49
51
return null
50
52
}
51
53
52
54
fun getDate (key : JsonNode ? , value : JsonNode ? ): Double? {
53
- value?.get (" time" )
54
- ?.takeIf { it.isNumber }
55
- ?.let { return it.asDouble() }
55
+ value?.getOrNull (" time" )
56
+ ?.takeIf { it.isNumber }
57
+ ?.let { return it.asDouble() }
56
58
57
59
key?.run {
58
- get (" timeStart" )
59
- ?.takeIf { it.isNumber }
60
- ?.let { return it.asDouble() }
60
+ getOrNull (" timeStart" )
61
+ ?.takeIf { it.isNumber }
62
+ ?.let { return it.asDouble() }
61
63
62
- get (" start" )
63
- ?.takeIf { it.isNumber }
64
- ?.let { return it.asLong() / 1000.0 }
64
+ getOrNull (" start" )
65
+ ?.takeIf { it.isNumber }
66
+ ?.let { return it.asLong() / 1000.0 }
65
67
}
66
68
67
69
value?.run {
68
- get (" dateTime" )
69
- ?.takeIf { it.isTextual }
70
- ?.let { node -> return node.asText().parseDateTime()?.toDouble() }
70
+ getOrNull (" dateTime" )
71
+ ?.takeIf { it.isTextual }
72
+ ?.let { node -> return node.asText().parseDateTime()?.toDouble() }
71
73
72
- get (" date" )
73
- ?.takeIf { it.isTextual }
74
- ?.let { node -> return node.asText().parseDate()?.toDouble() }
74
+ getOrNull (" date" )
75
+ ?.takeIf { it.isTextual }
76
+ ?.let { node -> return node.asText().parseDate()?.toDouble() }
75
77
76
- get (" timeReceived" )
77
- ?.takeIf { it.isNumber }
78
- ?.let { return it.asDouble() }
78
+ getOrNull (" timeReceived" )
79
+ ?.takeIf { it.isNumber }
80
+ ?.let { return it.asDouble() }
79
81
80
- get (" timeCompleted" )
81
- ?.takeIf { it.isNumber }
82
- ?.let { return it.asDouble() }
82
+ getOrNull (" timeCompleted" )
83
+ ?.takeIf { it.isNumber }
84
+ ?.let { return it.asDouble() }
83
85
}
84
86
85
87
return null
86
88
}
87
89
88
- private fun GenericRecord.timeOrNull (fieldName : String ): Instant ? = schema.getField(fieldName)
89
- ?.takeIf { it.schema().type == Schema .Type .DOUBLE }
90
- ?.let { (get(it.pos()) as Double ).toInstant() }
90
+ private fun GenericRecord.timeOrNull (fieldName : String ): Instant ? = getFieldOrNull(fieldName)
91
+ ?.takeIf { it.hasType(Schema .Type .DOUBLE ) }
92
+ ?.let { (get(it.pos()) as ? Double ) }
93
+ ?.toInstant()
91
94
92
95
/* *
93
96
* Parse the dateTime field of a record, if present.
@@ -96,9 +99,11 @@ object TimeUtil {
96
99
* @return `Instant` representing the dateTime or `null` if the field cannot be
97
100
* found or parsed.
98
101
*/
99
- private fun GenericRecord.dateTimeOrNull (fieldName : String ): Instant ? = schema.getField(fieldName)
100
- ?.takeIf { it.schema().type == Schema .Type .STRING }
101
- ?.let { get(it.pos()).toString().parseDateTime() }
102
+ private fun GenericRecord.dateTimeOrNull (fieldName : String ): Instant ? = getFieldOrNull(fieldName)
103
+ ?.takeIf { it.hasType(Schema .Type .STRING ) }
104
+ ?.let { get(it.pos()) }
105
+ ?.toString()
106
+ ?.parseDateTime()
102
107
103
108
/* *
104
109
* Parse the date field of a record, if present.
@@ -107,9 +112,11 @@ object TimeUtil {
107
112
* @return `Instant` representing the start of given date or `null` if the field
108
113
* cannot be found or parsed.
109
114
*/
110
- private fun GenericRecord.dateOrNull (fieldName : String ): Instant ? = schema.getField(fieldName)
111
- ?.takeIf { it.schema().type == Schema .Type .STRING }
112
- ?.let { get(it.pos()).toString().parseDate() }
115
+ private fun GenericRecord.dateOrNull (fieldName : String ): Instant ? = getFieldOrNull(fieldName)
116
+ ?.takeIf { it.hasType(Schema .Type .STRING ) }
117
+ ?.let { get(it.pos()) }
118
+ ?.toString()
119
+ ?.parseDate()
113
120
114
121
private fun Double.toInstant (): Instant {
115
122
val time = toBigDecimal()
@@ -122,8 +129,8 @@ object TimeUtil {
122
129
123
130
fun String.parseDate (): Instant ? = try {
124
131
LocalDate .parse(this )
125
- .atStartOfDay(ZoneOffset .UTC )
126
- .toInstant()
132
+ .atStartOfDay(ZoneOffset .UTC )
133
+ .toInstant()
127
134
} catch (ex: DateTimeParseException ) {
128
135
null
129
136
}
@@ -142,6 +149,15 @@ object TimeUtil {
142
149
+ (nano.toBigDecimal().divide(NANO_MULTIPLIER , 9 , RoundingMode .HALF_UP ))
143
150
).toDouble()
144
151
152
+ private fun JsonNode.getOrNull (fieldName : String ): JsonNode ? = fields().asSequence()
153
+ .find { (name, _) -> name.equals(fieldName, ignoreCase = true ) }
154
+ ?.value
155
+
156
+ private fun Schema.Field.hasType (type : Schema .Type ): Boolean {
157
+ val s = schema()
158
+ return s.type == type
159
+ || (s.type == Schema .Type .UNION && s.types.any { it.type == type })
160
+ }
145
161
146
162
fun Temporal.durationSince (): Duration = Duration .between(this , Instant .now())
147
163
}
0 commit comments