Skip to content

Commit 307c670

Browse files
authored
Merge pull request #11 from datastax/feature/logging-fix
Feature/logging fix
2 parents 1c9ed15 + ceea02d commit 307c670

File tree

12 files changed

+53
-78
lines changed

12 files changed

+53
-78
lines changed

.idea/libraries/Maven__log4j_apache_log4j_extras_1_2_17.xml

Lines changed: 0 additions & 13 deletions
This file was deleted.

.idea/libraries/Maven__log4j_log4j_1_2_17.xml

Lines changed: 0 additions & 13 deletions
This file was deleted.

cassandra-data-migrator.iml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@
8383
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.16" level="project" />
8484
<orderEntry type="library" name="Maven: org.slf4j:jul-to-slf4j:1.7.16" level="project" />
8585
<orderEntry type="library" name="Maven: org.slf4j:jcl-over-slf4j:1.7.16" level="project" />
86-
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
8786
<orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.16" level="project" />
8887
<orderEntry type="library" name="Maven: com.ning:compress-lzf:1.0.3" level="project" />
8988
<orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.8.2" level="project" />
@@ -160,7 +159,6 @@
160159
<orderEntry type="library" name="Maven: org.spark-project.hive:hive-exec:1.2.1.spark2" level="project" />
161160
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
162161
<orderEntry type="library" name="Maven: javolution:javolution:5.5.1" level="project" />
163-
<orderEntry type="library" name="Maven: log4j:apache-log4j-extras:1.2.17" level="project" />
164162
<orderEntry type="library" name="Maven: org.antlr:ST4:4.0.4" level="project" />
165163
<orderEntry type="library" name="Maven: com.googlecode.javaewah:JavaEWAH:0.3.2" level="project" />
166164
<orderEntry type="library" name="Maven: org.iq80.snappy:snappy:0.2" level="project" />
@@ -202,6 +200,9 @@
202200
<orderEntry type="library" name="Maven: com.github.spotbugs:spotbugs-annotations:3.1.12" level="project" />
203201
<orderEntry type="library" name="Maven: com.datastax.oss:java-driver-mapper-runtime:4.10.0" level="project" />
204202
<orderEntry type="library" name="Maven: com.datastax.oss:java-driver-query-builder:4.10.0" level="project" />
203+
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.19.0" level="project" />
204+
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-core:2.19.0" level="project" />
205+
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-to-slf4j:2.19.0" level="project" />
205206
<orderEntry type="library" scope="TEST" name="Maven: org.scalatest:scalatest_2.11:3.2.12" level="project" />
206207
<orderEntry type="library" scope="TEST" name="Maven: org.scalatest:scalatest-core_2.11:3.2.12" level="project" />
207208
<orderEntry type="library" scope="TEST" name="Maven: org.scalatest:scalatest-compatible:3.2.12" level="project" />

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>1.4</version>
6+
<version>1.5</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@
33
import com.datastax.oss.driver.api.core.CqlSession;
44
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
55
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
6-
import org.slf4j.Logger;
7-
import org.slf4j.LoggerFactory;
86

97
import java.util.ArrayList;
108
import java.util.List;
119

