@@ -2,9 +2,12 @@ package org.radarbase.output
2
2
3
3
import io.minio.*
4
4
import io.minio.ObjectWriteArgs.MAX_PART_SIZE
5
+ import kotlinx.coroutines.*
6
+ import kotlinx.coroutines.test.runTest
5
7
import org.junit.jupiter.api.Assertions.assertEquals
6
8
import org.junit.jupiter.api.Test
7
9
import org.radarbase.output.config.*
10
+ import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
8
11
import org.radarbase.output.util.Timer
9
12
import org.radarbase.output.util.bucketBuild
10
13
import org.radarbase.output.util.objectBuild
@@ -13,7 +16,7 @@ import java.nio.file.Paths
13
16
14
17
class RestructureS3IntegrationTest {
15
18
@Test
16
- fun integration () {
19
+ fun integration () = runTest {
17
20
Timer .isEnabled = true
18
21
val sourceConfig = S3Config (
19
22
endpoint = " http://localhost:9000" ,
@@ -38,77 +41,109 @@ class RestructureS3IntegrationTest {
38
41
}
39
42
40
43
val resourceFiles = listOf (
41
- " application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro" ,
42
- " application_server_status/partition=1/application_server_status+1+0000000021.avro" ,
43
- " android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro"
44
+ " application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro" ,
45
+ " application_server_status/partition=1/application_server_status+1+0000000021.avro" ,
46
+ " android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro" ,
44
47
)
45
48
val targetFiles = resourceFiles.map { Paths .get(" in/$it " ) }
46
- resourceFiles.forEachIndexed { i, resourceFile ->
47
- javaClass.getResourceAsStream(" /$resourceFile " ).use { statusFile ->
48
- sourceClient.putObject(PutObjectArgs .Builder ().objectBuild(sourceConfig.bucket, targetFiles[i]) {
49
- stream(statusFile, - 1 , MAX_PART_SIZE )
50
- })
49
+ resourceFiles.mapIndexed { i, resourceFile ->
50
+ launch(Dispatchers .IO ) {
51
+ this @RestructureS3IntegrationTest.javaClass.getResourceAsStream(" /$resourceFile " )
52
+ .useSuspended { statusFile ->
53
+ sourceClient.putObject(PutObjectArgs .Builder ()
54
+ .objectBuild(sourceConfig.bucket, targetFiles[i]) {
55
+ stream(statusFile, - 1 , MAX_PART_SIZE )
56
+ })
57
+ }
51
58
}
52
- }
59
+ }.joinAll()
53
60
54
61
application.start()
55
62
56
63
val targetClient = targetConfig.createS3Client()
57
- val files = targetClient.listObjects(ListObjectsArgs .Builder ().bucketBuild(targetConfig.bucket) {
58
- prefix(" output" )
59
- recursive(true )
60
- useUrlEncodingType(false )
61
- })
62
- .map { it.get().objectName() }
63
- .toList()
64
64
65
65
application.redisHolder.execute { redis ->
66
- assertEquals(1L , redis.del(" offsets/application_server_status.json" ))
67
- assertEquals(1L , redis.del(" offsets/android_phone_acceleration.json" ))
66
+ launch { assertEquals(1L , redis.del(" offsets/application_server_status.json" )) }
67
+ launch { assertEquals(1L , redis.del(" offsets/android_phone_acceleration.json" )) }
68
68
}
69
69
70
70
val firstParticipantOutput = " output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status"
71
71
val secondParticipantOutput = " output/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration"
72
- assertEquals(
73
- listOf (
74
- " $firstParticipantOutput /20200128_1300.csv" ,
75
- " $firstParticipantOutput /20200128_1400.csv" ,
76
- " $firstParticipantOutput /schema-application_server_status.json" ,
77
- " $secondParticipantOutput /20200528_1000.csv" ,
78
- " $secondParticipantOutput /schema-android_phone_acceleration.json" ),
79
- files)
80
-
81
- println (targetClient.getObject(GetObjectArgs .builder()
82
- .bucketBuild(targetConfig.bucket) {
83
- `object `(" $firstParticipantOutput /20200128_1300.csv" )
84
- }
85
- ).readBytes().toString(UTF_8 ))
86
72
87
- val csvContents = """
73
+ val files = coroutineScope {
74
+ launch(Dispatchers .IO ) {
75
+ val csvContents = """
88
76
key.projectId,key.userId,key.sourceId,value.time,value.serverStatus,value.ipAddress
89
77
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
90
78
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
91
79
92
80
""" .trimIndent()
93
- assertEquals(csvContents, targetClient.getObject(GetObjectArgs .Builder ()
94
- .bucketBuild(targetConfig.bucket) {
95
- `object `(" $firstParticipantOutput /20200128_1300.csv" )
96
- })
97
- .readBytes()
98
- .toString(UTF_8 ))
99
-
100
- targetFiles.forEach {
101
- sourceClient.removeObject(RemoveObjectArgs .Builder ()
102
- .objectBuild(sourceConfig.bucket, it))
103
- }
104
- sourceClient.removeBucket(RemoveBucketArgs .Builder ().bucketBuild(sourceConfig.bucket))
105
- files.forEach {
106
- targetClient.removeObject(RemoveObjectArgs .Builder ().bucketBuild(targetConfig.bucket) {
107
- `object `(it)
108
- })
81
+
82
+ val targetContent = targetClient.getObject(GetObjectArgs .Builder ()
83
+ .bucketBuild(targetConfig.bucket) {
84
+ `object `(" $firstParticipantOutput /20200128_1300.csv" )
85
+ }).use { response ->
86
+ response.readBytes()
87
+ }
88
+
89
+ assertEquals(csvContents, targetContent.toString(UTF_8 ))
90
+ }
91
+
92
+ withContext(Dispatchers .IO ) {
93
+ targetClient.listObjects(ListObjectsArgs .Builder ()
94
+ .bucketBuild(targetConfig.bucket) {
95
+ prefix(" output" )
96
+ recursive(true )
97
+ useUrlEncodingType(false )
98
+ })
99
+ .map { it.get().objectName() }
100
+ .toHashSet()
101
+ }
109
102
}
110
- targetClient.removeBucket(RemoveBucketArgs .Builder ().bucketBuild(targetConfig.bucket))
111
103
104
+ assertEquals(
105
+ hashSetOf(
106
+ " $firstParticipantOutput /20200128_1300.csv" ,
107
+ " $firstParticipantOutput /20200128_1400.csv" ,
108
+ " $firstParticipantOutput /schema-application_server_status.json" ,
109
+ " $secondParticipantOutput /20200528_1000.csv" ,
110
+ " $secondParticipantOutput /schema-android_phone_acceleration.json" ,
111
+ ),
112
+ files,
113
+ )
114
+
115
+ coroutineScope {
116
+ // delete source files
117
+ launch {
118
+ targetFiles.map {
119
+ launch(Dispatchers .IO ) {
120
+ sourceClient.removeObject(RemoveObjectArgs .Builder ()
121
+ .objectBuild(sourceConfig.bucket, it))
122
+ }
123
+ }.joinAll()
124
+
125
+ launch(Dispatchers .IO ) {
126
+ sourceClient.removeBucket(RemoveBucketArgs .Builder ()
127
+ .bucketBuild(sourceConfig.bucket))
128
+ }
129
+ }
130
+
131
+ // delete target files
132
+ launch {
133
+ files.map {
134
+ launch(Dispatchers .IO ) {
135
+ targetClient.removeObject(RemoveObjectArgs .Builder ()
136
+ .bucketBuild(targetConfig.bucket) {
137
+ `object `(it)
138
+ })
139
+ }
140
+ }.joinAll()
141
+ launch(Dispatchers .IO ) {
142
+ targetClient.removeBucket(RemoveBucketArgs .Builder ()
143
+ .bucketBuild(targetConfig.bucket))
144
+ }
145
+ }
146
+ }
112
147
println (Timer )
113
148
}
114
149
}
0 commit comments