Skip to content

Commit 42a7148

Browse files
authored
Refactor only change, see below for details (#326)
* Made Partition into its own class & refactored stuff to make that work * Made CounterUnit its own class & refactored JobCounter to work with it. * Made JobType (Migrate, Validate & Guardrail) independent of track-run feature and renamed slices/partitions to PartitionRanges. Also provided actual jobs access to PartitionRange class.
1 parent e9bb7e0 commit 42a7148

21 files changed

+134
-67
lines changed

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.slf4j.LoggerFactory;
2626

2727
import com.datastax.cdm.feature.TrackRun;
28-
import com.datastax.cdm.feature.TrackRun.RUN_TYPE;
29-
import com.datastax.cdm.job.Partition;
28+
import com.datastax.cdm.job.IJobSessionFactory.JobType;
29+
import com.datastax.cdm.job.PartitionRange;
3030
import com.datastax.cdm.job.RunNotStartedException;
3131
import com.datastax.oss.driver.api.core.CqlSession;
3232
import com.datastax.oss.driver.api.core.cql.BoundStatement;
@@ -87,7 +87,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
8787
+ " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING");
8888
}
8989

90-
public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
90+
public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws RunNotStartedException {
9191
if (prevRunId == 0) {
9292
return Collections.emptyList();
9393
}
@@ -104,7 +104,7 @@ public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotS
104104
}
105105
}
106106

107-
final Collection<Partition> pendingParts = new ArrayList<Partition>();
107+
final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
108108
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString()));
109109
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString()));
110110
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString()));
@@ -113,35 +113,35 @@ public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotS
113113
return pendingParts;
114114
}
115115

116-
protected Collection<Partition> getPartitionsByStatus(long prevRunId, String status) {
116+
protected Collection<PartitionRange> getPartitionsByStatus(long prevRunId, String status) {
117117
ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName)
118118
.setLong("run_id", prevRunId).setString("status", status));
119119

120-
final Collection<Partition> pendingParts = new ArrayList<Partition>();
120+
final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
121121
rs.forEach(row -> {
122-
Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")),
122+
PartitionRange part = new PartitionRange(BigInteger.valueOf(row.getLong("token_min")),
123123
BigInteger.valueOf(row.getLong("token_max")));
124124
pendingParts.add(part);
125125
});
126126
return pendingParts;
127127
}
128128

129-
public void initCdmRun(long runId, long prevRunId, Collection<Partition> parts, RUN_TYPE runType) {
129+
public void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts, JobType jobType) {
130130
ResultSet rsInfo = session
131131
.execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId));
132132
if (null != rsInfo.one()) {
133133
throw new RuntimeException("Run id " + runId + " already exists for table " + tableName);
134134
}
135135
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
136-
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
136+
.setString("run_type", jobType.toString()).setLong("prev_run_id", prevRunId)
137137
.setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
138138
parts.forEach(part -> initCdmRun(runId, part));
139139
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
140-
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
140+
.setString("run_type", jobType.toString()).setLong("prev_run_id", prevRunId)
141141
.setString("status", TrackRun.RUN_STATUS.STARTED.toString()));
142142
}
143143

144-
private void initCdmRun(long runId, Partition partition) {
144+
private void initCdmRun(long runId, PartitionRange partition) {
145145
session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId)
146146
.setLong("token_min", partition.getMin().longValue())
147147
.setLong("token_max", partition.getMax().longValue())

src/main/java/com/datastax/cdm/feature/TrackRun.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement;
25-
import com.datastax.cdm.job.Partition;
25+
import com.datastax.cdm.job.IJobSessionFactory.JobType;
26+
import com.datastax.cdm.job.PartitionRange;
2627
import com.datastax.cdm.job.RunNotStartedException;
2728
import com.datastax.oss.driver.api.core.CqlSession;
2829

2930
public class TrackRun {
30-
public enum RUN_TYPE {
31-
MIGRATE, DIFF_DATA
32-
}
33-
3431
public enum RUN_STATUS {
3532
NOT_STARTED, STARTED, PASS, FAIL, DIFF, DIFF_CORRECTED, ENDED
3633
}
@@ -42,15 +39,15 @@ public TrackRun(CqlSession session, String keyspaceTable) {
4239
this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable);
4340
}
4441

45-
public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
46-
Collection<Partition> pendingParts = runStatement.getPendingPartitions(prevRunId);
42+
public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws RunNotStartedException {
43+
Collection<PartitionRange> pendingParts = runStatement.getPendingPartitions(prevRunId);
4744
logger.info("###################### {} partitions pending from previous run id {} ######################",
4845
pendingParts.size(), prevRunId);
4946
return pendingParts;
5047
}
5148

52-
public void initCdmRun(long runId, long prevRunId, Collection<Partition> parts, RUN_TYPE runType) {
53-
runStatement.initCdmRun(runId, prevRunId, parts, runType);
49+
public void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts, JobType jobType) {
50+
runStatement.initCdmRun(runId, prevRunId, parts, jobType);
5451
logger.info("###################### Run Id for this job is: {} ######################", runId);
5552
}
5653

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.datastax.cdm.job;
1717

18-
import java.math.BigInteger;
1918
import java.util.Collection;
2019

2120
import org.slf4j.Logger;
@@ -28,6 +27,7 @@
2827
import com.datastax.cdm.feature.Featureset;
2928
import com.datastax.cdm.feature.Guardrail;
3029
import com.datastax.cdm.feature.TrackRun;
30+
import com.datastax.cdm.job.IJobSessionFactory.JobType;
3131
import com.datastax.cdm.properties.KnownProperties;
3232
import com.datastax.cdm.properties.PropertyHelper;
3333
import com.datastax.cdm.schema.CqlTable;
@@ -110,20 +110,20 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
110110
}
111111
}
112112

