Commit b1252f8

CDM-54 consolidating duplicate code, parameterizing filename
1 parent ecab70f

20 files changed: +345 −272 lines

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 4 additions & 4 deletions
@@ -12,7 +12,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AbstractJobSession extends BaseJobSession {
+public abstract class AbstractJobSession<T> extends BaseJobSession {
+
+    public abstract void processSlice(T slice);
+    public abstract void printCounts(boolean isFinal);
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
 
@@ -74,7 +77,4 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         this.guardrailEnabled = this.guardrailFeature.isEnabled();
 
     }
-
-
-
 }
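The net effect of this change: AbstractJobSession becomes an abstract generic class, with the slice type T fixed by each concrete job, and processSlice/printCounts as the only entry points a driver needs. A minimal hypothetical subclass, for illustration only (the class name and log messages are invented, not part of this commit):

package com.datastax.cdm.job;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.spark.SparkConf;

// Hypothetical subclass showing the contract the new abstract class
// imposes on every job type.
public class NoOpJobSession extends AbstractJobSession<SplitPartitions.Partition> {

    protected NoOpJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
        super(originSession, targetSession, sc);
    }

    @Override
    public void processSlice(SplitPartitions.Partition slice) {
        // A real job reads/writes the token range [slice.getMin(), slice.getMax()].
        logger.info("Would process range {} to {}", slice.getMin(), slice.getMax());
    }

    @Override
    public void printCounts(boolean isFinal) {
        logger.info("No counts to report (isFinal={})", isFinal);
    }
}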

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 5 additions & 11 deletions
@@ -18,7 +18,7 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class CopyJobSession extends AbstractJobSession {
+public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition> {
 
     private static CopyJobSession copyJobSession;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -51,16 +51,9 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Spa
         logger.info("CQL -- target upsert: {}",this.targetSession.getTargetUpsertStatement().getCQL());
     }
 
-    public static CopyJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
-        if (copyJobSession == null) {
-            synchronized (CopyJobSession.class) {
-                if (copyJobSession == null) {
-                    copyJobSession = new CopyJobSession(originSession, targetSession, sc);
-                }
-            }
-        }
-
-        return copyJobSession;
+    @Override
+    public void processSlice(SplitPartitions.Partition slice) {
+        this.getDataAndInsert(slice.getMin(), slice.getMax());
     }
 
     public void getDataAndInsert(BigInteger min, BigInteger max) {
@@ -147,6 +140,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
         }
     }
 
+    @Override
     public synchronized void printCounts(boolean isFinal) {
         String msg = "ThreadID: " + Thread.currentThread().getId();
         if (isFinal) {
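With the static getInstance gone from CopyJobSession, callers obtain the session through the new CopyJobSessionFactory, shown next. The driver code that actually makes this call lives in the Spark job entry points, which are not part of this excerpt, so the sketch below is illustrative (the helper class name is invented):

package com.datastax.cdm.job;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.spark.SparkConf;

// Illustrative call-site migration; CopyDriverSketch is not a class in this commit.
public class CopyDriverSketch {
    static void runSlice(CqlSession origin, CqlSession target, SparkConf sc,
                         SplitPartitions.Partition slice) {
        // Before: CopyJobSession.getInstance(origin, target, sc)
        //             .getDataAndInsert(slice.getMin(), slice.getMax());
        // After: the factory returns the abstract type, and processSlice
        // hides which concrete method the job dispatches to.
        IJobSessionFactory<SplitPartitions.Partition> factory = new CopyJobSessionFactory();
        AbstractJobSession<SplitPartitions.Partition> session = factory.getInstance(origin, target, sc);
        session.processSlice(slice);
    }
}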
src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+package com.datastax.cdm.job;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.spark.SparkConf;
+
+public class CopyJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition> {
+    private static CopyJobSession jobSession = null;
+
+    public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+        if (jobSession == null) {
+            synchronized (CopyJobSession.class) {
+                if (jobSession == null) {
+                    jobSession = new CopyJobSession(originSession, targetSession, sc);
+                }
+            }
+        }
+        return jobSession;
+    }
+}
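The factory keeps the double-checked locking that the deleted static getInstance used. One general caution about that idiom (a note on Java memory-model rules, not a claim about failures observed in this repo): without volatile on the cached field, double-checked locking is not guaranteed safe, since a thread can observe a partially constructed object. A textbook-safe variant would look like this (class name invented for illustration):

package com.datastax.cdm.job;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.spark.SparkConf;

// Illustrative variant, not repo code: identical logic with a volatile
// field, which is what makes double-checked locking sound under the JMM.
public class VolatileCopyJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition> {
    private static volatile CopyJobSession jobSession = null;

    public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
        if (jobSession == null) {
            synchronized (VolatileCopyJobSessionFactory.class) {
                if (jobSession == null) {
                    jobSession = new CopyJobSession(originSession, targetSession, sc);
                }
            }
        }
        return jobSession;
    }
}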

src/main/java/com/datastax/cdm/job/CopyPKJobSession.java

Lines changed: 38 additions & 46 deletions
@@ -17,7 +17,7 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class CopyPKJobSession extends AbstractJobSession {
+public class CopyPKJobSession extends AbstractJobSession<SplitPartitions.PKRows> {
 
     private static CopyPKJobSession copyJobSession;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -40,66 +40,58 @@ protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, S
         logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPKStatement().getCQL());
     }
 
-    public static CopyPKJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
-        if (copyJobSession == null) {
-            synchronized (CopyPKJobSession.class) {
-                if (copyJobSession == null) {
-                    copyJobSession = new CopyPKJobSession(originSession, targetSession, sc);
-                }
-            }
-        }
-
-        return copyJobSession;
+    @Override
+    public void processSlice(SplitPartitions.PKRows slice) {
+        this.getRowAndInsert(slice);
     }
 
-    public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
+    public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
         originSelectByPKStatement = originSession.getOriginSelectByPKStatement();
-        for (SplitPartitions.PKRows rows : rowsList) {
-            rows.pkRows.parallelStream().forEach(row -> {
-                readCounter.incrementAndGet();
-                EnhancedPK pk = toEnhancedPK(row);
-                if (null == pk || pk.isError()) {
-                    missingCounter.incrementAndGet();
-                    logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
-                    return;
-                }
+        for (String row : rowsList.pkRows) {
+            readCounter.incrementAndGet();
+            EnhancedPK pk = toEnhancedPK(row);
+            if (null == pk || pk.isError()) {
+                missingCounter.incrementAndGet();
+                logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
+                return;
+            }
 
-                Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
-                if (null == recordFromOrigin) {
-                    missingCounter.incrementAndGet();
-                    logger.error("Could not find origin row with primary-key: {}", row);
-                    return;
-                }
-                Row originRow = recordFromOrigin.getOriginRow();
+            Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
+            if (null == recordFromOrigin) {
+                missingCounter.incrementAndGet();
+                logger.error("Could not find origin row with primary-key: {}", row);
+                return;
+            }
+            Row originRow = recordFromOrigin.getOriginRow();
+
+            Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
+            if (originSelectByPKStatement.shouldFilterRecord(record)) {
+                skipCounter.incrementAndGet();
+                return;
+            }
 
-                Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
-                if (originSelectByPKStatement.shouldFilterRecord(record)) {
+            if (guardrailEnabled) {
+                String guardrailCheck = guardrailFeature.guardrailChecks(record);
+                if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
+                    logger.error("Guardrails failed for PrimaryKey {}; {}", record.getPk(), guardrailCheck);
                     skipCounter.incrementAndGet();
                     return;
                 }
+            }
 
-                if (guardrailEnabled) {
-                    String guardrailCheck = guardrailFeature.guardrailChecks(record);
-                    if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
-                        logger.error("Guardrails failed for PrimaryKey {}; {}", record.getPk(), guardrailCheck);
-                        skipCounter.incrementAndGet();
-                        return;
-                    }
-                }
-
-                writeLimiter.acquire(1);
-                targetSession.getTargetUpsertStatement().putRecord(record);
-                writeCounter.incrementAndGet();
+            writeLimiter.acquire(1);
+            targetSession.getTargetUpsertStatement().putRecord(record);
+            writeCounter.incrementAndGet();
 
-                if (readCounter.get() % printStatsAfter == 0) {
-                    printCounts(false);
-                }
-            });
+            if (readCounter.get() % printStatsAfter == 0) {
+                printCounts(false);
+            }
         }
 
         printCounts(true);
     }
 
+    @Override
     public void printCounts(boolean isFinal) {
         if (isFinal) {
             logger.info("################################################################################################");
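One Java semantics point in this rewrite: in the removed parallelStream().forEach version, return exits only the lambda for the current element, whereas inside the new plain for loop the same return statements exit getRowAndInsert altogether, so a failed key now ends processing of the remaining keys in the slice. Whether that is intended is not stated in the commit message; the standalone demo below (invented data, not repo code) just illustrates the language rule:

import java.util.Arrays;
import java.util.List;

public class ReturnSemanticsDemo {
    public static void main(String[] args) {
        List<String> rows = Arrays.asList("good-1", "bad", "good-2");

        // Lambda form: 'return' acts like 'continue' -- prints good-1 and good-2.
        rows.forEach(row -> {
            if (row.startsWith("bad")) return; // skips only this element
            System.out.println("lambda processed " + row);
        });

        // Loop form: 'return' exits the whole method -- prints only good-1.
        loopForm(rows);
    }

    static void loopForm(List<String> rows) {
        for (String row : rows) {
            if (row.startsWith("bad")) return; // abandons the remaining rows
            System.out.println("loop processed " + row);
        }
    }
}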
src/main/java/com/datastax/cdm/job/CopyPKJobSessionFactory.java

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+package com.datastax.cdm.job;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.spark.SparkConf;
+
+public class CopyPKJobSessionFactory implements IJobSessionFactory<SplitPartitions.PKRows> {
+    private static CopyPKJobSession jobSession = null;
+
+    public AbstractJobSession<SplitPartitions.PKRows> getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+        if (jobSession == null) {
+            synchronized (CopyPKJobSession.class) {
+                if (jobSession == null) {
+                    jobSession = new CopyPKJobSession(originSession, targetSession, sc);
+                }
+            }
+        }
+        return jobSession;
+    }
+}

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 5 additions & 10 deletions
@@ -51,7 +51,7 @@ public class DiffJobSession extends CopyJobSession {
     private final int explodeMapValueIndex;
     private final List<Integer> constantColumnIndexes;
 
-    private DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+    public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);
 
         autoCorrectMissing = propertyHelper.getBoolean(KnownProperties.AUTOCORRECT_MISSING);
@@ -92,15 +92,9 @@ private DiffJobSession(CqlSession originSession, CqlSession targetSession, Spark
         logger.info("CQL -- target upsert: {}",this.targetSession.getTargetUpsertStatement().getCQL());
     }
 
-    public static DiffJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sparkConf) {
-        if (diffJobSession == null) {
-            synchronized (DiffJobSession.class) {
-                if (diffJobSession == null) {
-                    diffJobSession = new DiffJobSession(originSession, targetSession, sparkConf);
-                }
-            }
-        }
-        return diffJobSession;
+    @Override
+    public void processSlice(SplitPartitions.Partition slice) {
+        this.getDataAndDiff(slice.getMin(), slice.getMax());
     }
 
     public void getDataAndDiff(BigInteger min, BigInteger max) {
@@ -174,6 +168,7 @@ private void diffAndClear(List<Record> recordsToDiff) {
         recordsToDiff.clear();
     }
 
+    @Override
    public synchronized void printCounts(boolean isFinal) {
         String msg = "ThreadID: " + Thread.currentThread().getId();
         if (isFinal) {
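Two details here: the constructor goes from private to public because a private constructor is inaccessible even to same-package classes, and DiffJobSessionFactory (next file) must call new DiffJobSession(...). And since DiffJobSession extends CopyJobSession, overriding processSlice is what re-routes an identical Partition slice from copying to diffing. A sketch of that substitutability (helper class invented for illustration):

package com.datastax.cdm.job;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.spark.SparkConf;

// Illustration only: swapping the factory swaps the job, with no other
// driver changes, because both sessions share the same slice type.
public class JobSwapSketch {
    static void run(IJobSessionFactory<SplitPartitions.Partition> factory,
                    CqlSession origin, CqlSession target, SparkConf sc,
                    SplitPartitions.Partition slice) {
        AbstractJobSession<SplitPartitions.Partition> session =
                factory.getInstance(origin, target, sc);
        session.processSlice(slice); // CopyJobSessionFactory -> getDataAndInsert,
                                     // DiffJobSessionFactory -> getDataAndDiff
    }
}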
src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+package com.datastax.cdm.job;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.spark.SparkConf;
+
+public class DiffJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition> {
+    private static DiffJobSession jobSession = null;
+
+    public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+        if (jobSession == null) {
+            synchronized (DiffJobSession.class) {
+                if (jobSession == null) {
+                    jobSession = new DiffJobSession(originSession, targetSession, sc);
+                }
+            }
+        }
+        return jobSession;
+    }
+}

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java

Lines changed: 5 additions & 10 deletions
@@ -15,7 +15,7 @@
 import java.math.BigInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class GuardrailCheckJobSession extends AbstractJobSession {
+public class GuardrailCheckJobSession extends AbstractJobSession<SplitPartitions.Partition> {
 
     private static GuardrailCheckJobSession guardrailJobSession;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -39,15 +39,9 @@ protected GuardrailCheckJobSession(CqlSession originSession, CqlSession targetSe
         logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
     }
 
-    public static GuardrailCheckJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
-        if (guardrailJobSession == null) {
-            synchronized (GuardrailCheckJobSession.class) {
-                if (guardrailJobSession == null) {
-                    guardrailJobSession = new GuardrailCheckJobSession(originSession, targetSession, sc);
-                }
-            }
-        }
-        return guardrailJobSession;
+    @Override
+    public void processSlice(SplitPartitions.Partition slice) {
+        this.guardrailCheck(slice.getMin(), slice.getMax());
     }
 
     public void guardrailCheck(BigInteger min, BigInteger max) {
@@ -91,6 +85,7 @@ public void guardrailCheck(BigInteger min, BigInteger max) {
         ThreadContext.remove(THREAD_CONTEXT_LABEL);
     }
 
+    @Override
     public synchronized void printCounts(boolean isFinal) {
         String msg = "ThreadID: " + Thread.currentThread().getId();
         if (isFinal) {
src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+package com.datastax.cdm.job;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.spark.SparkConf;
+
+public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition> {
+    private static GuardrailCheckJobSession jobSession = null;
+
+    public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
+        if (jobSession == null) {
+            synchronized (GuardrailCheckJobSession.class) {
+                if (jobSession == null) {
+                    jobSession = new GuardrailCheckJobSession(originSession, targetSession, sc);
+                }
+            }
+        }
+        return jobSession;
+    }
+}
src/main/java/com/datastax/cdm/job/IJobSessionFactory.java

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+package com.datastax.cdm.job;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.spark.SparkConf;
+
+public interface IJobSessionFactory<T> {
+    AbstractJobSession<T> getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc);
+}
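This small interface is what ties the refactor together: every job type is now reachable through the same generic call sequence, so per-job driver loops can collapse into one routine. A sketch under that assumption (the runner class is invented; the real Spark integration lives in files not shown in this excerpt):

package com.datastax.cdm.job;

import java.util.List;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.spark.SparkConf;

// Hypothetical generic driver, not part of the commit.
public class GenericJobRunner {
    public static <T> void runJob(IJobSessionFactory<T> factory,
                                  CqlSession origin, CqlSession target,
                                  SparkConf sc, List<T> slices) {
        AbstractJobSession<T> session = factory.getInstance(origin, target, sc);
        for (T slice : slices) {
            session.processSlice(slice);
        }
        session.printCounts(true); // final counts once all slices are done
    }
}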
