
Commit 0994358

Merge pull request #77 from RADAR-base/skipRecentFiles
Only process files if they are a minimum age.
2 parents 10cf0aa + aa69eba commit 0994358

6 files changed: 41 additions & 18 deletions

restructure.yml

Lines changed: 4 additions & 0 deletions
@@ -79,6 +79,10 @@ worker:
   numThreads: 2
   # Maximum number of files to process in any given topic.
   maxFilesPerTopic: null
+  # Minimum time in seconds since a source file was last modified before it is
+  # considered for processing. This avoids synchronization issues when a file has just been
+  # written.
+  minimumFileAge: 60

 cleaner:
   # Enable cleaning up old source files
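
The value is a plain number of seconds. As the Kotlin changes below show, the worker converts it to a java.time.Duration and compares it against the time elapsed since a file's last-modified timestamp. A minimal standalone sketch of that check, using hypothetical timestamps:

import java.time.Duration
import java.time.Instant

fun main() {
    // Hypothetical values for illustration only.
    val minimumFileAge = Duration.ofSeconds(60)        // mirrors minimumFileAge: 60
    val lastModified = Instant.now().minusSeconds(30)  // file written 30 seconds ago

    // Age of the file since it was last modified.
    val age = Duration.between(lastModified, Instant.now())

    // Only files at least minimumFileAge old are considered for processing.
    println(age >= minimumFileAge)  // false: the file is still too recent
}

A file skipped in one run is not lost; it remains in source storage and should become eligible on a later run once it is old enough.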

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 3 additions & 5 deletions
@@ -4,10 +4,7 @@ import io.minio.PutObjectOptions
 import io.minio.PutObjectOptions.MAX_PART_SIZE
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
-import org.radarbase.output.config.PathConfig
-import org.radarbase.output.config.ResourceConfig
-import org.radarbase.output.config.RestructureConfig
-import org.radarbase.output.config.S3Config
+import org.radarbase.output.config.*
 import org.radarbase.output.util.Timer
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Paths
@@ -29,7 +26,8 @@ class RestructureS3IntegrationTest {
         val config = RestructureConfig(
             source = ResourceConfig("s3", s3 = sourceConfig),
             target = ResourceConfig("s3", s3 = targetConfig),
-            paths = PathConfig(inputs = listOf(Paths.get("in")))
+            paths = PathConfig(inputs = listOf(Paths.get("in"))),
+            worker = WorkerConfig(minimumFileAge = 0L)
         )
         val application = Application(config)
         val sourceClient = sourceConfig.createS3Client()
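
Setting minimumFileAge = 0L here is presumably deliberate: the integration test uploads objects and then immediately runs the restructuring, so with the new 60-second default those freshly written files would be skipped as too recent. A zero value effectively disables the age filter for the test.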

src/main/java/org/radarbase/output/Application.kt

Lines changed: 1 addition & 2 deletions
@@ -32,6 +32,7 @@ import org.radarbase.output.util.ProgressBar.Companion.format
 import org.radarbase.output.util.Timer
 import org.radarbase.output.worker.FileCacheStore
 import org.radarbase.output.cleaner.SourceDataCleaner
+import org.radarbase.output.util.TimeUtil.durationSince
 import org.radarbase.output.worker.RadarKafkaRestructure
 import org.slf4j.LoggerFactory
 import redis.clients.jedis.JedisPool
@@ -187,8 +188,6 @@
         private val logger = LoggerFactory.getLogger(Application::class.java)
         const val CACHE_SIZE_DEFAULT = 100

-        private fun Temporal.durationSince() = Duration.between(this, Instant.now())
-
         private fun parseArgs(args: Array<String>): CommandLineArgs {
             val commandLineArgs = CommandLineArgs()
             JCommander.newBuilder()

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 8 additions & 1 deletion
@@ -143,7 +143,14 @@ data class WorkerConfig(
         * Number of offsets to simultaneously keep in cache. A higher size will
         * decrease overhead but increase memory usage.
         */
-        val cacheOffsetsSize: Long = 500_000) {
+        val cacheOffsetsSize: Long = 500_000,
+        /**
+         * Minimum time since the file was last modified in seconds. Avoids
+         * synchronization issues that may occur in a source file that is being
+         * appended to.
+         */
+        val minimumFileAge: Long = 60
+) {
     init {
         check(cacheSize >= 1) { "Maximum files per topic must be strictly positive" }
         maxFilesPerTopic?.let { check(it >= 1) { "Maximum files per topic must be strictly positive" } }
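
For illustration, a sketch of overriding the new field programmatically. It assumes the project classes from this diff are on the classpath; all other WorkerConfig parameters keep their defaults, as in the integration test above:

import org.radarbase.output.config.WorkerConfig
import java.time.Duration

fun main() {
    // Wait five minutes before touching a source file; other settings keep their defaults.
    val worker = WorkerConfig(minimumFileAge = 300L)

    // RadarKafkaRestructure (below) turns the seconds into a Duration,
    // clamping negative values to zero before comparing file ages against it.
    val threshold = Duration.ofSeconds(worker.minimumFileAge.coerceAtLeast(0L))
    println(threshold)  // PT5M
}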

src/main/java/org/radarbase/output/util/TimeUtil.kt

Lines changed: 5 additions & 4 deletions
@@ -4,11 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import java.math.RoundingMode
-import java.time.Instant
-import java.time.LocalDate
-import java.time.LocalDateTime
-import java.time.ZoneOffset
+import java.time.*
 import java.time.format.DateTimeParseException
+import java.time.temporal.Temporal

 object TimeUtil {
     private val NANO_MULTIPLIER = 1_000_000_000.toBigDecimal()
@@ -143,4 +141,7 @@
     fun Instant.toDouble() = (epochSecond.toBigDecimal()
             + (nano.toBigDecimal().divide(NANO_MULTIPLIER, 9, RoundingMode.HALF_UP))
             ).toDouble()
+
+
+    fun Temporal.durationSince(): Duration = Duration.between(this, Instant.now())
 }
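
The relocated durationSince() extension simply measures the time elapsed from any Temporal to now. A small usage sketch, restating the one-line extension so it runs on its own (the timestamp is hypothetical):

import java.time.Duration
import java.time.Instant
import java.time.temporal.Temporal

// Same shape as the extension added above.
fun Temporal.durationSince(): Duration = Duration.between(this, Instant.now())

fun main() {
    val lastModified = Instant.now().minusSeconds(90)  // hypothetical file timestamp
    println(lastModified.durationSince())              // roughly PT1M30S
}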

src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt

Lines changed: 20 additions & 6 deletions
@@ -20,11 +20,13 @@ import org.radarbase.output.FileStoreFactory
 import org.radarbase.output.accounting.Accountant
 import org.radarbase.output.accounting.OffsetRangeSet
 import org.radarbase.output.source.TopicFileList
+import org.radarbase.output.util.TimeUtil.durationSince
 import org.slf4j.LoggerFactory
 import java.io.Closeable
 import java.io.IOException
 import java.nio.file.Path
 import java.nio.file.Paths
+import java.time.Duration
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.LongAdder

@@ -39,17 +41,28 @@ import java.util.concurrent.atomic.LongAdder
 class RadarKafkaRestructure(
         private val fileStoreFactory: FileStoreFactory
 ): Closeable {
-    private val maxFilesPerTopic: Int = fileStoreFactory.config.worker.maxFilesPerTopic ?: Int.MAX_VALUE
     private val sourceStorage = fileStoreFactory.sourceStorage

     private val lockManager = fileStoreFactory.remoteLockManager
-    private val excludeTopics: Set<String> = fileStoreFactory.config.topics
-            .mapNotNullTo(HashSet()) { (topic, conf) ->
-                topic.takeIf { conf.exclude }
-            }

     private val isClosed = AtomicBoolean(false)

+    private val excludeTopics: Set<String>
+    private val maxFilesPerTopic: Int
+    private val minimumFileAge: Duration
+
+    init {
+        val config = fileStoreFactory.config
+        excludeTopics = config.topics
+                .mapNotNullTo(HashSet()) { (topic, conf) ->
+                    topic.takeIf { conf.exclude }
+                }
+
+        val workerConfig = config.worker
+        maxFilesPerTopic = workerConfig.maxFilesPerTopic ?: Int.MAX_VALUE
+        minimumFileAge = Duration.ofSeconds(workerConfig.minimumFileAge.coerceAtLeast(0L))
+    }
+
     val processedFileCount = LongAdder()
     val processedRecordsCount = LongAdder()

@@ -103,7 +116,8 @@
         return RestructureWorker(sourceStorage, accountant, fileStoreFactory, isClosed).use { worker ->
             try {
                 val topicPaths = TopicFileList(topic, sourceStorage.walker.walkRecords(topic, topicPath)
-                        .filter { f -> !seenFiles.contains(f.range) }
+                        .filter { f -> !seenFiles.contains(f.range)
+                                && f.lastModified.durationSince() >= minimumFileAge }
                         .take(maxFilesPerTopic)
                         .toList())

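Taken together, the worker now drops any file whose last-modified timestamp is newer than the configured threshold when it builds the per-topic work list. A self-contained sketch of that filter, using a hypothetical SourceFile type in place of the project's file records:

import java.time.Duration
import java.time.Instant

// Hypothetical stand-in for the file records returned by the source-storage walker.
data class SourceFile(val name: String, val lastModified: Instant)

// Keep only files that have not been modified for at least minimumFileAge.
fun List<SourceFile>.oldEnough(minimumFileAge: Duration): List<SourceFile> =
    filter { f -> Duration.between(f.lastModified, Instant.now()) >= minimumFileAge }

fun main() {
    val now = Instant.now()
    val files = listOf(
        SourceFile("topic+0+0+1000.avro", now.minusSeconds(600)),   // written 10 minutes ago
        SourceFile("topic+0+1000+2000.avro", now.minusSeconds(5)),  // possibly still being written
    )

    // With the 60-second default, only the older file is picked up in this round.
    println(files.oldEnough(Duration.ofSeconds(60)).map { it.name })
}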