113-
public void processSlice(Partition slice, TrackRun trackRunFeature, long runId) {
113+
public void processPartitionRange(PartitionRange range, TrackRun trackRunFeature, long runId) {
114114
this.trackRunFeature = trackRunFeature;
115115
this.runId = runId;
116-
this.processSlice(slice.getMin(), slice.getMax());
116+
this.processPartitionRange(range);
117117
}
118118

119-
protected abstract void processSlice(BigInteger min, BigInteger max);
119+
protected abstract void processPartitionRange(PartitionRange range);
120120

121-
public synchronized void initCdmRun(long runId, long prevRunId, Collection<Partition> parts,
122-
TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) {
121+
public synchronized void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts,
122+
TrackRun trackRunFeature, JobType jobType) {
123123
this.runId = runId;
124124
this.trackRunFeature = trackRunFeature;
125125
if (null != trackRunFeature)
126-
trackRunFeature.initCdmRun(runId, prevRunId, parts, runType);
126+
trackRunFeature.initCdmRun(runId, prevRunId, parts, jobType);
127127
DataUtility.deleteGeneratedSCB(runId);
128128
}
129129

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import com.datastax.oss.driver.api.core.cql.ResultSet;
4040
import com.datastax.oss.driver.api.core.cql.Row;
4141

42-
public class CopyJobSession extends AbstractJobSession<Partition> {
42+
public class CopyJobSession extends AbstractJobSession<PartitionRange> {
4343

4444
private final PKFactory pkFactory;
4545
private final boolean isCounterTable;
@@ -64,7 +64,8 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Pro
6464
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
6565
}
6666

67-
protected void processSlice(BigInteger min, BigInteger max) {
67+
protected void processPartitionRange(PartitionRange range) {
68+
BigInteger min = range.getMin(), max = range.getMax();
6869
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
6970
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
7071
if (null != trackRunFeature)

src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.datastax.cdm.properties.PropertyHelper;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222

23-
public class CopyJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
23+
public class CopyJobSessionFactory implements IJobSessionFactory<PartitionRange>, Serializable {
2424
private static final long serialVersionUID = 5255029377029801421L;
2525
private static CopyJobSession jobSession = null;
2626

27-
public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
27+
public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession, CqlSession targetSession,
2828
PropertyHelper propHelper) {
2929
if (jobSession == null) {
3030
synchronized (CopyJobSession.class) {
@@ -35,4 +35,8 @@ public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSe
3535
}
3636
return jobSession;
3737
}
38+
39+
public JobType getJobType() {
40+
return JobType.MIGRATE;
41+
}
3842
}

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, Proper
117117
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
118118
}
119119

120-
protected void processSlice(BigInteger min, BigInteger max) {
120+
protected void processPartitionRange(PartitionRange range) {
121+
BigInteger min = range.getMin(), max = range.getMax();
121122
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
122123
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
123124
if (null != trackRunFeature)

src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.datastax.cdm.properties.PropertyHelper;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222

23-
public class DiffJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
23+
public class DiffJobSessionFactory implements IJobSessionFactory<PartitionRange>, Serializable {
2424
private static final long serialVersionUID = -3543616512495020278L;
2525
private static DiffJobSession jobSession = null;
2626

27-
public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
27+
public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession, CqlSession targetSession,
2828
PropertyHelper propHelper) {
2929
if (jobSession == null) {
3030
synchronized (DiffJobSession.class) {
@@ -35,4 +35,8 @@ public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSe
3535
}
3636
return jobSession;
3737
}
38+
39+
public JobType getJobType() {
40+
return JobType.VALIDATE;
41+
}
3842
}

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import com.datastax.oss.driver.api.core.cql.ResultSet;
2828
import com.datastax.oss.driver.api.core.cql.Row;
2929

30-
public class GuardrailCheckJobSession extends AbstractJobSession<Partition> {
30+
public class GuardrailCheckJobSession extends AbstractJobSession<PartitionRange> {
3131

3232
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
3333

@@ -43,7 +43,8 @@ protected GuardrailCheckJobSession(CqlSession originSession, CqlSession targetSe
4343
logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
4444
}
4545

46-
protected void processSlice(BigInteger min, BigInteger max) {
46+
protected void processPartitionRange(PartitionRange range) {
47+
BigInteger min = range.getMin(), max = range.getMax();
4748
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
4849
try {
4950
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.datastax.cdm.properties.PropertyHelper;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222

23-
public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
23+
public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<PartitionRange>, Serializable {
2424
private static final long serialVersionUID = -4673384128807660843L;
2525
private static GuardrailCheckJobSession jobSession = null;
2626

27-
public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
27+
public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession, CqlSession targetSession,
2828
PropertyHelper propHelper) {
2929
if (jobSession == null) {
3030
synchronized (GuardrailCheckJobSession.class) {
@@ -35,4 +35,8 @@ public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSe
3535
}
3636
return jobSession;
3737
}
38+
39+
public JobType getJobType() {
40+
return JobType.MIGRATE;
41+
}
3842
}

src/main/java/com/datastax/cdm/job/IJobSessionFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,11 @@
1919
import com.datastax.oss.driver.api.core.CqlSession;
2020

2121
public interface IJobSessionFactory<T> {
22+
public enum JobType {
23+
MIGRATE, VALIDATE, GUARDRAIL
24+
}
25+
2226
AbstractJobSession<T> getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper);
27+
28+
public JobType getJobType();
2329
}

0 commit comments

Comments
 (0)