Skip to content

Commit 37de2cd

Browse files
Modified to use the existing DiffPartitionsFromFile
1 parent 86f938d commit 37de2cd

File tree

4 files changed

+12
-46
lines changed

4 files changed

+12
-46
lines changed

src/main/java/com/datastax/cdm/job/SplitPartitions.java

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -158,40 +158,4 @@ private static BufferedReader getfileReader(String fileName) {
158158
}
159159
}
160160

161-
public static List<Partition> getFailedSubPartitionsFromFile(int splitSize, String tokenRangeFile) throws IOException {
162-
logger.info("ThreadID: {} Splitting partitions in file: {} using a split-size of {}"
163-
, Thread.currentThread().getId(), tokenRangeFile, splitSize);
164-
165-
File file = new File(tokenRangeFile);
166-
String renamedFile = tokenRangeFile+"_bkp";
167-
File rename = new File(renamedFile);
168-
if(rename.exists()) {
169-
rename.delete();
170-
}
171-
boolean flag = file.renameTo(rename);
172-
if (flag) {
173-
logger.info("File Successfully Renamed to : "+renamedFile);
174-
}
175-
else {
176-
logger.info("Operation Failed to rename file : "+tokenRangeFile);
177-
}
178-
179-
List<Partition> partitions = new ArrayList<Partition>();
180-
BufferedReader reader = getfileReader(renamedFile);
181-
String line = null;
182-
while ((line = reader.readLine()) != null) {
183-
if (line.startsWith("#")) {
184-
continue;
185-
}
186-
String[] minMax = line.split(",");
187-
try {
188-
partitions.addAll(getSubPartitions(splitSize, new BigInteger(minMax[0]), new BigInteger(minMax[1]), 100));
189-
} catch (Exception e) {
190-
logger.error("Skipping partition: {}", line, e);
191-
}
192-
}
193-
194-
return partitions;
195-
}
196-
197161
}

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,10 @@ public enum PropertyType {
141141
// Error handling
142142
//==========================================================================
143143
public static final String TOKEN_RANGE_EXCEPTION_DIR = "spark.tokenRange.exceptionDir"; // aaa/bbb
144-
public static final String TOKEN_RANGE_EXCEPTION_FILE = "spark.input.partitionFile"; // aaa/bbb/filename
144+
public static final String PARTITIONS_TOKEN_RANGE_FILE = "spark.input.partitionFile"; // aaa/bbb/filename
145145
static {
146146
types.put(TOKEN_RANGE_EXCEPTION_DIR, PropertyType.STRING);
147-
types.put(TOKEN_RANGE_EXCEPTION_FILE, PropertyType.STRING);
147+
types.put(PARTITIONS_TOKEN_RANGE_FILE, PropertyType.STRING);
148148
}
149149
//==========================================================================
150150
// Guardrails and Transformations

src/main/scala/com/datastax/cdm/job/BaseJob.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ abstract class BaseJob[T: ClassTag] extends App {
3232
var maxPartition: BigInteger = _
3333
var coveragePercent: Int = _
3434
var numSplits: Int = _
35-
var tokenRangeFile: String = _
3635

3736
var parts: util.Collection[T] = _
3837
var slices: RDD[T] = _
@@ -70,18 +69,16 @@ abstract class BaseJob[T: ClassTag] extends App {
7069
maxPartition = getMaxPartition(propertyHelper.getString(KnownProperties.PARTITION_MAX), hasRandomPartitioner)
7170
coveragePercent = propertyHelper.getInteger(KnownProperties.TOKEN_COVERAGE_PERCENT)
7271
numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)
73-
tokenRangeFile = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_FILE)
72+
this.fileName = propertyHelper.getString(KnownProperties.PARTITIONS_TOKEN_RANGE_FILE)
7473
abstractLogger.info("PARAM -- Min Partition: " + minPartition)
7574
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
7675
abstractLogger.info("PARAM -- Number of Splits : " + numSplits)
7776
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
78-
abstractLogger.info("PARAM -- Token Range File: " + tokenRangeFile)
79-
77+
abstractLogger.info("PARAM -- Partition File: " + fileName)
8078
this.parts = getParts(numSplits)
8179
this.slices = sContext.parallelize(parts.asScala.toSeq, parts.size);
8280
abstractLogger.info("PARAM Calculated -- Total Partitions: " + parts.size())
8381
abstractLogger.info("Spark parallelize created : " + slices.count() + " slices!");
84-
8582
}
8683

8784
def getParts(pieces: Int): util.Collection[T]

src/main/scala/com/datastax/cdm/job/DiffPartitionsFromFile.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package com.datastax.cdm.job
22

33
object DiffPartitionsFromFile extends BasePartitionJob {
4-
setup("Diff Partitions from File Job", new DiffJobSessionFactory(), tokenRangeFile)
5-
execute()
6-
finish()
4+
setup("Diff Partitions from File Job", new DiffJobSessionFactory())
5+
if ("".equals(this.fileName)) {
6+
abstractLogger.error("Please set conf for spark.input.partitionFile ")
7+
}
8+
else {
9+
execute()
10+
finish()
11+
}
712

813
override def execute(): Unit = {
914
slices.foreach(slice => {

0 commit comments

Comments (0)