Skip to content

Commit 05c1c6b

Browse files
committed
Use coroutines in azure target storage
1 parent e3e03f6 commit 05c1c6b

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

src/main/java/org/radarbase/output/source/S3SourceStorage.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ class S3SourceStorage(
127127
return flow { emit(action()) }
128128
.retryWhen { cause, attempt ->
129129
if (cause is ErrorResponseException &&
130-
(cause.errorResponse().code() == "NoSuchKey" || cause.errorResponse().code() == "ResourceNotFound")
130+
(cause.errorResponse().code() == "NoSuchKey"
131+
|| cause.errorResponse().code() == "ResourceNotFound")
131132
) {
132133
throw FileNotFoundException()
133134
}

src/main/java/org/radarbase/output/target/AzureTargetStorage.kt

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,34 +65,41 @@ class AzureTargetStorage(private val config: AzureConfig) : TargetStorage {
6565
containerClient = serviceClient.getBlobContainerClient(container)
6666
}
6767

68-
override suspend fun status(path: Path): TargetStorage.PathStatus? {
69-
return try {
70-
TargetStorage.PathStatus(blob(path)
68+
override suspend fun status(path: Path): TargetStorage.PathStatus? =
69+
withContext(Dispatchers.IO) {
70+
try {
71+
TargetStorage.PathStatus(blob(path)
7172
.getPropertiesWithResponse(null, null, null)
7273
.value
7374
.blobSize)
74-
} catch (ex: Exception) {
75-
null
75+
} catch (ex: Exception) {
76+
null
77+
}
7678
}
77-
}
7879

7980
@Throws(IOException::class)
80-
override suspend fun newInputStream(path: Path): InputStream = blob(path).openInputStream()
81+
override suspend fun newInputStream(path: Path): InputStream = withContext(Dispatchers.IO) {
82+
blob(path).openInputStream()
83+
}
8184

8285
@Throws(IOException::class)
83-
override suspend fun move(oldPath: Path, newPath: Path) {
86+
override suspend fun move(oldPath: Path, newPath: Path) = withContext(Dispatchers.IO) {
8487
blob(newPath).copyFromUrl("${config.endpoint}/${config.container}/${oldPath.toKey()}")
85-
delete(oldPath)
88+
doDelete(oldPath)
8689
}
8790

8891
@Throws(IOException::class)
89-
override suspend fun store(localPath: Path, newPath: Path) {
92+
override suspend fun store(localPath: Path, newPath: Path) = withContext(Dispatchers.IO) {
9093
blob(newPath).uploadFromFile(localPath.toString(), true)
9194
localPath.deleteExisting()
9295
}
9396

9497
@Throws(IOException::class)
95-
override suspend fun delete(path: Path) {
98+
override suspend fun delete(path: Path) = withContext(Dispatchers.IO) {
99+
doDelete(path)
100+
}
101+
102+
private fun doDelete(path: Path) {
96103
blob(path).delete()
97104
}
98105

0 commit comments

Comments
 (0)