Skip to content

Commit 480e809

Browse files
Merge pull request #40 from RADAR-base/exclude_option
Add option to exclude topics
2 parents 85aa502 + f253eff commit 480e809

File tree

4 files changed

+23
-0
lines changed

4 files changed

+23
-0
lines changed

src/main/java/org/radarcns/hdfs/Application.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public static void main(String [] args) {
115115
.tempDir(commandLineArgs.tmpDir)
116116
.numThreads(commandLineArgs.numThreads)
117117
.maxFilesPerTopic(commandLineArgs.maxFilesPerTopic)
118+
.excludeTopics(commandLineArgs.excludeTopics)
118119
.build();
119120

120121
HdfsSettings hdfsSettings = new HdfsSettings.Builder(commandLineArgs.hdfsName)

src/main/java/org/radarcns/hdfs/RadarHdfsRestructure.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class RadarHdfsRestructure {
6565
private final FileStoreFactory fileStoreFactory;
6666
private final RecordPathFactory pathFactory;
6767
private final long maxFilesPerTopic;
68+
private List<String> excludeTopics;
6869

6970
private LongAdder processedFileCount;
7071
private LongAdder processedRecordsCount;
@@ -78,6 +79,7 @@ public RadarHdfsRestructure(FileStoreFactory factory) {
7879
maxFiles = Long.MAX_VALUE;
7980
}
8081
this.maxFilesPerTopic = maxFiles;
82+
this.excludeTopics = factory.getSettings().getExcludeTopics();
8183
this.fileStoreFactory = factory;
8284
this.pathFactory = factory.getPathFactory();
8385
}
@@ -115,6 +117,7 @@ private List<TopicFileList> getTopicPaths(FileSystem fs, Path path, OffsetRangeS
115117
Map<String, List<TopicFile>> topics = walk(fs, path)
116118
.filter(f -> f.getName().endsWith(".avro"))
117119
.map(f -> new TopicFile(f.getParent().getParent().getName(), f))
120+
.filter(f -> !excludeTopics.contains(f.topic))
118121
.filter(f -> !seenFiles.contains(f.range))
119122
.collect(Collectors.groupingBy(TopicFile::getTopic));
120123

src/main/java/org/radarcns/hdfs/config/RestructureSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.nio.file.Files;
2525
import java.nio.file.Path;
2626
import java.nio.file.Paths;
27+
import java.util.ArrayList;
28+
import java.util.List;
2729

2830
public class RestructureSettings {
2931
private final String compression;
@@ -34,6 +36,7 @@ public class RestructureSettings {
3436
private final Path outputPath;
3537
private final int numThreads;
3638
private final int maxFilesPerTopic;
39+
private final List<String> excludeTopics;
3740

3841
private RestructureSettings(Builder builder) {
3942
this.compression = builder.compression;
@@ -44,6 +47,7 @@ private RestructureSettings(Builder builder) {
4447
this.outputPath = builder.outputPath;
4548
this.numThreads = builder.numThreads;
4649
this.maxFilesPerTopic = builder.maxFilesPerTopic;
50+
this.excludeTopics = builder.excludeTopics;
4751
}
4852

4953
public String getCompression() {
@@ -78,6 +82,10 @@ public int getMaxFilesPerTopic() {
7882
return maxFilesPerTopic;
7983
}
8084

85+
public List<String> getExcludeTopics() {
86+
return excludeTopics;
87+
}
88+
8189
public static class Builder {
8290
private int numThreads = 1;
8391
private String compression;
@@ -87,6 +95,7 @@ public static class Builder {
8795
private Path tempDir;
8896
private final Path outputPath;
8997
public int maxFilesPerTopic;
98+
private List<String> excludeTopics;
9099

91100
public Builder(String outputPath) {
92101
this.outputPath = Paths.get(outputPath.replaceAll("/+$", ""));
@@ -136,6 +145,11 @@ public Builder maxFilesPerTopic(int num) {
136145
return this;
137146
}
138147

148+
public Builder excludeTopics(List<String> topics) {
149+
this.excludeTopics = topics;
150+
return this;
151+
}
152+
139153
public RestructureSettings build() {
140154
compression = nonNullOrDefault(compression, () -> "identity");
141155
format = nonNullOrDefault(format, () -> "csv");
@@ -146,6 +160,8 @@ public RestructureSettings build() {
146160
throw new UncheckedIOException("Cannot create temporary directory", ex);
147161
}
148162
});
163+
excludeTopics = nonNullOrDefault(excludeTopics, ArrayList::new);
164+
149165
return new RestructureSettings(this);
150166
}
151167
}

src/main/java/org/radarcns/hdfs/util/commandline/CommandLineArgs.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ public class CommandLineArgs {
9393
@Parameter(names = {"--max-files-per-topic"}, description = "Maximum number of records to process, per topic. Set below 1 to disable this option.")
9494
public int maxFilesPerTopic = 0;
9595

96+
@Parameter(names = {"--exclude-topic"}, description = "Topic to exclude when processing the records. Can be supplied more than once to exclude multiple topics.")
97+
public List<String> excludeTopics = new ArrayList<>();
98+
9699
public static <T> T nonNullOrDefault(T value, Supplier<T> defaultValue) {
97100
return value != null ? value : defaultValue.get();
98101
}

0 commit comments

Comments
 (0)