Skip to content

Commit e9bb7e0

Browse files
authored
Refactor changes #1 (in prep for cluster-metrics change) (#325)
* Made Partition into its own class & refactored stuff to make that work * Made CounterUnit its own class & refactored JobCounter to work with it.
1 parent d36afdc commit e9bb7e0

16 files changed

+127
-88
lines changed

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@
2626

2727
import com.datastax.cdm.feature.TrackRun;
2828
import com.datastax.cdm.feature.TrackRun.RUN_TYPE;
29+
import com.datastax.cdm.job.Partition;
2930
import com.datastax.cdm.job.RunNotStartedException;
30-
import com.datastax.cdm.job.SplitPartitions;
31-
import com.datastax.cdm.job.SplitPartitions.Partition;
3231
import com.datastax.oss.driver.api.core.CqlSession;
3332
import com.datastax.oss.driver.api.core.cql.BoundStatement;
3433
import com.datastax.oss.driver.api.core.cql.ResultSet;
@@ -88,7 +87,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
8887
+ " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING");
8988
}
9089

91-
public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
90+
public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
9291
if (prevRunId == 0) {
9392
return Collections.emptyList();
9493
}
@@ -105,7 +104,7 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
105104
}
106105
}
107106

108-
final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
107+
final Collection<Partition> pendingParts = new ArrayList<Partition>();
109108
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString()));
110109
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString()));
111110
pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString()));
@@ -114,11 +113,11 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
114113
return pendingParts;
115114
}
116115

117-
protected Collection<SplitPartitions.Partition> getPartitionsByStatus(long prevRunId, String status) {
116+
protected Collection<Partition> getPartitionsByStatus(long prevRunId, String status) {
118117
ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName)
119118
.setLong("run_id", prevRunId).setString("status", status));
120119

121-
final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
120+
final Collection<Partition> pendingParts = new ArrayList<Partition>();
122121
rs.forEach(row -> {
123122
Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")),
124123
BigInteger.valueOf(row.getLong("token_max")));
@@ -127,7 +126,7 @@ protected Collection<SplitPartitions.Partition> getPartitionsByStatus(long prevR
127126
return pendingParts;
128127
}
129128

130-
public void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
129+
public void initCdmRun(long runId, long prevRunId, Collection<Partition> parts, RUN_TYPE runType) {
131130
ResultSet rsInfo = session
132131
.execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId));
133132
if (null != rsInfo.one()) {

src/main/java/com/datastax/cdm/feature/TrackRun.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement;
25+
import com.datastax.cdm.job.Partition;
2526
import com.datastax.cdm.job.RunNotStartedException;
26-
import com.datastax.cdm.job.SplitPartitions;
2727
import com.datastax.oss.driver.api.core.CqlSession;
2828

2929
public class TrackRun {
@@ -42,14 +42,14 @@ public TrackRun(CqlSession session, String keyspaceTable) {
4242
this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable);
4343
}
4444

45-
public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
46-
Collection<SplitPartitions.Partition> pendingParts = runStatement.getPendingPartitions(prevRunId);
45+
public Collection<Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
46+
Collection<Partition> pendingParts = runStatement.getPendingPartitions(prevRunId);
4747
logger.info("###################### {} partitions pending from previous run id {} ######################",
4848
pendingParts.size(), prevRunId);
4949
return pendingParts;
5050
}
5151

52-
public void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
52+
public void initCdmRun(long runId, long prevRunId, Collection<Partition> parts, RUN_TYPE runType) {
5353
runStatement.initCdmRun(runId, prevRunId, parts, runType);
5454
logger.info("###################### Run Id for this job is: {} ######################", runId);
5555
}

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,15 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
110110
}
111111
}
112112

