
Commit a387b33

Add options to output partition range
1 parent ed0d750 commit a387b33

10 files changed: 112 additions, 24 deletions

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 5 additions & 2 deletions
@@ -34,7 +34,9 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
     protected boolean guardrailEnabled;
-    protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper);
+    protected boolean appendPartitionOnDiff = SplitPartitions.appendPartitionOnDiff(propertyHelper);
+    protected String partitionFileInput = SplitPartitions.getPartitionFileInput(propertyHelper);
+    protected String partitionFileOutput = SplitPartitions.getPartitionFileOutput(propertyHelper);
     protected JobCounter jobCounter;
     protected Long printStatsAfter;

@@ -62,7 +64,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);

         logger.info("PARAM -- Max Retries: {}", maxRetries);
-        logger.info("PARAM -- Partition file: {}", partitionFile);
+        logger.info("PARAM -- Partition file input: {}", partitionFileInput);
+        logger.info("PARAM -- Partition file output: {}", partitionFileOutput);
         logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
         logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ private void appendToFile(String filePath, String content) throws IOException {
                 StandardOpenOption.APPEND);
     }

-    protected void logFailedPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
+    protected void logPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
         try {
             appendToFile(partitionFile, min + "," + max);
         } catch (Exception ee) {
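The renamed helper appends one "min,max" line per token range, through appendToFile and StandardOpenOption.APPEND. A self-contained sketch of that behavior, with illustrative path and error handling rather than the exact CDM code:

import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PartitionFileAppendSketch {
    // Appends a single "min,max" record, creating the file on first use.
    static void logPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
        try {
            Files.write(Paths.get(partitionFile),
                    (min + "," + max + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            System.err.println("Could not append to " + partitionFile + ": " + e);
        }
    }

    public static void main(String[] args) {
        logPartitionsInFile("./demo_partitions.csv",
                BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(-4611686018427387905L));
    }
}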

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 1 addition & 1 deletion
@@ -125,7 +125,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
             } catch (Exception e) {
                 if (attempts == maxAttempts) {
                     jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
-                    logFailedPartitionsInFile(partitionFile, min, max);
+                    logPartitionsInFile(partitionFileOutput, min, max);
                 }
                 logger.error("Error occurred during Attempt#: {}", attempts, e);
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 25 additions & 8 deletions
@@ -42,6 +42,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;

@@ -112,6 +113,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         boolean done = false;
+        AtomicBoolean hasDiff = new AtomicBoolean(false);
         int maxAttempts = maxRetries + 1;
         for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
             try {

@@ -152,19 +154,27 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                             r.setAsyncTargetRow(targetResult);
                             recordsToDiff.add(r);
                             if (recordsToDiff.size() > fetchSizeInRows) {
-                                diffAndClear(recordsToDiff);
+                                if (diffAndClear(recordsToDiff)) {
+                                    hasDiff.set(true);
+                                }
                             }
                         } // targetRecord!=null
                     } // recordSet iterator
                 } // shouldFilterRecord
                 });
-                diffAndClear(recordsToDiff);
+                if (diffAndClear(recordsToDiff)) {
+                    hasDiff.set(true);
+                }
                 done = true;
+
+                if (hasDiff.get() && appendPartitionOnDiff) {
+                    logPartitionsInFile(partitionFileOutput, min, max);
+                }
             } catch (Exception e) {
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
                         Thread.currentThread().getId(), min, max, attempts, e);
                 if (attempts == maxAttempts) {
-                    logFailedPartitionsInFile(partitionFile, min, max);
+                    logPartitionsInFile(partitionFileOutput, min, max);
                 }
             } finally {
                 jobCounter.globalIncrement();

@@ -173,18 +183,22 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
         }
     }

-    private void diffAndClear(List<Record> recordsToDiff) {
+    private boolean diffAndClear(List<Record> recordsToDiff) {
+        boolean isDiff = false;
         for (Record record : recordsToDiff) {
             try {
-                diff(record);
+                if (diff(record)) {
+                    isDiff = true;
+                }
             } catch (Exception e) {
                 logger.error("Could not perform diff for key {}: {}", record.getPk(), e);
             }
         }
         recordsToDiff.clear();
+        return isDiff;
     }

-    private void diff(Record record) {
+    private boolean diff(Record record) {
         EnhancedPK originPK = record.getPk();
         Row originRow = record.getOriginRow();
         Row targetRow = record.getTargetRow();

@@ -194,7 +208,7 @@ private void diff(Record record) {
             logger.error("Missing target row found for key: {}", record.getPk());
             if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) {
                 logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", KnownProperties.AUTOCORRECT_MISSING, KnownProperties.AUTOCORRECT_MISSING_COUNTER, record.getPk());
-                return;
+                return true;
             }

             //correct data

@@ -204,7 +218,7 @@ private void diff(Record record) {
                 jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING);
                 logger.error("Inserted missing row in target: {}", record.getPk());
             }
-            return;
+            return true;
         }

         String diffData = isDifferent(originPK, originRow, targetRow);

@@ -218,8 +232,11 @@ private void diff(Record record) {
                 jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH);
                 logger.error("Corrected mismatch row in target: {}", record.getPk());
             }
+
+            return true;
         } else {
             jobCounter.threadIncrement(JobCounter.CounterType.VALID);
+            return false;
         }
     }
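One implementation detail worth flagging: hasDiff is an AtomicBoolean rather than a plain boolean because it is set inside the lambda that walks the record stream, and a local variable captured by a Java lambda must be effectively final. A minimal illustration of why a mutable holder is needed:

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class LambdaFlagSketch {
    public static void main(String[] args) {
        AtomicBoolean hasDiff = new AtomicBoolean(false);
        // With a plain local boolean, the assignment inside the lambda
        // would not compile: captured locals must be effectively final.
        List.of("valid", "mismatch", "valid").forEach(state -> {
            if (state.equals("mismatch")) {
                hasDiff.set(true);
            }
        });
        System.out.println("any diff: " + hasDiff.get()); // prints: any diff: true
    }
}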

src/main/java/com/datastax/cdm/job/SplitPartitions.java

Lines changed: 24 additions & 5 deletions
@@ -168,13 +168,32 @@ private static BufferedReader getfileReader(String fileName) {
         }
     }

-    public static String getPartitionFile(PropertyHelper propertyHelper) {
-        String filePath = propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE);
-        if (StringUtils.isAllBlank(filePath)) {
-            filePath = "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
+    public static boolean appendPartitionOnDiff(PropertyHelper propertyHelper) {
+        return Boolean.TRUE.equals(propertyHelper.getBoolean(KnownProperties.TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF));
+    }
+
+    public static String getPartitionFileInput(PropertyHelper propertyHelper) {
+        if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_INPUT))) {
+            return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_INPUT);
+        }
+
+        if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE))) {
+            return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE);
+        }
+
+        return "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
+    }
+
+    public static String getPartitionFileOutput(PropertyHelper propertyHelper) {
+        if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_OUTPUT))) {
+            return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_OUTPUT);
+        }
+
+        if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE))) {
+            return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE);
         }

-        return filePath;
+        return "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
     }

     public static class PKRows implements Serializable {
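Both accessors share the same three-level fallback: the role-specific property wins, then the legacy spark.cdm.tokenrange.partitionFile, then a default derived from the origin keyspace/table name. A condensed sketch of that resolution order (the helper and sample values below are hypothetical):

import org.apache.commons.lang3.StringUtils;

final class PartitionFileResolver {
    // Role-specific property -> legacy shared property -> derived default.
    static String resolve(String specific, String legacy, String keyspaceTable) {
        if (!StringUtils.isAllBlank(specific)) return specific;
        if (!StringUtils.isAllBlank(legacy)) return legacy;
        return "./" + keyspaceTable + "_partitions.csv";
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, "ks.tb"));                    // ./ks.tb_partitions.csv
        System.out.println(resolve(null, "./file.csv", "ks.tb"));            // ./file.csv
        System.out.println(resolve("./file_out.csv", "./file.csv", "ks.tb")); // ./file_out.csv
    }
}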

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 8 additions & 1 deletion
@@ -152,12 +152,19 @@ public enum PropertyType {
     }

     //==========================================================================
-    // Error handling
+    // Partition File
     //==========================================================================
     public static final String TOKEN_RANGE_PARTITION_FILE = "spark.cdm.tokenrange.partitionFile";
+    public static final String TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF = "spark.cdm.tokenrange.partitionFile.appendOnDiff";
+    public static final String TOKEN_RANGE_PARTITION_FILE_INPUT = "spark.cdm.tokenrange.partitionFile.input";
+    public static final String TOKEN_RANGE_PARTITION_FILE_OUTPUT = "spark.cdm.tokenrange.partitionFile.output";
     static {
         types.put(TOKEN_RANGE_PARTITION_FILE, PropertyType.STRING);
+        types.put(TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF, PropertyType.BOOLEAN);
+        types.put(TOKEN_RANGE_PARTITION_FILE_INPUT, PropertyType.STRING);
+        types.put(TOKEN_RANGE_PARTITION_FILE_OUTPUT, PropertyType.STRING);
     }
+
     //==========================================================================
     // Guardrails and Transformations
     //==========================================================================
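Taken together, the three properties let a validation run read token ranges from one file and record only the ranges that still had differences to another, which a follow-up run can then consume as its input. For illustration, they could be supplied like any other CDM setting, e.g. through SparkConf (the file names below are hypothetical):

import org.apache.spark.SparkConf;

public class PartitionFileConfSketch {
    public static void main(String[] args) {
        // Any of the three may be omitted; the fallbacks in SplitPartitions then apply.
        SparkConf conf = new SparkConf()
                .set("spark.cdm.tokenrange.partitionFile.input", "./prev_diff_partitions.csv")
                .set("spark.cdm.tokenrange.partitionFile.output", "./next_diff_partitions.csv")
                .set("spark.cdm.tokenrange.partitionFile.appendOnDiff", "true");
        System.out.println(conf.toDebugString());
    }
}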

src/main/scala/com/datastax/cdm/job/BaseJob.scala

Lines changed: 3 additions & 3 deletions
@@ -15,7 +15,7 @@
  */
 package com.datastax.cdm.job

-import com.datastax.cdm.job.SplitPartitions.getPartitionFile
+import com.datastax.cdm.job.SplitPartitions.getPartitionFileInput
 import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.spark.{SparkConf, SparkContext}

@@ -53,7 +53,7 @@ abstract class BaseJob[T: ClassTag] extends App {

   var originConnection: CassandraConnector = _
   var targetConnection: CassandraConnector = _
-  var partitionFileName: String = ""
+  var partitionFileNameInput: String = ""

   def setup(jobName: String, jobFactory: IJobSessionFactory[T]): Unit = {
     logBanner(jobName + " - Starting")

@@ -66,7 +66,7 @@ abstract class BaseJob[T: ClassTag] extends App {
     sContext = spark.sparkContext
     sc = sContext.getConf
     propertyHelper = PropertyHelper.getInstance(sc);
-    this.partitionFileName = getPartitionFile(propertyHelper);
+    this.partitionFileNameInput = getPartitionFileInput(propertyHelper);

     consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
     val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)

src/main/scala/com/datastax/cdm/job/BasePKJob.scala

Lines changed: 1 addition & 1 deletion
@@ -21,6 +21,6 @@ abstract class BasePKJob extends BaseJob[SplitPartitions.PKRows] {
   override def getParts(pieces: Int): util.Collection[SplitPartitions.PKRows] = {
     // This takes a file with N rows and divides it into pieces of size N/pieces
     // Each PKRows object contains a list of Strings that contain the PK to be parsed
-    SplitPartitions.getRowPartsFromFile(pieces, this.partitionFileName)
+    SplitPartitions.getRowPartsFromFile(pieces, this.partitionFileNameInput)
   }
 }

src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala

Lines changed: 2 additions & 2 deletions
@@ -20,10 +20,10 @@ import scala.reflect.io.File

 abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] {
   override def getParts(pieces: Int): util.Collection[SplitPartitions.Partition] = {
-    if (!File(this.partitionFileName).exists) {
+    if (!File(this.partitionFileNameInput).exists) {
       SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent)
     } else {
-      SplitPartitions.getSubPartitionsFromFile(pieces, this.partitionFileName)
+      SplitPartitions.getSubPartitionsFromFile(pieces, this.partitionFileNameInput)
     }
   }
 }
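The input file, when present, uses the same one-range-per-line min,max format that logPartitionsInFile writes. SplitPartitions.getSubPartitionsFromFile does the real parsing and re-splitting; the hypothetical reader below only shows the format:

import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PartitionFileReadSketch {
    public static void main(String[] args) throws IOException {
        for (String line : Files.readAllLines(Paths.get("./demo_partitions.csv"))) {
            if (line.isBlank()) continue;          // tolerate stray blank lines
            String[] parts = line.split(",");
            BigInteger min = new BigInteger(parts[0].trim());
            BigInteger max = new BigInteger(parts[1].trim());
            System.out.println("token range " + min + " .. " + max);
        }
    }
}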

src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java

Lines changed: 42 additions & 0 deletions
@@ -14,6 +14,10 @@

 package com.datastax.cdm.job;

+import com.datastax.cdm.properties.PropertyHelper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

 import java.io.IOException;

@@ -25,6 +29,10 @@
 import static org.junit.jupiter.api.Assertions.*;

 public class SplitPartitionsTest {
+    @AfterEach
+    void tearDown() {
+        PropertyHelper.destroyInstance();
+    }

     @Test
     void getRandomSubPartitionsTest() {

@@ -90,4 +98,38 @@ void PartitionMinMaxValidMinMaxTest() {
         assertEquals(BigInteger.valueOf(-507900353496146534l), (new SplitPartitions.PartitionMinMax(" -507900353496146534, 456")).min);
         assertEquals(BigInteger.valueOf(9101008634499147643l), (new SplitPartitions.PartitionMinMax(" -507900353496146534,9101008634499147643")).max);
     }
+
+    @Test
+    void appendPartitionOnDiff() {
+        PropertyHelper helper = PropertyHelper.getInstance();
+        assertFalse(SplitPartitions.appendPartitionOnDiff(helper));
+        helper.setProperty("spark.cdm.tokenrange.partitionFile.appendOnDiff", true);
+        assertTrue(SplitPartitions.appendPartitionOnDiff(helper));
+    }
+
+    @Test
+    void getPartitionFileInput() {
+        PropertyHelper helper = PropertyHelper.getInstance();
+        helper.setProperty("spark.cdm.schema.origin.keyspaceTable", "tb");
+        assertEquals("./tb_partitions.csv", SplitPartitions.getPartitionFileInput(helper));
+
+        helper.setProperty("spark.cdm.tokenrange.partitionFile", "./file.csv");
+        assertEquals("./file.csv", SplitPartitions.getPartitionFileInput(helper));
+
+        helper.setProperty("spark.cdm.tokenrange.partitionFile.input", "./file_input.csv");
+        assertEquals("./file_input.csv", SplitPartitions.getPartitionFileInput(helper));
+    }
+
+    @Test
+    void getPartitionFileOutput() {
+        PropertyHelper helper = PropertyHelper.getInstance();
+        helper.setProperty("spark.cdm.schema.origin.keyspaceTable", "tb");
+        assertEquals("./tb_partitions.csv", SplitPartitions.getPartitionFileOutput(helper));
+
+        helper.setProperty("spark.cdm.tokenrange.partitionFile", "./file.csv");
+        assertEquals("./file.csv", SplitPartitions.getPartitionFileOutput(helper));
+
+        helper.setProperty("spark.cdm.tokenrange.partitionFile.output", "./file_output.csv");
+        assertEquals("./file_output.csv", SplitPartitions.getPartitionFileOutput(helper));
+    }
 }
