@@ -85,7 +85,8 @@ class SegmentDestination(
        var initialDelay = flushIntervalInMillis

        // If we have events in queue flush them
-       val eventFilePaths = parseFilePaths(storage.read(Storage.Constants.Events)) // should we switch to extension function?
+       val eventFilePaths =
+           parseFilePaths(storage.read(Storage.Constants.Events)) // should we switch to extension function?
        if (eventFilePaths.isNotEmpty()) {
            initialDelay = 0
        }
@@ -115,6 +116,9 @@ class SegmentDestination(
    }

    private fun performFlush() {
+       if (eventCount.get() < 1) {
+           return
+       }
        val fileUrls = parseFilePaths(storage.read(Storage.Constants.Events))
        if (fileUrls.isEmpty()) {
            analytics.log("No events to upload")
@@ -124,9 +128,14 @@ class SegmentDestination(
        for (fileUrl in fileUrls) {
            try {
                val connection = httpClient.upload(apiHost, apiKey)
+               val file = File(fileUrl)
+               // flush is executed in a thread pool and file could have been deleted by another thread
+               if (!file.exists()) {
+                   continue
+               }
                connection.outputStream?.let {
                    // Write the payloads into the OutputStream.
-                   val fileInputStream = FileInputStream(File(fileUrl))
+                   val fileInputStream = FileInputStream(file)
                    fileInputStream.copyTo(connection.outputStream)
                    fileInputStream.close()
                    connection.outputStream.close()
@@ -151,11 +160,13 @@ class SegmentDestination(
                    )
                }
            } catch (e: Exception) {
-               analytics.log("""
+               analytics.log(
+                   """
                    | Error uploading events from batch file
                    | fileUrl="$fileUrl"
                    | msg=${e.message}
-               """.trimMargin(), type = LogType.ERROR)
+                   """.trimMargin(), type = LogType.ERROR
+               )
                e.printStackTrace()
            }
        }
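
For context, a minimal standalone sketch of the guard this change adds: when flushes run concurrently on a thread pool, a batch file listed at the start of the loop may already have been uploaded and deleted by another flush, so each file is re-checked before it is streamed. The names (uploadBatches, openUploadStream) are hypothetical and not part of the SDK.

// Hypothetical sketch of the concurrent-flush guard, not the SDK's API.
import java.io.File
import java.io.FileInputStream
import java.io.OutputStream

fun uploadBatches(filePaths: List<String>, openUploadStream: () -> OutputStream) {
    for (path in filePaths) {
        val file = File(path)
        // Another flush may have deleted this batch file after the list was built.
        if (!file.exists()) {
            continue
        }
        openUploadStream().use { out ->
            FileInputStream(file).use { input ->
                // Stream the batch file into the upload connection.
                input.copyTo(out)
            }
        }
    }
}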