2626import java .time .Instant ;
2727import java .util .Comparator ;
2828import java .util .List ;
29+ import java .util .Map ;
2930import java .util .concurrent .ExecutorService ;
3031import java .util .concurrent .Executors ;
3132import java .util .concurrent .ThreadLocalRandom ;
@@ -63,6 +64,7 @@ public class RadarHdfsRestructure {
6364 private final Configuration conf ;
6465 private final FileStoreFactory fileStoreFactory ;
6566 private final RecordPathFactory pathFactory ;
67+ private final long maxFilesPerTopic ;
6668
6769 private LongAdder processedFileCount ;
6870 private LongAdder processedRecordsCount ;
@@ -71,6 +73,11 @@ public RadarHdfsRestructure(FileStoreFactory factory) {
7173 conf = factory .getHdfsSettings ().getConfiguration ();
7274 conf .set ("fs.defaultFS" , "hdfs://" + factory .getHdfsSettings ().getHdfsName ());
7375 this .numThreads = factory .getSettings ().getNumThreads ();
76+ long maxFiles = factory .getSettings ().getMaxFilesPerTopic ();
77+ if (maxFiles < 1 ) {
78+ maxFiles = Long .MAX_VALUE ;
79+ }
80+ this .maxFilesPerTopic = maxFiles ;
7481 this .fileStoreFactory = factory ;
7582 this .pathFactory = factory .getPathFactory ();
7683 }
@@ -93,7 +100,7 @@ public void start(String directoryName) throws IOException {
93100
94101 Instant timeStart = Instant .now ();
95102 // Get filenames to process
96- TopicFileList topicPaths = getTopicPaths (fs , path , accountant .getOffsets ());
103+ List < TopicFileList > topicPaths = getTopicPaths (fs , path , accountant .getOffsets ());
97104 logger .info ("Time retrieving file list: {}" ,
98105 formatTime (Duration .between (timeStart , Instant .now ())));
99106
@@ -104,12 +111,16 @@ public void start(String directoryName) throws IOException {
104111 }
105112 }
106113
107- private TopicFileList getTopicPaths (FileSystem fs , Path path , OffsetRangeSet seenFiles ) {
108- return new TopicFileList ( walk (fs , path )
114+ private List < TopicFileList > getTopicPaths (FileSystem fs , Path path , OffsetRangeSet seenFiles ) {
115+ Map < String , List < TopicFile >> topics = walk (fs , path )
109116 .filter (f -> f .getName ().endsWith (".avro" ))
110117 .map (f -> new TopicFile (f .getParent ().getParent ().getName (), f ))
111118 .filter (f -> !seenFiles .contains (f .range ))
112- .collect (Collectors .toList ()));
119+ .collect (Collectors .groupingBy (TopicFile ::getTopic ));
120+
121+ return topics .values ().stream ()
122+ .map (v -> new TopicFileList (v .stream ().limit (maxFilesPerTopic )))
123+ .collect (Collectors .toList ());
113124 }
114125
115126 private Stream <Path > walk (FileSystem fs , Path path ) {
@@ -133,9 +144,16 @@ private Stream<Path> walk(FileSystem fs, Path path) {
133144 });
134145 }
135146
136- private void processPaths (TopicFileList topicPaths , Accountant accountant ) throws InterruptedException {
147+ private void processPaths (List <TopicFileList > topicPaths , Accountant accountant ) throws InterruptedException {
148+ int numFiles = topicPaths .stream ()
149+ .mapToInt (TopicFileList ::numberOfFiles )
150+ .sum ();
151+ long numOffsets = topicPaths .stream ()
152+ .mapToLong (TopicFileList ::numberOfOffsets )
153+ .sum ();
154+
137155 logger .info ("Converting {} files with {} records" ,
138- topicPaths . files . size () , NumberFormat .getNumberInstance ().format (topicPaths . size ));
156+ numFiles , NumberFormat .getNumberInstance ().format (numOffsets ));
139157
140158 processedFileCount = new LongAdder ();
141159 processedRecordsCount = new LongAdder ();
@@ -144,14 +162,13 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
144162
145163 ExecutorService executor = Executors .newWorkStealingPool (pathFactory .isTopicPartitioned () ? this .numThreads : 1 );
146164
147- ProgressBar progressBar = new ProgressBar (topicPaths . size , 50 , 500 , TimeUnit .MILLISECONDS );
165+ ProgressBar progressBar = new ProgressBar (numOffsets , 50 , 500 , TimeUnit .MILLISECONDS );
148166
149167 // Actually process the files
150- topicPaths .files .stream ()
151- .collect (Collectors .groupingBy (TopicFile ::getTopic )).values ().stream ()
152- .map (TopicFileList ::new )
168+
169+ topicPaths .stream ()
153170 // ensure that largest values go first on the executor queue
154- .sorted (Comparator .comparingLong (TopicFileList ::getSize ).reversed ())
171+ .sorted (Comparator .comparingLong (TopicFileList ::numberOfOffsets ).reversed ())
155172 .forEach (paths -> {
156173 String size = NumberFormat .getNumberInstance ().format (paths .size );
157174 String topic = paths .files .get (0 ).topic ;
@@ -186,7 +203,7 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
186203
187204 executor .shutdown ();
188205 executor .awaitTermination (Long .MAX_VALUE , TimeUnit .SECONDS );
189- progressBar .update (topicPaths . size );
206+ progressBar .update (numOffsets );
190207 }
191208
192209 private void processFile (TopicFile file , FileCacheStore cache ,
@@ -260,14 +277,18 @@ private static class TopicFileList {
260277 private final List <TopicFile > files ;
261278 private final long size ;
262279
/**
 * Creates a file list from a stream of topic files.
 *
 * <p>The stream is consumed once and materialized into a list. The combined size is the sum
 * of {@code TopicFile::size} over all collected files.
 *
 * @param files stream of files to collect; it is consumed by this constructor
 */
public TopicFileList(Stream<TopicFile> files) {
    this.files = files.collect(Collectors.toList());
    // Accumulate as long: the field is a long, and a 32-bit sum would silently wrap
    // once the combined size of a topic exceeds Integer.MAX_VALUE.
    this.size = this.files.stream()
            .mapToLong(TopicFile::size)
            .sum();
}
269286
270- public long getSize () {
287+ public int numberOfFiles () {
288+ return this .files .size ();
289+ }
290+
291+ public long numberOfOffsets () {
271292 return size ;
272293 }
273294 }
0 commit comments