
Commit 4a0702d

Remove HDFS from code

1 parent c5b6ab0 commit 4a0702d

14 files changed (+16, -59 lines)


README.md

Lines changed: 2 additions & 2 deletions

````diff
@@ -1,7 +1,7 @@
 # Restructure Kafka connector output files

 Data streamed by a Kafka Connector will be converted to a RADAR-base oriented output directory, by organizing it by project, user and collection date.
-It supports data written by [RADAR S3 sink connector](https://github.com/RADAR-base/RADAR-S3-Connector) is streamed to files based on topic name only. This package transforms that output to a local directory structure as follows: `projectId/userId/topic/date_hour.csv`. The date and hour are extracted from the `time` field of each record, and is formatted in UTC time. This package is included in the [RADAR-Docker](https://github.com/RADAR-base/RADAR-Docker) repository, in the `dcompose/radar-cp-hadoop-stack/bin/hdfs-restructure` script.
+It supports data written by [RADAR S3 sink connector](https://github.com/RADAR-base/RADAR-S3-Connector) is streamed to files based on topic name only. This package transforms that output to a local directory structure as follows: `projectId/userId/topic/date_hour.csv`. The date and hour are extracted from the `time` field of each record, and is formatted in UTC time.

 ## Upgrade instructions

@@ -90,7 +90,7 @@ By default, this will output the data in CSV format. If JSON format is preferred
 radar-output-restructure --format json --output-directory <output_folder> <input_path_1> [<input_path_2> ...]
 ```

-By default, files records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. This set to false by default because of an issue with Biovotion data. Please see - [issue #16](https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16) before enabling it. Deduplication can also be enabled or disabled per topic using the config file. If lines should be deduplicated using a subset of fields, e.g. only `sourceId` and `time` define a unique record and only the last record with duplicate values should be kept, then specify `topics: <topicName>: deduplication: distinctFields: [key.sourceId, value.time]`.
+By default, files records are not deduplicated after writing. To enable this behaviour, specify the option `--deduplicate` or `-d`. This set to false by default because of an issue with Biovotion data. Please see - [issue #16](https://github.com/RADAR-base/radar-output-restructure/issues/16) before enabling it. Deduplication can also be enabled or disabled per topic using the config file. If lines should be deduplicated using a subset of fields, e.g. only `sourceId` and `time` define a unique record and only the last record with duplicate values should be kept, then specify `topics: <topicName>: deduplication: distinctFields: [key.sourceId, value.time]`.

 ### Compression
````
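
The per-topic deduplication setting referenced in the changed README line maps onto the service configuration file. A minimal sketch: the topic name is a placeholder, and the per-topic `enable` key is an assumption based on the README wording and the format-level deduplication option visible in the Kotlin changes below.

```yaml
# Sketch of per-topic deduplication; the topic name is a placeholder and the
# per-topic "enable" key is assumed from the README wording, not confirmed here.
topics:
  android_phone_acceleration:
    deduplication:
      enable: true                                  # deduplicate only this topic
      distinctFields: [key.sourceId, value.time]    # records are duplicates if these fields match
```
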
restructure.yml

Lines changed: 3 additions & 6 deletions

```diff
@@ -7,7 +7,7 @@ service:
 # Source data resource
 # @since: 0.7.0
 source:
-  type: s3 # hdfs, azure or s3
+  type: s3 # azure or s3
   s3:
     endpoint: http://localhost:9000 # using AWS S3 endpoint is also possible.
     bucket: radar
@@ -31,9 +31,6 @@ source:
   # If true, try to read the metadata property "endOffset" to determine the
   # final offset of an input object.
   #endOffsetFromMetadata: false
-  # only actually needed if source type is hdfs
-  hdfs:
-    nameNodes: [hdfs-namenode-1, hdfs-namenode-2]
   index:
     # Interval to fully synchronize the index with the storage
     fullSyncInterval: 3600
@@ -80,7 +77,7 @@ compression:
   # Compression type: none, zip or gzip
   type: gzip
   # Compression Factory class
-  # factory: org.radarbase.hdfs.compression.CompressionFactory
+  # factory: org.radarbase.output.compression.CompressionFactory
   # Additional compression properties
   # properties: {}

@@ -96,7 +93,7 @@ format:
   # Ignore specific fields to consider records distinct. Disregarded if empty.
   # ignoreFields: []
   # Format factory class
-  # factory: org.radarbase.hdfs.format.FormatFactory
+  # factory: org.radarbase.output.format.FormatFactory
   # Additional format properties
   # properties: {}
   # Do not write certain fields to file
```
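
Putting the remaining keys together, a minimal source and factory configuration after this commit would look roughly like the sketch below; only keys that appear in the diff above are used, and the endpoint and bucket values are the same placeholders as in the example file.

```yaml
# Minimal sketch of the affected parts of restructure.yml after the HDFS removal.
source:
  type: s3                    # azure or s3; hdfs is no longer accepted
  s3:
    endpoint: http://localhost:9000
    bucket: radar
  index:
    # Interval to fully synchronize the index with the storage
    fullSyncInterval: 3600

compression:
  type: gzip
  # factory: org.radarbase.output.compression.CompressionFactory

format:
  # factory: org.radarbase.output.format.FormatFactory
```
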

src/main/java/org/radarbase/output/cleaner/TimestampExtractionCheck.kt

Lines changed: 1 addition & 1 deletion

```diff
@@ -24,7 +24,7 @@ class TimestampExtractionCheck(
         val result = resourceContext {
             val input = createResource { reader.newInput(file) }
             // processing zero-length files may trigger a stall. See:
-            // https://github.com/RADAR-base/Restructure-HDFS-topic/issues/3
+            // https://github.com/RADAR-base/radar-output-restructure/issues/3
             if (input.length() == 0L) {
                 logger.warn("File {} has zero length, skipping.", file.path)
                 return@resourceContext false
```

src/main/java/org/radarbase/output/config/CommandLineArgs.kt

Lines changed: 1 addition & 8 deletions

```diff
@@ -51,20 +51,13 @@ class CommandLineArgs {
     )
     var compression: String? = null

-    // Default set to false because causes loss of records from Biovotion data. https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16
+    // Default set to false because causes loss of records from Biovotion data. https://github.com/RADAR-base/radar-output-restructure/issues/16
     @Parameter(
         names = ["-d", "--deduplicate"],
         description = "Boolean to define if to use deduplication or not.",
     )
     var deduplicate: Boolean? = null

-    @Parameter(
-        names = ["-n", "--nameservice"],
-        description = "The HDFS name services to connect to. Eg - '<HOST>' for single configurations or <CLUSTER_ID> for high availability web services.",
-        validateWith = [NonEmptyValidator::class],
-    )
-    var hdfsName: String? = null
-
     @Parameter(
         names = ["-o", "--output-directory"],
         description = "The output folder where the files are to be extracted.",
```
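
With the `-n`/`--nameservice` option removed, an invocation only needs the remaining flags and input paths. A sketch using the options shown in the README and in this class; all paths are placeholders.

```
radar-output-restructure \
  --deduplicate \
  --format json \
  --output-directory /path/to/output \
  /path/to/input_topic_1 /path/to/input_topic_2
```
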

src/main/java/org/radarbase/output/config/HdfsConfig.kt

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/main/java/org/radarbase/output/config/PathConfig.kt

Lines changed: 1 addition & 1 deletion

```diff
@@ -20,7 +20,7 @@ data class PathConfig(
     val path: PathFormatterConfig = PathFormatterConfig(),
     /**
      * Bucket formatting rules for the target storage. If no configuration is provided, this
-     * will not format any bucket for local or HDFS storage, and it will use the target bucket (s3)
+     * will not format any bucket for local storage, and it will use the target bucket (s3)
      * or container (azure) as the default target bucket.
      */
     val bucket: BucketFormatterConfig? = null,
```

src/main/java/org/radarbase/output/config/ResourceConfig.kt

Lines changed: 2 additions & 5 deletions

```diff
@@ -5,31 +5,28 @@ import org.radarbase.output.config.ResourceType.Companion.toResourceType
 import org.radarbase.output.config.RestructureConfig.Companion.copyOnChange

 data class ResourceConfig(
-    /** Resource type. One of s3, hdfs or local. */
+    /** Resource type. One of s3, azure or local. */
     val type: String,
     val s3: S3Config? = null,
-    val hdfs: HdfsConfig? = null,
     val local: LocalConfig? = null,
     val azure: AzureConfig? = null,
     val index: StorageIndexConfig? = null,
 ) {
     @get:JsonIgnore
     val sourceType: ResourceType by lazy {
-        requireNotNull(type.toResourceType()) { "Unknown resource type $type, choose s3, hdfs or local" }
+        requireNotNull(type.toResourceType()) { "Unknown resource type $type, choose s3, azure or local" }
     }

     fun validate() {
         when (sourceType) {
             ResourceType.S3 -> checkNotNull(s3) { "No S3 configuration provided." }
-            ResourceType.HDFS -> checkNotNull(hdfs) { "No HDFS configuration provided." }.also { it.validate() }
             ResourceType.LOCAL -> checkNotNull(local) { "No local configuration provided." }
             ResourceType.AZURE -> checkNotNull(azure) { "No Azure configuration provided." }
         }
     }

     fun withEnv(prefix: String): ResourceConfig = when (sourceType) {
         ResourceType.S3 -> copyOnChange(s3, { it?.withEnv(prefix) }) { copy(s3 = it) }
-        ResourceType.HDFS -> this
         ResourceType.LOCAL -> this
         ResourceType.AZURE -> copyOnChange(azure, { it?.withEnv(prefix) }) { copy(azure = it) }
     }
```
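
Since `hdfs` no longer resolves to a `ResourceType`, reading `sourceType` on such a configuration now fails fast. A minimal sketch of that behavior, assuming the remaining constructor arguments keep their `null` defaults as shown above:

```kotlin
// Sketch only: shows the failure mode for the now-unsupported "hdfs" type.
val config = ResourceConfig(type = "hdfs")

// "hdfs".toResourceType() returns null, so the lazy requireNotNull throws
// IllegalArgumentException: "Unknown resource type hdfs, choose s3, azure or local"
val result = runCatching { config.sourceType }
println(result.exceptionOrNull()?.message)
```
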

src/main/java/org/radarbase/output/config/ResourceType.kt

Lines changed: 1 addition & 2 deletions

```diff
@@ -1,12 +1,11 @@
 package org.radarbase.output.config

 enum class ResourceType {
-    S3, HDFS, LOCAL, AZURE;
+    S3, LOCAL, AZURE;

     companion object {
         fun String.toResourceType() = when (lowercase()) {
             "s3" -> S3
-            "hdfs" -> HDFS
             "local" -> LOCAL
             "azure" -> AZURE
             else -> null
```

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 0 additions & 8 deletions

```diff
@@ -43,14 +43,6 @@ data class RestructureConfig(
         args.tmpDir?.let { copy(paths = paths.copy(temp = Paths.get(it))) }
         args.inputPaths?.let { inputs -> copy(paths = paths.copy(inputs = inputs.map { Paths.get(it) })) }
         args.outputDirectory?.let { copy(paths = paths.copy(output = Paths.get(it))) }
-        args.hdfsName?.let {
-            copy(
-                source = source.copy(
-                    hdfs = source.hdfs?.copy(nameNodes = listOf(it))
-                        ?: HdfsConfig(nameNodes = listOf(it)),
-                ),
-            )
-        }
         args.format?.let { copy(format = format.copy(type = it)) }
         args.deduplicate?.let {
             copy(format = format.copy(deduplication = format.deduplication.copy(enable = it)))
```
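
The remaining overrides all use the same immutable data-class pattern: each CLI argument that is present produces a new configuration through nested `copy` calls. A self-contained sketch of that idiom with simplified stand-in types, not the project's real classes:

```kotlin
// Simplified stand-ins to illustrate the nested copy pattern used above.
data class FormatConfig(val type: String = "csv", val deduplicate: Boolean = false)
data class Config(val outputDirectory: String = ".", val format: FormatConfig = FormatConfig())

fun Config.withArgs(outputDirectory: String?, deduplicate: Boolean?): Config {
    var config = this
    // Only override values that were actually supplied on the command line.
    outputDirectory?.let { config = config.copy(outputDirectory = it) }
    deduplicate?.let { config = config.copy(format = config.format.copy(deduplicate = it)) }
    return config
}

fun main() {
    val config = Config().withArgs(outputDirectory = "/output", deduplicate = true)
    println(config) // Config(outputDirectory=/output, format=FormatConfig(type=csv, deduplicate=true))
}
```
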

src/main/java/org/radarbase/output/source/SourceStorageFactory.kt

Lines changed: 0 additions & 8 deletions

```diff
@@ -31,14 +31,6 @@ class SourceStorageFactory(
             val minioClient = requireNotNull(s3SourceClient) { "Missing S3 client configuration for source storage" }
             S3SourceStorage(minioClient, s3Config, tempPath)
         }
-        ResourceType.HDFS -> {
-            val storage = Class.forName("org.radarbase.output.source.HdfsSourceStorageFactory")
-            val constructor =
-                storage.getDeclaredConstructor(ResourceConfig::class.java, Path::class.java)
-            val factory = constructor.newInstance(resourceConfig, tempPath)
-            val createSourceStorage = storage.getDeclaredMethod("createSourceStorage")
-            createSourceStorage.invoke(factory) as SourceStorage
-        }
         ResourceType.AZURE -> {
             val azureClient = requireNotNull(azureSourceClient) { "Missing Azure client configuration for source storage" }
             val azureConfig = requireNotNull(resourceConfig.azure) { "Missing Azure configuration for source storage" }
```
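
The deleted branch was the only place a storage backend was loaded reflectively, presumably so the HDFS classes could remain an optional dependency. A generic sketch of that reflective-factory pattern; the class and method names here are illustrative placeholders, not from this project:

```kotlin
import java.nio.file.Path
import java.nio.file.Paths

// Generic illustration of reflective factory loading; "com.example.PluginFactory"
// and "create" are placeholder names.
fun loadPlugin(className: String, tempPath: Path): Any? {
    val cls = Class.forName(className)                       // resolve the optional class at runtime
    val ctor = cls.getDeclaredConstructor(Path::class.java)  // look up a known constructor shape
    val factory = ctor.newInstance(tempPath)                 // instantiate without a compile-time dependency
    return cls.getDeclaredMethod("create").invoke(factory)   // call the factory method reflectively
}

fun main() {
    // Throws ClassNotFoundException unless the optional plugin is on the classpath.
    runCatching { loadPlugin("com.example.PluginFactory", Paths.get("/tmp")) }
        .onFailure { println("Plugin not available: ${it::class.simpleName}") }
}
```
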