12-
public abstract class BaseJobSession {
10+
public abstract class BaseJobSession {
1311

1412
protected PreparedStatement sourceSelectStatement;
1513
protected PreparedStatement astraSelectStatement;

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
6262
}
6363

6464
writeLimiter.acquire(1);
65-
if (readCounter.incrementAndGet() % 1000 == 0) {
65+
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
6666
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: "
6767
+ readCounter.get());
6868
}
@@ -88,7 +88,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
8888
for (Row sourceRow : resultSet) {
8989
readLimiter.acquire(1);
9090
writeLimiter.acquire(1);
91-
if (readCounter.incrementAndGet() % 1000 == 0) {
91+
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
9292
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
9393
}
9494
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
@@ -133,7 +133,7 @@ private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultS
133133
for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
134134
//wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
135135
writeResult.toCompletableFuture().get().one();
136-
if (writeCounter.addAndGet(incrementBy) % 1000 == 0) {
136+
if (writeCounter.addAndGet(incrementBy) % printStatsAfter == 0) {
137137
logger.info("TreadID: " + Thread.currentThread().getId() + " Write Record Count: " + writeCounter.get());
138138
}
139139
}

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
public class DiffJobSession extends CopyJobSession {
1818

19-
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
2019
private static DiffJobSession diffJobSession;
21-
20+
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
21+
protected Boolean autoCorrectMissing = false;
22+
protected Boolean autoCorrectMismatch = false;
2223
private AtomicLong readCounter = new AtomicLong(0);
2324
private AtomicLong mismatchCounter = new AtomicLong(0);
2425
private AtomicLong missingCounter = new AtomicLong(0);
@@ -27,8 +28,15 @@ public class DiffJobSession extends CopyJobSession {
2728
private AtomicLong validCounter = new AtomicLong(0);
2829
private AtomicLong skippedCounter = new AtomicLong(0);
2930

30-
protected Boolean autoCorrectMissing = false;
31-
protected Boolean autoCorrectMismatch = false;
31+
private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
32+
super(sourceSession, astraSession, sparkConf);
33+
34+
autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.missing", "false"));
35+
logger.info("PARAM -- Autocorrect Missing: " + autoCorrectMissing);
36+
37+
autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.mismatch", "false"));
38+
logger.info("PARAM -- Autocorrect Mismatch: " + autoCorrectMismatch);
39+
}
3240

3341
public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
3442
if (diffJobSession == null) {
@@ -42,13 +50,6 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
4250
return diffJobSession;
4351
}
4452

45-
private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
46-
super(sourceSession, astraSession, sparkConf);
47-
48-
autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.missing", "false"));
49-
autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.mismatch", "false"));
50-
}
51-
5253
public void getDataAndDiff(BigInteger min, BigInteger max) {
5354
ForkJoinPool customThreadPool = new ForkJoinPool();
5455
logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
@@ -58,7 +59,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
5859
try {
5960
// cannot do batching if the writeFilter is greater than 0
6061
ResultSet resultSet = sourceSession.execute(
61-
sourceSelectStatement.bind(hasRandomPartitioner? min : min.longValueExact(), hasRandomPartitioner? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
62+
sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
6263

6364
customThreadPool.submit(() -> {
6465
StreamSupport.stream(resultSet.spliterator(), true).forEach(sRow -> {
@@ -100,7 +101,7 @@ public void printCounts(String finalStr) {
100101
+ mismatchCounter.get());
101102
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Mismatch Count: "
102103
+ correctedMismatchCounter.get());
103-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Missing Count: "
104+
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Missing Count: "
104105
+ missingCounter.get());
105106
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Missing Count: "
106107
+ correctedMissingCounter.get());

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,70 +12,70 @@
1212

1313
public class SplitPartitions {
1414

15-
public static Logger logger = LoggerFactory.getLogger(SplitPartitions.class.getName());
1615
public final static Long MIN_PARTITION = Long.MIN_VALUE;
17-
public final static Long MAX_PARTITION = Long.MAX_VALUE;
16+
public final static Long MAX_PARTITION = Long.MAX_VALUE;
17+
public static Logger logger = LoggerFactory.getLogger(SplitPartitions.class.getName());
1818

19-
public static void main(String[] args){
19+
public static void main(String[] args) {
2020
Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), BigInteger.valueOf(MIN_PARTITION),
2121
BigInteger.valueOf(MAX_PARTITION), 20);
22-
for(Partition partition: partitions){
22+
for (Partition partition : partitions) {
2323
System.out.println(partition);
2424
}
2525
}
2626

27-
public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent){
27+
public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent) {
2828
logger.info("TreadID: " + Thread.currentThread().getId() + " Splitting min: " + min + " max:" + max);
29-
List<Partition> partitions = getSubPartitions(splitSize,min,max, coveragePercent);
29+
List<Partition> partitions = getSubPartitions(splitSize, min, max, coveragePercent);
3030
Collections.shuffle(partitions);
3131
Collections.shuffle(partitions);
3232
Collections.shuffle(partitions);
3333
Collections.shuffle(partitions);
3434
return partitions;
3535
}
3636

37-
private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent){
37+
private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent) {
3838
if (coveragePercent < 1 || coveragePercent > 100) {
3939
coveragePercent = 100;
4040
}
4141
BigInteger curMax = new BigInteger(min.toString());
42-
BigInteger partitionSize = max.subtract(min).divide(splitSize);
42+
BigInteger partitionSize = max.subtract(min).divide(splitSize);
4343
List<Partition> partitions = new ArrayList<Partition>();
44-
if(partitionSize.compareTo(new BigInteger("0"))==0){
45-
partitionSize=new BigInteger("100000");
44+
if (partitionSize.compareTo(new BigInteger("0")) == 0) {
45+
partitionSize = new BigInteger("100000");
4646
}
4747
boolean exausted = false;
48-
while(curMax.compareTo(max) <=0){
48+
while (curMax.compareTo(max) <= 0) {
4949
BigInteger curMin = new BigInteger(curMax.toString());
5050
BigInteger newCurMax = curMin.add(partitionSize);
5151
if (newCurMax.compareTo(curMax) == -1) {
5252
newCurMax = new BigInteger(max.toString());
5353
exausted = true;
5454
}
55-
if (newCurMax.compareTo(max)==1){
55+
if (newCurMax.compareTo(max) == 1) {
5656
newCurMax = new BigInteger(max.toString());
57-
exausted=true;
57+
exausted = true;
5858
}
5959
curMax = newCurMax;
6060

6161
BigInteger range = curMax.subtract(curMin);
6262
BigInteger curRange = range.multiply(BigInteger.valueOf(coveragePercent)).divide(BigInteger.valueOf(100));
63-
partitions.add(new Partition(curMin,curMin.add(curRange)));
64-
if(exausted){
63+
partitions.add(new Partition(curMin, curMin.add(curRange)));
64+
if (exausted) {
6565
break;
6666
}
6767
}
6868

6969
return partitions;
7070
}
7171

72-
public static class Partition implements Serializable{
72+
public static class Partition implements Serializable {
7373
private static final long serialVersionUID = 1L;
7474

7575
private BigInteger min;
7676
private BigInteger max;
7777

78-
public Partition(BigInteger min, BigInteger max){
78+
public Partition(BigInteger min, BigInteger max) {
7979
this.min = min;
8080
this.max = max;
8181
}
@@ -88,8 +88,8 @@ public BigInteger getMax() {
8888
return max;
8989
}
9090

91-
public String toString(){
92-
return "Processing partition for token range "+ min + " to " + max;
91+
public String toString() {
92+
return "Processing partition for token range " + min + " to " + max;
9393
}
9494
}
9595
}

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package datastax.astra.migrate
22

33
import com.datastax.spark.connector.cql.CassandraConnector
4-
import org.apache.spark.sql.SparkSession
5-
import org.slf4j.LoggerFactory
6-
7-
import java.math.BigInteger
8-
import java.lang.Long
94

105
class AbstractJob extends BaseJob {
116

7+
abstractLogger.info("PARAM -- Min Partition: " + minPartition)
8+
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
9+
abstractLogger.info("PARAM -- Split Size: " + splitSize)
10+
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
11+
abstractLogger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
12+
1213
var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
1314
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
1415

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ class BaseJob extends App {
4141
val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
4242
val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
4343

44-
val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition","-9223372036854775808"))
45-
val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition","9223372036854775807"))
44+
val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition", "-9223372036854775808"))
45+
val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition", "9223372036854775807"))
4646
val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
4747
val splitSize = sc.getConf.get("spark.splitSize", "10000")
48-
val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition,Integer.parseInt(coveragePercent))
48+
val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, Integer.parseInt(coveragePercent))
4949

5050
protected def exitSpark() = {
5151
spark.stop()

0 commit comments

Comments (0)