@@ -58,7 +58,7 @@ public class RadarHdfsRestructure {
58
58
private static final Logger logger = LoggerFactory .getLogger (RadarHdfsRestructure .class );
59
59
60
60
/** Number of offsets to process in a single task. */
61
- private static final int BATCH_SIZE = 500_000 ;
61
+ private static final long BATCH_SIZE = 500_000 ;
62
62
63
63
private final int numThreads ;
64
64
private final Configuration conf ;
@@ -174,8 +174,8 @@ private void processPaths(List<TopicFileList> topicPaths, Accountant accountant)
174
174
String topic = paths .files .get (0 ).topic ;
175
175
logger .info ("Processing {} records for topic {}" , size , topic );
176
176
executor .execute (() -> {
177
- int batchSize = ( int ) (BATCH_SIZE * ThreadLocalRandom .current ().nextDouble (0.75 , 1.25 ));
178
- int currentSize = 0 ;
177
+ long batchSize = Math . round (BATCH_SIZE * ThreadLocalRandom .current ().nextDouble (0.75 , 1.25 ));
178
+ long currentSize = 0 ;
179
179
try (FileCacheStore cache = fileStoreFactory .newFileCacheStore (accountant )) {
180
180
for (TopicFile file : paths .files ) {
181
181
try {
@@ -280,7 +280,7 @@ private static class TopicFileList {
280
280
public TopicFileList (Stream <TopicFile > files ) {
281
281
this .files = files .collect (Collectors .toList ());
282
282
this .size = this .files .stream ()
283
- .mapToInt (TopicFile ::size )
283
+ .mapToLong (TopicFile ::size )
284
284
.sum ();
285
285
}
286
286
@@ -308,8 +308,8 @@ public String getTopic() {
308
308
return topic ;
309
309
}
310
310
311
- public int size () {
312
- return 1 + ( int ) ( range .getOffsetTo () - range .getOffsetFrom () );
311
+ public long size () {
312
+ return 1 + range .getOffsetTo () - range .getOffsetFrom ();
313
313
}
314
314
}
315
315
}
0 commit comments