Skip to content

Commit 81f34d8

Browse files
committed
Null safety fixes
1 parent a5da65e commit 81f34d8

File tree

3 files changed

+14
-12
lines changed

3 files changed

+14
-12
lines changed

src/main/java/org/radarbase/hdfs/util/PostponedWriter.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,18 @@ abstract class PostponedWriter
5858
* not yet taken place, the write will occur earlier.
5959
*/
6060
fun triggerWrite() {
61-
var localWriteFuture: Future<*>? = writeFuture.get()
62-
if (localWriteFuture == null) {
63-
localWriteFuture = executor.schedule({ this.startWrite() }, timeout, timeoutUnit)
64-
if (!writeFuture.compareAndSet(null, localWriteFuture)) {
65-
localWriteFuture!!.cancel(false)
66-
}
61+
if (writeFuture.get() == null) {
62+
executor.schedule(::startWrite, timeout, timeoutUnit)
63+
.also { newWriteFuture ->
64+
if (!writeFuture.compareAndSet(null, newWriteFuture)) {
65+
newWriteFuture.cancel(false)
66+
}
67+
}
6768
}
6869
}
6970

7071
/** Start the write in the writer thread. */
71-
protected fun startWrite() {
72+
private fun startWrite() {
7273
writeFuture.set(null)
7374
doWrite()
7475
}
@@ -89,8 +90,9 @@ abstract class PostponedWriter
8990

9091
@Throws(IOException::class)
9192
private fun doFlush(shutdown: Boolean) {
92-
val localFuture = executor.submit { this.startWrite() }
93-
writeFuture.set(localFuture)
93+
val localFuture = executor.submit(::startWrite)
94+
.also { writeFuture.set(it) }
95+
9496
if (shutdown) {
9597
executor.shutdown()
9698
}

src/main/java/org/radarbase/hdfs/worker/FileCache.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class FileCache(
128128
writer.close()
129129

130130
if (!hasError.get()) {
131-
if (deduplicate.enable!!) {
131+
if (deduplicate.enable == true) {
132132
time("close.deduplicate") {
133133
val dedupTmp = tmpPath.resolveSibling("${tmpPath.fileName}.dedup")
134134
converterFactory.deduplicate(fileName, tmpPath, dedupTmp, compression, deduplicate.distinctFields ?: emptySet(), deduplicate.ignoreFields ?: emptySet())

src/main/java/org/radarbase/hdfs/worker/RestructureWorker.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,11 @@ internal class RestructureWorker(
130130
}
131131

132132
@Throws(IOException::class)
133-
private fun writeRecord(topicPartition: TopicPartition, record: GenericRecord?,
133+
private fun writeRecord(topicPartition: TopicPartition, record: GenericRecord,
134134
offset: Long, suffix: Int = 0) {
135135
var currentSuffix = suffix
136136
val (path) = pathFactory.getRecordOrganization(
137-
topicPartition.topic, record!!, currentSuffix)
137+
topicPartition.topic, record, currentSuffix)
138138

139139
val transaction = time("accounting.create") {
140140
Accountant.Transaction(topicPartition, offset)

0 commit comments

Comments
 (0)