Skip to content

Commit adcac94

Browse files
authored
Merge pull request #75 from RADAR-base/release-1.1.0
Release 1.1.0
2 parents 85c1056 + 8e21a95 commit adcac94

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1522
-629
lines changed

README.md

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ It supports data written by [RADAR HDFS sink connector](https://github.com/RADAR
77

88
## Upgrade instructions
99

10-
When upgrading to version 1.0.0 from version 0.6.0 please follow the following instructions:
10+
When upgrading to version 1.0.0 or later from version 0.6.0, please follow these instructions:
1111

1212
- This package now relies on Redis for locking and offset management. Please install Redis or use
1313
the docker-compose.yml file to start it.
@@ -61,7 +61,7 @@ When upgrading to version 0.6.0 from version 0.5.x or earlier, please follow the
6161

6262
This package is available as docker image [`radarbase/radar-output-restructure`](https://hub.docker.com/r/radarbase/radar-output-restructure). The entrypoint of the image is the current application. So in all the commands listed in usage, replace `radar-output-restructure` with for example:
6363
```shell
64-
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-output-restructure:1.0.0-hdfs -n hdfs-namenode -o /output /myTopic
64+
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-output-restructure:1.1.0-hdfs -n hdfs-namenode -o /output /myTopic
6565
```
6666
if your docker cluster is running in the `hadoop` network and your output directory should be `./output`.
6767

@@ -140,6 +140,22 @@ target:
140140
groupId: 100 # write as regular group, use -1 to use current user (default).
141141
```
142142
143+
### Cleaner
144+
145+
Source files can be automatically removed by a cleaner process. This checks whether the file has already been extracted and is older than a configured age. This feature is not enabled by default. It can be configured in the `cleaner` configuration section:
146+
147+
```yaml
148+
cleaner:
149+
# Enable cleaning up old source files
150+
enable: true
151+
# Interval in seconds to clean data
152+
interval: 1260 # 21 minutes
153+
# Number of days after which a source file is considered old
154+
age: 7
155+
```
156+
157+
The cleaner can also be enabled with the `--cleaner` command-line flag. To run the cleaner as a separate process from output restructuring, start a process that has configuration property `worker: enable: false` or command-line argument `--no-restructure`.
158+
143159
### Service
144160

145161
To run the output generator as a service that will regularly poll the HDFS directory, add the `--service` flag and optionally the `--interval` flag to adjust the polling interval or use the corresponding configuration file parameters.
@@ -155,7 +171,7 @@ This package requires at least Java JDK 8. Build the distribution with
155171
and install the package into `/usr/local` with for example
156172
```shell
157173
sudo mkdir -p /usr/local
158-
sudo tar -xzf build/distributions/radar-output-restructure-1.0.0.tar.gz -C /usr/local --strip-components=1
174+
sudo tar -xzf build/distributions/radar-output-restructure-1.1.0.tar.gz -C /usr/local --strip-components=1
159175
```
160176

161177
Now the `radar-output-restructure` command should be available.

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
}
1010

1111
group 'org.radarbase'
12-
version '1.0.1'
12+
version '1.1.0'
1313
mainClassName = 'org.radarbase.output.Application'
1414

1515
sourceCompatibility = '1.8'
@@ -140,6 +140,7 @@ task integrationTest(type: Test) {
140140
testClassesDirs = sourceSets.integrationTest.output.classesDirs
141141
classpath = sourceSets.integrationTest.runtimeClasspath
142142
testLogging {
143+
showStandardStreams = true
143144
setExceptionFormat("full")
144145
}
145146
shouldRunAfter test

restructure.yml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,30 @@ format:
6464
# Additional format properties
6565
# properties: {}
6666

67-
# Worker settings
67+
# Worker settings. Each worker thread has its own cache and topic, so the
68+
# settings only apply to a single thread.
6869
worker:
69-
# Maximum number of files and converters to keep open while processing
70+
# Enable processing files for extraction
71+
enable: true
72+
# Maximum number of files and converters to keep open while processing. Increasing this will
73+
# decrease memory pressure but slow down processing.
7074
cacheSize: 300
75+
# Maximum number of offsets in cache. Increasing this will decrease memory
76+
# pressure but slow down processing.
77+
cacheOffsetsSize: 500000
7178
# Number of threads to do processing on
7279
numThreads: 2
7380
# Maximum number of files to process in any given topic.
7481
maxFilesPerTopic: null
7582

83+
cleaner:
84+
# Enable cleaning up old source files
85+
enable: true
86+
# Interval in seconds to clean data
87+
interval: 1260 # 21 minutes
88+
# Number of days after which a source file is considered old
89+
age: 7
90+
7691
# Path settings
7792
paths:
7893
# Input directories in source

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.radarbase.output.config.ResourceConfig
99
import org.radarbase.output.config.RestructureConfig
1010
import org.radarbase.output.config.S3Config
1111
import org.radarbase.output.util.Timer
12+
import java.nio.charset.StandardCharsets.UTF_8
1213
import java.nio.file.Paths
1314

1415
class RestructureS3IntegrationTest {
@@ -36,9 +37,16 @@ class RestructureS3IntegrationTest {
3637
sourceClient.makeBucket(sourceConfig.bucket)
3738
}
3839

39-
val statusFileName = Paths.get("in/application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro")
40-
javaClass.getResourceAsStream("/application_server_status/application_server_status+1+0000000018+0000000020.avro").use { statusFile ->
41-
sourceClient.putObject(sourceConfig.bucket, statusFileName.toString(), statusFile, PutObjectOptions(-1, MAX_PART_SIZE))
40+
val resourceFiles = listOf(
41+
"application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro",
42+
"application_server_status/partition=1/application_server_status+1+0000000021.avro",
43+
"android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro"
44+
)
45+
val targetFiles = resourceFiles.map { Paths.get("in/$it") }
46+
resourceFiles.forEachIndexed { i, resourceFile ->
47+
javaClass.getResourceAsStream("/$resourceFile").use { statusFile ->
48+
sourceClient.putObject(sourceConfig.bucket, targetFiles[i].toString(), statusFile, PutObjectOptions(-1, MAX_PART_SIZE))
49+
}
4250
}
4351

4452
application.start()
@@ -50,17 +58,35 @@ class RestructureS3IntegrationTest {
5058

5159
application.redisPool.resource.use { redis ->
5260
assertEquals(1L, redis.del("offsets/application_server_status.json"))
61+
assertEquals(1L, redis.del("offsets/android_phone_acceleration.json"))
5362
}
5463

55-
val outputFolder = "output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status"
64+
val firstParticipantOutput = "output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status"
65+
val secondParticipantOutput = "output/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration"
5666
assertEquals(
5767
listOf(
58-
"$outputFolder/20200128_1300.csv",
59-
"$outputFolder/20200128_1400.csv",
60-
"$outputFolder/schema-application_server_status.json"),
68+
"$firstParticipantOutput/20200128_1300.csv",
69+
"$firstParticipantOutput/20200128_1400.csv",
70+
"$firstParticipantOutput/schema-application_server_status.json",
71+
"$secondParticipantOutput/20200528_1000.csv",
72+
"$secondParticipantOutput/schema-android_phone_acceleration.json"),
6173
files)
6274

63-
sourceClient.removeObject(sourceConfig.bucket, statusFileName.toString())
75+
println(targetClient.getObject(targetConfig.bucket, "$firstParticipantOutput/20200128_1300.csv").readBytes().toString(UTF_8))
76+
77+
val csvContents = """
78+
key.projectId,key.userId,key.sourceId,value.time,value.serverStatus,value.ipAddress
79+
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
80+
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
81+
82+
""".trimIndent()
83+
assertEquals(csvContents, targetClient.getObject(targetConfig.bucket, "$firstParticipantOutput/20200128_1300.csv")
84+
.readBytes()
85+
.toString(UTF_8))
86+
87+
targetFiles.forEach {
88+
sourceClient.removeObject(sourceConfig.bucket, it.toString())
89+
}
6490
sourceClient.removeBucket(sourceConfig.bucket)
6591
files.forEach {
6692
targetClient.removeObject(targetConfig.bucket, it)

src/integrationTest/java/org/radarbase/output/accounting/RedisRemoteLockManagerTest.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,41 +26,41 @@ internal class RedisRemoteLockManagerTest {
2626

2727
@Test
2828
fun testExclusiveLock() {
29-
lockManager1.acquireTopicLock("t").use { l1 ->
29+
lockManager1.acquireLock("t").use { l1 ->
3030
assertThat(l1, not(nullValue()))
31-
lockManager2.acquireTopicLock("t").use { l2 ->
31+
lockManager2.acquireLock("t").use { l2 ->
3232
assertThat(l2, nullValue())
3333
}
3434
}
3535
}
3636

3737
@Test
3838
fun testGranularityLock() {
39-
lockManager1.acquireTopicLock("t1").use { l1 ->
39+
lockManager1.acquireLock("t1").use { l1 ->
4040
assertThat(l1, not(nullValue()))
41-
lockManager2.acquireTopicLock("t2").use { l2 ->
41+
lockManager2.acquireLock("t2").use { l2 ->
4242
assertThat(l2, not(nullValue()))
4343
}
4444
}
4545
}
4646

4747
@Test
4848
fun testNonOverlappingLock() {
49-
lockManager1.acquireTopicLock("t").use { l1 ->
49+
lockManager1.acquireLock("t").use { l1 ->
5050
assertThat(l1, not(nullValue()))
5151
}
52-
lockManager2.acquireTopicLock("t").use { l2 ->
52+
lockManager2.acquireLock("t").use { l2 ->
5353
assertThat(l2, not(nullValue()))
5454
}
5555
}
5656

5757

5858
@Test
5959
fun testNonOverlappingLockSameManager() {
60-
lockManager1.acquireTopicLock("t").use { l1 ->
60+
lockManager1.acquireLock("t").use { l1 ->
6161
assertThat(l1, not(nullValue()))
6262
}
63-
lockManager1.acquireTopicLock("t").use { l2 ->
63+
lockManager1.acquireLock("t").use { l2 ->
6464
assertThat(l2, not(nullValue()))
6565
}
6666
}

src/integrationTest/resources/application_server_status/application_server_status+1+0000000018+0000000020.avro renamed to src/integrationTest/resources/application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro

File renamed without changes.

src/main/java/org/radarbase/output/Application.kt

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.radarbase.output.target.TargetStorageFactory
3131
import org.radarbase.output.util.ProgressBar.Companion.format
3232
import org.radarbase.output.util.Timer
3333
import org.radarbase.output.worker.FileCacheStore
34+
import org.radarbase.output.cleaner.SourceDataCleaner
3435
import org.radarbase.output.worker.RadarKafkaRestructure
3536
import org.slf4j.LoggerFactory
3637
import redis.clients.jedis.JedisPool
@@ -88,7 +89,12 @@ class Application(
8889
if (config.service.enable) {
8990
runService()
9091
} else {
91-
runRestructure()
92+
if (config.worker.enable) {
93+
runRestructure()
94+
}
95+
if (config.cleaner.enable) {
96+
runCleaner()
97+
}
9298
}
9399
}
94100

@@ -97,8 +103,15 @@ class Application(
97103
logger.info("Press Ctrl+C to exit...")
98104
val executorService = Executors.newSingleThreadScheduledExecutor()
99105

100-
executorService.scheduleAtFixedRate(::runRestructure,
101-
config.service.interval / 4, config.service.interval, TimeUnit.SECONDS)
106+
if (config.worker.enable) {
107+
executorService.scheduleAtFixedRate(::runRestructure,
108+
config.service.interval / 4, config.service.interval, TimeUnit.SECONDS)
109+
}
110+
111+
if (config.cleaner.enable) {
112+
executorService.scheduleAtFixedRate(::runCleaner,
113+
config.cleaner.interval / 4, config.cleaner.interval, TimeUnit.SECONDS)
114+
}
102115

103116
try {
104117
Thread.sleep(java.lang.Long.MAX_VALUE)
@@ -114,33 +127,53 @@ class Application(
114127
}
115128
}
116129

130+
private fun runCleaner() {
131+
val timeStart = Instant.now()
132+
133+
try {
134+
val numberFormat = NumberFormat.getNumberInstance()
135+
SourceDataCleaner(this).use { cleaner ->
136+
for (input in config.paths.inputs) {
137+
logger.info("Cleaning {}", input)
138+
cleaner.process(input.toString())
139+
}
140+
logger.info("Cleaned up {} files in {}",
141+
numberFormat.format(cleaner.deletedFileCount.sum()),
142+
timeStart.durationSince().format())
143+
}
144+
} catch (e: InterruptedException) {
145+
logger.error("Cleaning interrupted")
146+
} catch (ex: Exception) {
147+
logger.error("Failed to clean records", ex)
148+
} finally {
149+
if (Timer.isEnabled) {
150+
logger.info("{}", Timer)
151+
Timer.reset()
152+
}
153+
}
154+
}
155+
117156
private fun runRestructure() {
118157
val timeStart = Instant.now()
119158
try {
159+
val numberFormat = NumberFormat.getNumberInstance()
160+
120161
RadarKafkaRestructure(this).use { restructure ->
121162
for (input in config.paths.inputs) {
122163
logger.info("In: {}", input)
123164
logger.info("Out: {}", pathFactory.root)
124165
restructure.process(input.toString())
125166
}
126167

127-
val numberFormat = NumberFormat.getNumberInstance()
128168
logger.info("Processed {} files and {} records in {}",
129-
numberFormat.format(restructure.processedFileCount),
130-
numberFormat.format(restructure.processedRecordsCount),
169+
numberFormat.format(restructure.processedFileCount.sum()),
170+
numberFormat.format(restructure.processedRecordsCount.sum()),
131171
timeStart.durationSince().format())
132-
if (restructure.deletedFileCount.sum() > 0) {
133-
logger.info("Deleted {} old files", numberFormat.format(restructure.deletedFileCount))
134-
} else {
135-
logger.info("No files were deleted")
136-
}
137172
}
138-
} catch (ex: Exception) {
139-
logger.error("Failed to process records", ex)
140-
} catch (ex: IOException) {
141-
logger.error("Processing failed", ex)
142173
} catch (e: InterruptedException) {
143174
logger.error("Processing interrupted")
175+
} catch (ex: Exception) {
176+
logger.error("Failed to process records", ex)
144177
} finally {
145178
// Print timings and reset the timings for the next iteration.
146179
if (Timer.isEnabled) {

src/main/java/org/radarbase/output/accounting/Accountant.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ constructor(factory: FileStoreFactory, topic: String) : Flushable, Closeable {
5959
} else null
6060
}
6161

62+
open fun remove(range: TopicPartitionOffsetRange) = time("accounting.remove") {
63+
offsetFile.offsets.remove(range)
64+
offsetFile.triggerWrite()
65+
}
66+
6267
open fun process(ledger: Ledger) = time("accounting.process") {
6368
offsetFile.addAll(ledger.offsets)
6469
offsetFile.triggerWrite()

0 commit comments

Comments
 (0)