@@ -30,6 +30,7 @@ import java.util.Locale
3030import java.util.concurrent.atomic.AtomicBoolean
3131import java.util.concurrent.atomic.AtomicInteger
3232import java.util.concurrent.atomic.AtomicLong
33+ import java.util.concurrent.atomic.AtomicReference
3334import kotlin.math.roundToLong
3435
3536@Suppress(" TooManyFunctions" )
@@ -50,10 +51,7 @@ internal class BatchFileOrchestrator(
5051 @Suppress(" UnsafeThirdPartyFunctionCall" ) // rounded Double isn't NaN
5152 private val recentWriteDelayMs = (config.recentDelayMs * DECREASE_PERCENT ).roundToLong()
5253
53- // keep track of how many items were written in the last known file
54- private var previousFile: File ? = null
55- private var previousFileItemCount: Long = 0
56- private var lastFileAccessTimestamp: Long = 0L
54+ private val currentBatchState = AtomicReference (CurrentBatchState (null , 0L , 0L ))
5755 private val lastCleanupTimestamp = AtomicLong (0L )
5856 private val isFileObserverStarted = AtomicBoolean (false )
5957
@@ -64,17 +62,16 @@ internal class BatchFileOrchestrator(
6462 override fun onEvent (event : Int , name : String? ) {
6563 if (! name.isNullOrEmpty() && name.isBatchFileName) {
6664 val file = File (rootDir, name)
67- when ( event) {
68- MOVED_TO , CREATE -> {
69- synchronized(knownFiles ) {
70- knownFiles.add(file)
71- }
65+ val isAdded = event and ( MOVED_TO or CREATE ) != 0
66+ val isRemoved = event and ( MOVED_FROM or DELETE ) != 0
67+ if (isAdded ) {
68+ synchronized(knownFiles) {
69+ knownFiles.add(file)
7270 }
73-
74- MOVED_FROM , DELETE -> {
75- synchronized(knownFiles) {
76- knownFiles.remove(file)
77- }
71+ }
72+ if (isRemoved) {
73+ synchronized(knownFiles) {
74+ knownFiles.remove(file)
7875 }
7976 }
8077 }
@@ -110,7 +107,7 @@ internal class BatchFileOrchestrator(
110107 pendingFiles.set(files.count())
111108
112109 return files.firstOrNull {
113- (it !in excludeFiles) && ! isFileRecent(it, recentReadDelayMs)
110+ (it !in excludeFiles) && ! isFileRecent(it, recentReadDelayMs) && it.existsSafe(internalLogger)
114111 }
115112 }
116113
@@ -226,28 +223,25 @@ internal class BatchFileOrchestrator(
226223 }
227224 }
228225
229- private fun createNewFile (): File {
230- val newFileName = timeProvider.getDeviceTimestampMillis().toString()
231- val newFile = File (rootDir, newFileName)
232- val closedFile = previousFile
233- val closedFileLastAccessTimestamp = lastFileAccessTimestamp
234- if (closedFile != null ) {
226+ private fun createNewFile (): File = synchronized(currentBatchState) {
227+ val now = timeProvider.getDeviceTimestampMillis()
228+ val newFile = File (rootDir, now.toString())
229+ val previousState = currentBatchState.get()
230+ if (previousState.file != null ) {
235231 metricsDispatcher.sendBatchClosedMetric(
236- closedFile ,
232+ previousState.file ,
237233 BatchClosedMetadata (
238- lastTimeWasUsedInMs = closedFileLastAccessTimestamp ,
239- eventsCount = previousFileItemCount
234+ lastTimeWasUsedInMs = previousState.lastAccessTimestamp ,
235+ eventsCount = previousState.itemCount
240236 )
241237 )
242238 }
243- previousFile = newFile
244- previousFileItemCount = 1
245- lastFileAccessTimestamp = timeProvider.getDeviceTimestampMillis()
239+ currentBatchState.set(CurrentBatchState (newFile, 1L , now))
246240 pendingFiles.incrementAndGet()
247241 synchronized(knownFiles) {
248242 knownFiles.add(newFile)
249243 }
250- return newFile
244+ newFile
251245 }
252246
253247 @Suppress(" ReturnCount" )
@@ -259,27 +253,32 @@ internal class BatchFileOrchestrator(
259253 return null
260254 }
261255
262- val lastKnownFile = previousFile
263- val lastKnownFileItemCount = previousFileItemCount
264- if (lastKnownFile != lastFile) {
265- // this situation can happen because:
266- // 1. `lastFile` is a file written during a previous session
267- // 2. `lastFile` was created by another system/process
268- // 3. `lastKnownFile ` was deleted
269- // In any case, we don't know the item count, so to be safe, we create a new file
270- return null
271- }
256+ synchronized(currentBatchState) {
257+ val state = currentBatchState.get()
258+ if (state.file != lastFile) {
259+ // this situation can happen because:
260+ // 1. `lastFile` is a file written during a previous session
261+ // 2. `lastFile` was created by another system/process
262+ // 3. `state.file ` was deleted
263+ // In any case, we don't know the item count, so to be safe, we create a new file
264+ return null
265+ }
272266
273- val isRecentEnough = isFileRecent(lastFile, recentWriteDelayMs)
274- val hasRoomForMore = lastFile.lengthSafe(internalLogger) < config.maxBatchSize
275- val hasSlotForMore = (lastKnownFileItemCount < config.maxItemsPerBatch)
267+ val isRecentEnough = isFileRecent(lastFile, recentWriteDelayMs)
268+ val hasRoomForMore = lastFile.lengthSafe(internalLogger) < config.maxBatchSize
269+ val hasSlotForMore = (state.itemCount < config.maxItemsPerBatch)
276270
277- return if (isRecentEnough && hasRoomForMore && hasSlotForMore) {
278- previousFileItemCount = lastKnownFileItemCount + 1
279- lastFileAccessTimestamp = timeProvider.getDeviceTimestampMillis()
280- lastFile
281- } else {
282- null
271+ if (! isRecentEnough || ! hasRoomForMore || ! hasSlotForMore) {
272+ return null
273+ }
274+
275+ currentBatchState.set(
276+ state.copy(
277+ itemCount = state.itemCount + 1 ,
278+ lastAccessTimestamp = timeProvider.getDeviceTimestampMillis()
279+ )
280+ )
281+ return lastFile
283282 }
284283 }
285284
@@ -379,6 +378,12 @@ internal class BatchFileOrchestrator(
379378
380379 // endregion
381380
381+ private data class CurrentBatchState (
382+ val file : File ? ,
383+ val itemCount : Long ,
384+ val lastAccessTimestamp : Long
385+ )
386+
382387 companion object {
383388
384389 private const val FILE_OBSERVER_MASK = CREATE or DELETE or MOVED_TO or MOVED_FROM
0 commit comments