Skip to content

Commit ae2b0fb

Browse files
authored
Fix Windows file deletion by closing stream before removeFile (#287)
Follow-up to #284 Move `storage.removeFile()` call outside the `use{}` block to ensure `InputStream` is fully closed before attempting file deletion. On Windows, files cannot be deleted while file handles remain open, causing silent deletion failures and infinite upload retries of the same batch files.
1 parent 32522e7 commit ae2b0fb

File tree

2 files changed

+93
-4
lines changed

2 files changed

+93
-4
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ open class EventPipeline(
132132
val fileUrlList = parseFilePaths(storage.read(Storage.Constants.Events))
133133
for (url in fileUrlList) {
134134
// upload event file
135+
var shouldCleanup = true
135136
storage.readAsStream(url)?.use { data ->
136-
var shouldCleanup = true
137137
try {
138138
val connection = httpClient.upload(apiHost)
139139
connection.outputStream?.let {
@@ -150,10 +150,10 @@ open class EventPipeline(
150150
analytics.reportInternalError(e)
151151
shouldCleanup = handleUploadException(e, url)
152152
}
153+
}
153154

154-
if (shouldCleanup) {
155-
storage.removeFile(url)
156-
}
155+
if (shouldCleanup) {
156+
storage.removeFile(url)
157157
}
158158
}
159159
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,4 +234,93 @@ internal class EventPipelineTest {
234234
// Verify that close() was called on the InputStream even when exception occurred
235235
assertTrue(isClosed, "InputStream should have been closed when exception occurs")
236236
}
237+
238+
@Test
239+
fun `removeFile is called after InputStream is closed`() {
240+
// Track order of operations to ensure stream is closed before file deletion
241+
val operationOrder = mutableListOf<String>()
242+
var streamClosed = false
243+
244+
val trackableInputStream = object : java.io.InputStream() {
245+
override fun read(): Int = -1
246+
override fun close() {
247+
streamClosed = true
248+
operationOrder.add("stream_closed")
249+
super.close()
250+
}
251+
}
252+
253+
every { storage.readAsStream(any()) } returns trackableInputStream
254+
every { storage.removeFile(any()) } answers {
255+
operationOrder.add("file_removed")
256+
// Verify stream was closed before removeFile is called (Windows file locking requirement)
257+
assertTrue(streamClosed, "InputStream must be closed before removeFile is called")
258+
true
259+
}
260+
261+
pipeline.put(event1)
262+
pipeline.put(event2)
263+
264+
// Give some time for async processing
265+
Thread.sleep(500)
266+
267+
// Verify correct operation order
268+
coVerify {
269+
storage.removeFile(any())
270+
}
271+
assertTrue(operationOrder == listOf("stream_closed", "file_removed"),
272+
"Expected order: [stream_closed, file_removed], but got: $operationOrder")
273+
}
274+
275+
@Test
276+
fun `removeFile is called when readAsStream returns null`() {
277+
every { storage.readAsStream(any()) } returns null
278+
279+
pipeline.put(event1)
280+
pipeline.put(event2)
281+
282+
// Give some time for async processing
283+
Thread.sleep(500)
284+
285+
// Verify file is still deleted even when stream is null
286+
coVerify {
287+
storage.removeFile(any())
288+
}
289+
}
290+
291+
@Test
292+
fun `InputStream is closed before removeFile even when upload throws exception`() {
293+
val operationOrder = mutableListOf<String>()
294+
var streamClosed = false
295+
296+
val trackableInputStream = object : java.io.InputStream() {
297+
override fun read(): Int = -1
298+
override fun close() {
299+
streamClosed = true
300+
operationOrder.add("stream_closed")
301+
super.close()
302+
}
303+
}
304+
305+
every { storage.readAsStream(any()) } returns trackableInputStream
306+
every { anyConstructed<HTTPClient>().upload(any()) } throws HTTPException(400, "", "", mutableMapOf())
307+
every { storage.removeFile(any()) } answers {
308+
operationOrder.add("file_removed")
309+
assertTrue(streamClosed, "InputStream must be closed before removeFile even when upload fails")
310+
true
311+
}
312+
313+
pipeline.put(event1)
314+
pipeline.put(event2)
315+
316+
// Give some time for async processing
317+
Thread.sleep(500)
318+
319+
// Verify correct operation order even with exception
320+
coVerify {
321+
storage.removeFile(any())
322+
}
323+
assertTrue(operationOrder == listOf("stream_closed", "file_removed"),
324+
"Expected order: [stream_closed, file_removed], but got: $operationOrder")
325+
}
237326
}

0 commit comments

Comments
 (0)