113-
public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeature, long runId) {
113+
public void processSlice(Partition slice, TrackRun trackRunFeature, long runId) {
114114
this.trackRunFeature = trackRunFeature;
115115
this.runId = runId;
116116
this.processSlice(slice.getMin(), slice.getMax());
117117
}
118118

119119
protected abstract void processSlice(BigInteger min, BigInteger max);
120120

121-
public synchronized void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts,
121+
public synchronized void initCdmRun(long runId, long prevRunId, Collection<Partition> parts,
122122
TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) {
123123
this.runId = runId;
124124
this.trackRunFeature = trackRunFeature;

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import com.datastax.oss.driver.api.core.cql.ResultSet;
4040
import com.datastax.oss.driver.api.core.cql.Row;
4141

42-
public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition> {
42+
public class CopyJobSession extends AbstractJobSession<Partition> {
4343

4444
private final PKFactory pkFactory;
4545
private final boolean isCounterTable;

src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.datastax.cdm.properties.PropertyHelper;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222

23-
public class CopyJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition>, Serializable {
23+
public class CopyJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
2424
private static final long serialVersionUID = 5255029377029801421L;
2525
private static CopyJobSession jobSession = null;
2626

27-
public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession,
27+
public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
2828
PropertyHelper propHelper) {
2929
if (jobSession == null) {
3030
synchronized (CopyJobSession.class) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.cdm.job;
17+
18+
import java.io.Serializable;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
public class CounterUnit implements Serializable {
22+
23+
private static final long serialVersionUID = 2194336948011681878L;
24+
private final AtomicLong globalCounter = new AtomicLong(0);
25+
private final transient ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L);
26+
27+
public void incrementThreadCounter(long incrementBy) {
28+
threadLocalCounter.set(threadLocalCounter.get() + incrementBy);
29+
}
30+
31+
public long getThreadCounter() {
32+
return threadLocalCounter.get();
33+
}
34+
35+
public void resetThreadCounter() {
36+
threadLocalCounter.set(0L);
37+
}
38+
39+
public void setGlobalCounter(long value) {
40+
globalCounter.set(value);
41+
}
42+
43+
public void addThreadToGlobalCounter() {
44+
globalCounter.addAndGet(threadLocalCounter.get());
45+
}
46+
47+
public long getGlobalCounter() {
48+
return globalCounter.get();
49+
}
50+
}

src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.datastax.cdm.properties.PropertyHelper;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222

23-
public class DiffJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition>, Serializable {
23+
public class DiffJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
2424
private static final long serialVersionUID = -3543616512495020278L;
2525
private static DiffJobSession jobSession = null;
2626

27-
public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession,
27+
public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
2828
PropertyHelper propHelper) {
2929
if (jobSession == null) {
3030
synchronized (DiffJobSession.class) {

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import com.datastax.oss.driver.api.core.cql.ResultSet;
2828
import com.datastax.oss.driver.api.core.cql.Row;
2929

30-
public class GuardrailCheckJobSession extends AbstractJobSession<SplitPartitions.Partition> {
30+
public class GuardrailCheckJobSession extends AbstractJobSession<Partition> {
3131

3232
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
3333

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSessionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.datastax.cdm.properties.PropertyHelper;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222

23-
public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<SplitPartitions.Partition>, Serializable {
23+
public class GuardrailCheckJobSessionFactory implements IJobSessionFactory<Partition>, Serializable {
2424
private static final long serialVersionUID = -4673384128807660843L;
2525
private static GuardrailCheckJobSession jobSession = null;
2626

27-
public AbstractJobSession<SplitPartitions.Partition> getInstance(CqlSession originSession, CqlSession targetSession,
27+
public AbstractJobSession<Partition> getInstance(CqlSession originSession, CqlSession targetSession,
2828
PropertyHelper propHelper) {
2929
if (jobSession == null) {
3030
synchronized (GuardrailCheckJobSession.class) {

src/main/java/com/datastax/cdm/job/JobCounter.java

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@
1515
*/
1616
package com.datastax.cdm.job;
1717

18+
import java.io.Serializable;
1819
import java.util.HashMap;
19-
import java.util.concurrent.atomic.AtomicLong;
2020

2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323

2424
import com.datastax.cdm.feature.TrackRun;
2525

26-
public class JobCounter {
26+
public class JobCounter implements Serializable {
27+
28+
private static final long serialVersionUID = 7016816604237020549L;
2729

2830
// Enumeration for counter types
2931
public enum CounterType {
@@ -33,36 +35,6 @@ public enum CounterType {
3335
// Logger instance
3436
private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
3537

36-
// Internal class to handle atomic counting operations
37-
private static class CounterUnit {
38-
private final AtomicLong globalCounter = new AtomicLong(0);
39-
private final ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L);
40-
41-
public void incrementThreadCounter(long incrementBy) {
42-
threadLocalCounter.set(threadLocalCounter.get() + incrementBy);
43-
}
44-
45-
public long getThreadCounter() {
46-
return threadLocalCounter.get();
47-
}
48-
49-
public void resetThreadCounter() {
50-
threadLocalCounter.set(0L);
51-
}
52-
53-
public void setGlobalCounter(long value) {
54-
globalCounter.set(value);
55-
}
56-
57-
public void addThreadToGlobalCounter() {
58-
globalCounter.addAndGet(threadLocalCounter.get());
59-
}
60-
61-
public long getGlobalCounter() {
62-
return globalCounter.get();
63-
}
64-
}
65-
6638
// Declare individual counters for different operations
6739
private final HashMap<CounterType, CounterUnit> counterMap = new HashMap<>();
6840

0 commit comments

Comments
 (0)