Skip to content

Commit b00c735

Browse files
authored
Merge pull request #252 from datastax/issue/242-partitionfile-outofmemoryerror
Fix OOM issue # 242
2 parents c6107a3 + 08dbf9e commit b00c735

File tree

2 files changed

+77
-10
lines changed

2 files changed

+77
-10
lines changed

src/main/java/com/datastax/cdm/job/SplitPartitions.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,17 @@
2424
import java.io.*;
2525
import java.math.BigInteger;
2626
import java.util.ArrayList;
27-
import java.util.Collection;
2827
import java.util.Collections;
2928
import java.util.List;
29+
import java.util.regex.Pattern;
3030
import java.util.stream.Collectors;
3131
import java.util.stream.IntStream;
3232
import java.util.stream.Stream;
3333

3434
public class SplitPartitions {
3535

3636
public static Logger logger = LoggerFactory.getLogger(SplitPartitions.class.getName());
37+
private static final int MAX_NUM_PARTS_FOR_PARTITION_FILE = 10;
3738

3839
public static List<Partition> getRandomSubPartitions(int numSplits, BigInteger min, BigInteger max, int coveragePercent) {
3940
logger.info("ThreadID: {} Splitting min: {} max: {}", Thread.currentThread().getId(), min, max);
@@ -48,16 +49,23 @@ public static List<Partition> getRandomSubPartitions(int numSplits, BigInteger m
4849
public static List<Partition> getSubPartitionsFromFile(int numSplits, String inputFilename) throws IOException {
4950
logger.info("ThreadID: {} Splitting partitions in file: {} using a split-size of {}"
5051
, Thread.currentThread().getId(), inputFilename, numSplits);
52+
if (numSplits > 10) {
53+
logger.warn("Resetting spark.cdm.perfops.numParts value of {} to max allowed value of {} when using a partition file: {}",
54+
numSplits, MAX_NUM_PARTS_FOR_PARTITION_FILE, inputFilename);
55+
numSplits = MAX_NUM_PARTS_FOR_PARTITION_FILE;
56+
}
5157
List<Partition> partitions = new ArrayList<Partition>();
5258
BufferedReader reader = getfileReader(inputFilename);
5359
String line = null;
60+
PartitionMinMax pMinMax;
5461
while ((line = reader.readLine()) != null) {
55-
if (line.startsWith("#")) {
56-
continue;
57-
}
58-
String[] minMax = line.split(",");
5962
try {
60-
partitions.addAll(getSubPartitions(numSplits, new BigInteger(minMax[0]), new BigInteger(minMax[1]), 100));
63+
pMinMax = new PartitionMinMax(line);
64+
if (pMinMax.hasError) {
65+
logger.error("Skipping " + pMinMax.error);
66+
continue;
67+
}
68+
partitions.addAll(getSubPartitions(numSplits, pMinMax.min, pMinMax.max, 100));
6169
} catch (Exception e) {
6270
logger.error("Skipping partition: {}", line, e);
6371
}
@@ -66,6 +74,26 @@ public static List<Partition> getSubPartitionsFromFile(int numSplits, String inp
6674
return partitions;
6775
}
6876

77+
static class PartitionMinMax {
78+
static final Pattern pat = Pattern.compile("^-?[0-9]*,-?[0-9]*");
79+
public BigInteger min;
80+
public BigInteger max;
81+
public boolean hasError = false;
82+
public String error;
83+
84+
public PartitionMinMax(String line) {
85+
line = line.replaceAll(" ", "");
86+
if (!pat.matcher(line).matches()) {
87+
error = "Invaliding partition line: " + line;
88+
hasError = true;
89+
return;
90+
}
91+
String[] minMax = line.split(",");
92+
min = new BigInteger(minMax[0]);
93+
max = new BigInteger(minMax[1]);
94+
}
95+
}
96+
6997
public static List<PKRows> getRowPartsFromFile(int numSplits, String inputFilename) throws IOException {
7098
logger.info("ThreadID: {} Splitting rows in file: {} using a split-size of {}"
7199
, Thread.currentThread().getId(), inputFilename, numSplits);

src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,15 @@
1414

1515
package com.datastax.cdm.job;
1616

17-
import org.junit.jupiter.api.AfterEach;
18-
import org.junit.jupiter.api.BeforeEach;
1917
import org.junit.jupiter.api.Test;
2018

19+
import java.io.IOException;
2120
import java.math.BigInteger;
2221
import java.util.Arrays;
23-
import java.util.Collection;
2422
import java.util.List;
2523
import java.util.stream.Stream;
2624

27-
import static org.junit.jupiter.api.Assertions.assertEquals;
25+
import static org.junit.jupiter.api.Assertions.*;
2826

2927
public class SplitPartitionsTest {
3028

@@ -44,11 +42,52 @@ void getRandomSubPartitionsTestOver100() {
4442
BigInteger.valueOf(44), 200);
4543
assertEquals(8, partitions.size());
4644
}
45+
46+
@Test
47+
void getSubPartitionsFromFileNoFileTest() throws IOException {
48+
Exception excp = assertThrows(RuntimeException.class,
49+
() -> SplitPartitions.getSubPartitionsFromFile(8, "partfile.csv"));
50+
assertTrue(excp.getMessage().contains("No 'partfile.csv' file found!! Add this file in the current folder & rerun!"));
51+
}
52+
53+
@Test
54+
void getSubPartitionsFromFileTest() throws IOException {
55+
List<SplitPartitions.Partition> partitions = SplitPartitions.getSubPartitionsFromFile(5, "./src/resources/partitions.csv");
56+
assertEquals(25, partitions.size());
57+
}
58+
59+
@Test
60+
void getSubPartitionsFromHighNumPartTest() throws IOException {
61+
List<SplitPartitions.Partition> partitions = SplitPartitions.getSubPartitionsFromFile(1000, "./src/resources/partitions.csv");
62+
assertEquals(50, partitions.size());
63+
}
64+
65+
4766
@Test
4867
void batchesTest() {
4968
List<String> mutable_list = Arrays.asList("e1", "e2", "e3", "e4", "e5", "e6");
5069
Stream<List<String>> out = SplitPartitions.batches(mutable_list, 2);
5170
assertEquals(3, out.count());
5271
}
5372

73+
@Test
74+
void PartitionMinMaxBlankTest() {
75+
assertTrue((new SplitPartitions.PartitionMinMax("")).hasError);
76+
}
77+
78+
@Test
79+
void PartitionMinMaxInvalidTest() {
80+
assertTrue((new SplitPartitions.PartitionMinMax(" # min, max")).hasError);
81+
}
82+
83+
@Test
84+
void PartitionMinMaxValidTest() {
85+
assertTrue(!(new SplitPartitions.PartitionMinMax(" -123, 456")).hasError);
86+
}
87+
88+
@Test
89+
void PartitionMinMaxValidMinMaxTest() {
90+
assertEquals(BigInteger.valueOf(-507900353496146534l), (new SplitPartitions.PartitionMinMax(" -507900353496146534, 456")).min);
91+
assertEquals(BigInteger.valueOf(9101008634499147643l), (new SplitPartitions.PartitionMinMax(" -507900353496146534,9101008634499147643")).max);
92+
}
5493
}

0 commit comments

Comments
 (0)