Skip to content

Commit 1d47599

Browse files
committed
working non-deadlock coroutine implementation
1 parent a6680f7 commit 1d47599

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1148
-775
lines changed

build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ dependencies {
5050
runtimeOnly("org.xerial.snappy:snappy-java:$snappyVersion")
5151

5252
implementation(kotlin("reflect"))
53+
val coroutinesVersion: String by project
54+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
5355

5456
val jacksonVersion: String by project
5557
api(platform("com.fasterxml.jackson:jackson-bom:$jacksonVersion"))
@@ -93,6 +95,7 @@ dependencies {
9395

9496
val radarSchemasVersion: String by project
9597
testImplementation("org.radarbase:radar-schemas-commons:$radarSchemasVersion")
98+
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion")
9699

97100
val junitVersion: String by project
98101
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")

gradle.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ dependencyUpdateVersion=0.42.0
77
nexusPublishVersion=1.1.0
88
jsoupVersion=1.14.3
99

10+
coroutinesVersion=1.6.1
1011
avroVersion=1.11.0
1112
snappyVersion=1.1.8.4
1213
jacksonVersion=2.13.2.20220328

restructure.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
service:
22
# Whether to run the application as a polling service.
3-
enable: false
3+
enable: true
44
# Polling interval in seconds.
55
interval: 30
66

@@ -118,7 +118,7 @@ cleaner:
118118
# Enable cleaning up old source files
119119
enable: true
120120
# Interval in seconds to clean data
121-
interval: 1260 # 21 minutes
121+
interval: 50 # 21 minutes
122122
# Number of days after which a source file is considered old
123123
age: 7
124124

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 82 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package org.radarbase.output
22

33
import io.minio.*
44
import io.minio.ObjectWriteArgs.MAX_PART_SIZE
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.test.runTest
57
import org.junit.jupiter.api.Assertions.assertEquals
68
import org.junit.jupiter.api.Test
79
import org.radarbase.output.config.*
10+
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
811
import org.radarbase.output.util.Timer
912
import org.radarbase.output.util.bucketBuild
1013
import org.radarbase.output.util.objectBuild
@@ -13,7 +16,7 @@ import java.nio.file.Paths
1316

1417
class RestructureS3IntegrationTest {
1518
@Test
16-
fun integration() {
19+
fun integration() = runTest {
1720
Timer.isEnabled = true
1821
val sourceConfig = S3Config(
1922
endpoint ="http://localhost:9000",
@@ -38,77 +41,106 @@ class RestructureS3IntegrationTest {
3841
}
3942

4043
val resourceFiles = listOf(
41-
"application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro",
42-
"application_server_status/partition=1/application_server_status+1+0000000021.avro",
43-
"android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro"
44+
"application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro",
45+
"application_server_status/partition=1/application_server_status+1+0000000021.avro",
46+
"android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro",
4447
)
4548
val targetFiles = resourceFiles.map { Paths.get("in/$it") }
46-
resourceFiles.forEachIndexed { i, resourceFile ->
47-
javaClass.getResourceAsStream("/$resourceFile").use { statusFile ->
48-
sourceClient.putObject(PutObjectArgs.Builder().objectBuild(sourceConfig.bucket, targetFiles[i]) {
49-
stream(statusFile, -1, MAX_PART_SIZE)
50-
})
49+
resourceFiles.mapIndexed { i, resourceFile ->
50+
launch(Dispatchers.IO) {
51+
this@RestructureS3IntegrationTest.javaClass.getResourceAsStream("/$resourceFile")
52+
.useSuspended { statusFile ->
53+
sourceClient.putObject(PutObjectArgs.Builder()
54+
.objectBuild(sourceConfig.bucket, targetFiles[i]) {
55+
stream(statusFile, -1, MAX_PART_SIZE)
56+
})
57+
}
5158
}
52-
}
59+
}.joinAll()
5360

5461
application.start()
5562

5663
val targetClient = targetConfig.createS3Client()
57-
val files = targetClient.listObjects(ListObjectsArgs.Builder().bucketBuild(targetConfig.bucket) {
58-
prefix("output")
59-
recursive(true)
60-
useUrlEncodingType(false)
61-
})
62-
.map { it.get().objectName() }
63-
.toList()
6464

6565
application.redisHolder.execute { redis ->
66-
assertEquals(1L, redis.del("offsets/application_server_status.json"))
67-
assertEquals(1L, redis.del("offsets/android_phone_acceleration.json"))
66+
launch { assertEquals(1L, redis.del("offsets/application_server_status.json")) }
67+
launch { assertEquals(1L, redis.del("offsets/android_phone_acceleration.json")) }
6868
}
6969

7070
val firstParticipantOutput = "output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status"
7171
val secondParticipantOutput = "output/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration"
72-
assertEquals(
73-
listOf(
74-
"$firstParticipantOutput/20200128_1300.csv",
75-
"$firstParticipantOutput/20200128_1400.csv",
76-
"$firstParticipantOutput/schema-application_server_status.json",
77-
"$secondParticipantOutput/20200528_1000.csv",
78-
"$secondParticipantOutput/schema-android_phone_acceleration.json"),
79-
files)
8072

81-
println(targetClient.getObject(GetObjectArgs.builder()
82-
.bucketBuild(targetConfig.bucket) {
83-
`object`("$firstParticipantOutput/20200128_1300.csv")
84-
}
85-
).readBytes().toString(UTF_8))
86-
87-
val csvContents = """
73+
val files = coroutineScope {
74+
launch(Dispatchers.IO) {
75+
val csvContents = """
8876
key.projectId,key.userId,key.sourceId,value.time,value.serverStatus,value.ipAddress
8977
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
9078
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
9179
9280
""".trimIndent()
93-
assertEquals(csvContents, targetClient.getObject(GetObjectArgs.Builder()
94-
.bucketBuild(targetConfig.bucket) {
95-
`object`("$firstParticipantOutput/20200128_1300.csv")
96-
})
97-
.readBytes()
98-
.toString(UTF_8))
81+
assertEquals(csvContents, targetClient.getObject(GetObjectArgs.Builder()
82+
.bucketBuild(targetConfig.bucket) {
83+
`object`("$firstParticipantOutput/20200128_1300.csv")
84+
})
85+
.readBytes()
86+
.toString(UTF_8))
87+
}
9988

100-
targetFiles.forEach {
101-
sourceClient.removeObject(RemoveObjectArgs.Builder()
102-
.objectBuild(sourceConfig.bucket, it))
103-
}
104-
sourceClient.removeBucket(RemoveBucketArgs.Builder().bucketBuild(sourceConfig.bucket))
105-
files.forEach {
106-
targetClient.removeObject(RemoveObjectArgs.Builder().bucketBuild(targetConfig.bucket) {
107-
`object`(it)
108-
})
89+
withContext(Dispatchers.IO) {
90+
targetClient.listObjects(ListObjectsArgs.Builder()
91+
.bucketBuild(targetConfig.bucket) {
92+
prefix("output")
93+
recursive(true)
94+
useUrlEncodingType(false)
95+
})
96+
.map { it.get().objectName() }
97+
.toHashSet()
98+
}
10999
}
110-
targetClient.removeBucket(RemoveBucketArgs.Builder().bucketBuild(targetConfig.bucket))
111100

101+
assertEquals(
102+
hashSetOf(
103+
"$firstParticipantOutput/20200128_1300.csv",
104+
"$firstParticipantOutput/20200128_1400.csv",
105+
"$firstParticipantOutput/schema-application_server_status.json",
106+
"$secondParticipantOutput/20200528_1000.csv",
107+
"$secondParticipantOutput/schema-android_phone_acceleration.json",
108+
),
109+
files,
110+
)
111+
112+
coroutineScope {
113+
// delete source files
114+
launch {
115+
targetFiles.map {
116+
launch(Dispatchers.IO) {
117+
sourceClient.removeObject(RemoveObjectArgs.Builder()
118+
.objectBuild(sourceConfig.bucket, it))
119+
}
120+
}.joinAll()
121+
122+
launch(Dispatchers.IO) {
123+
sourceClient.removeBucket(RemoveBucketArgs.Builder()
124+
.bucketBuild(sourceConfig.bucket))
125+
}
126+
}
127+
128+
// delete target files
129+
launch {
130+
files.map {
131+
launch(Dispatchers.IO) {
132+
targetClient.removeObject(RemoveObjectArgs.Builder()
133+
.bucketBuild(targetConfig.bucket) {
134+
`object`(it)
135+
})
136+
}
137+
}.joinAll()
138+
launch(Dispatchers.IO) {
139+
targetClient.removeBucket(RemoveBucketArgs.Builder()
140+
.bucketBuild(targetConfig.bucket))
141+
}
142+
}
143+
}
112144
println(Timer)
113145
}
114146
}

src/integrationTest/java/org/radarbase/output/accounting/OffsetRangeRedisTest.kt

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package org.radarbase.output.accounting
22

3+
import kotlinx.coroutines.test.runTest
34
import org.junit.jupiter.api.AfterEach
45
import org.junit.jupiter.api.Assertions.*
56
import org.junit.jupiter.api.BeforeEach
67
import org.junit.jupiter.api.Test
78
import org.radarbase.output.accounting.OffsetRedisPersistence.Companion.redisOffsetReader
9+
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
810
import redis.clients.jedis.JedisPool
911
import java.io.IOException
1012
import java.nio.file.Path
@@ -27,16 +29,18 @@ class OffsetRangeRedisTest {
2729

2830
@AfterEach
2931
fun tearDown() {
30-
redisHolder.execute { it.del(testFile.toString()) }
32+
runTest {
33+
redisHolder.execute { it.del(testFile.toString()) }
34+
}
3135
}
3236

3337
@Test
3438
@Throws(IOException::class)
35-
fun readEmpty() {
39+
fun readEmpty() = runTest {
3640
assertNull(offsetPersistence.read(testFile))
3741

3842
// will create on write
39-
offsetPersistence.writer(testFile).close()
43+
offsetPersistence.writer(this@runTest, testFile).closeAndJoin()
4044

4145
assertEquals(true, offsetPersistence.read(testFile)?.isEmpty)
4246

@@ -47,8 +51,8 @@ class OffsetRangeRedisTest {
4751

4852
@Test
4953
@Throws(IOException::class)
50-
fun write() {
51-
offsetPersistence.writer(testFile).use { rangeFile ->
54+
fun write() = runTest {
55+
offsetPersistence.writer(this@runTest, testFile).useSuspended { rangeFile ->
5256
rangeFile.add(TopicPartitionOffsetRange.parseFilename("a+0+0+1", lastModified))
5357
rangeFile.add(TopicPartitionOffsetRange.parseFilename("a+0+1+2", lastModified))
5458
}
@@ -67,8 +71,8 @@ class OffsetRangeRedisTest {
6771

6872
@Test
6973
@Throws(IOException::class)
70-
fun cleanUp() {
71-
offsetPersistence.writer(testFile).use { rangeFile ->
74+
fun cleanUp() = runTest {
75+
offsetPersistence.writer(this@runTest, testFile).useSuspended { rangeFile ->
7276
rangeFile.add(TopicPartitionOffsetRange.parseFilename("a+0+0+1", lastModified))
7377
rangeFile.add(TopicPartitionOffsetRange.parseFilename("a+0+1+2", lastModified))
7478
rangeFile.add(TopicPartitionOffsetRange.parseFilename("a+0+4+4", lastModified))

src/integrationTest/java/org/radarbase/output/accounting/RedisRemoteLockManagerTest.kt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package org.radarbase.output.accounting
22

3+
import kotlinx.coroutines.test.runTest
34
import org.hamcrest.MatcherAssert.assertThat
45
import org.hamcrest.Matchers.not
56
import org.hamcrest.Matchers.nullValue
67
import org.junit.jupiter.api.AfterEach
78
import org.junit.jupiter.api.BeforeEach
89
import org.junit.jupiter.api.Test
10+
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
911
import redis.clients.jedis.JedisPool
1012

1113
internal class RedisRemoteLockManagerTest {
@@ -26,42 +28,42 @@ internal class RedisRemoteLockManagerTest {
2628
}
2729

2830
@Test
29-
fun testExclusiveLock() {
30-
lockManager1.acquireLock("t").use { l1 ->
31+
fun testExclusiveLock() = runTest {
32+
lockManager1.acquireLock("t").useSuspended { l1 ->
3133
assertThat(l1, not(nullValue()))
32-
lockManager2.acquireLock("t").use { l2 ->
34+
lockManager2.acquireLock("t").useSuspended { l2 ->
3335
assertThat(l2, nullValue())
3436
}
3537
}
3638
}
3739

3840
@Test
39-
fun testGranularityLock() {
40-
lockManager1.acquireLock("t1").use { l1 ->
41+
fun testGranularityLock() = runTest {
42+
lockManager1.acquireLock("t1").useSuspended { l1 ->
4143
assertThat(l1, not(nullValue()))
42-
lockManager2.acquireLock("t2").use { l2 ->
44+
lockManager2.acquireLock("t2").useSuspended { l2 ->
4345
assertThat(l2, not(nullValue()))
4446
}
4547
}
4648
}
4749

4850
@Test
49-
fun testNonOverlappingLock() {
50-
lockManager1.acquireLock("t").use { l1 ->
51+
fun testNonOverlappingLock() = runTest {
52+
lockManager1.acquireLock("t").useSuspended { l1 ->
5153
assertThat(l1, not(nullValue()))
5254
}
53-
lockManager2.acquireLock("t").use { l2 ->
55+
lockManager2.acquireLock("t").useSuspended { l2 ->
5456
assertThat(l2, not(nullValue()))
5557
}
5658
}
5759

5860

5961
@Test
60-
fun testNonOverlappingLockSameManager() {
61-
lockManager1.acquireLock("t").use { l1 ->
62+
fun testNonOverlappingLockSameManager() = runTest {
63+
lockManager1.acquireLock("t").useSuspended { l1 ->
6264
assertThat(l1, not(nullValue()))
6365
}
64-
lockManager1.acquireLock("t").use { l2 ->
66+
lockManager1.acquireLock("t").useSuspended { l2 ->
6567
assertThat(l2, not(nullValue()))
6668
}
6769
}

0 commit comments

Comments
 (0)