
Commit 8dd04d0 (2 parents: ecba107 + 4528363)

Merge remote-tracking branch 'origin/main' into issue/CDM-67

# Conflicts:
#   src/main/java/com/datastax/cdm/job/CopyJobSession.java

File tree: 10 files changed, +209 / -67 lines


PERF/cdm-v3.properties

Lines changed: 2 additions & 2 deletions
@@ -18,8 +18,8 @@ spark.target.autocorrect.missing false
 spark.target.autocorrect.mismatch false
 
 # Read & Write rate-limits(rows/second). Higher value will improve performance and put more load on cluster
-spark.readRateLimit 20000
-spark.writeRateLimit 20000
+spark.readRateLimit 5000
+spark.writeRateLimit 5000
 
 # Used to split Cassandra token-range into slices and migrate random slices one at a time
 # 10K splits usually works for tables up to 100GB (uncompressed) with balanced token distribution

PERF/cdm-v4.properties

Lines changed: 2 additions & 2 deletions
@@ -172,8 +172,8 @@ spark.cdm.autocorrect.mismatch false
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.perfops.numParts 10000
 spark.cdm.perfops.batchSize 5
-spark.cdm.perfops.readRateLimit 20000
-spark.cdm.perfops.writeRateLimit 40000
+spark.cdm.perfops.readRateLimit 5000
+spark.cdm.perfops.writeRateLimit 5000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
 #spark.cdm.perfops.printStatsAfter 100000
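
Note on the two property-file changes above: both the v3 and v4 files now cap reads and writes at 5,000 rows/second (the v4 file previously allowed 40,000 writes/second), so the two CDM versions are compared below under identical limits. The acquire(1) call visible in the CopyJobSession diff further down suggests a Guava-style token-bucket limiter; a minimal standalone sketch of that pattern (an assumption, not CDM source):

    import com.google.common.util.concurrent.RateLimiter;

    public class RateLimitSketch {
        public static void main(String[] args) {
            // 5,000 permits/second, mirroring readRateLimit/writeRateLimit 5000
            RateLimiter readLimiter = RateLimiter.create(5000.0);

            long start = System.nanoTime();
            for (int row = 0; row < 10_000; row++) {
                readLimiter.acquire(1); // blocks until the next permit is available
                // ... read and copy one row here ...
            }
            // expect roughly 2 s: 10,000 rows at 5,000 rows/second
            System.out.printf("took %.1f s%n", (System.nanoTime() - start) / 1e9);
        }
    }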

PERF/perf-iot.yaml

Lines changed: 2 additions & 4 deletions
@@ -12,7 +12,7 @@ params:
 
 bindings:
   device_id: Mod(<<sources:10000>>); ToHashedUUID() -> java.util.UUID
-  reading_date: Uniform(<<minDate:20220101>>,<<maxDate:20221231>>);LongToLocalDateDays()
+  reading_date: StartingEpochMillis('2022-01-01 00:00:00'); AddHashRange(0L,31536000000L); EpochMillisToCqlLocalDate();
   reading_time: Uniform(0,86400) -> int
   settings: MapSized(3, Combinations('A-Z;0-9'), ToString(), ToString())
   alerts: ListSizedHashed(HashRange(1,5),ToString()));
@@ -66,6 +66,4 @@ blocks:
     insert-load: |
       insert into devices.sensor_data
       (device_id, reading_date, reading_time, settings, device_location, alerts, temperature, humidity, pressure, wind_speed)
-      values ({device_id}, {reading_date}, {reading_time}, {settings}, {latitude:{latitude}, longitude:{longitude}}, {alerts}, {temperature}, {humidity}, {pressure}, {wind_speed})
-
-
+      values ({device_id}, {reading_date}, {reading_time}, {settings}, {latitude:{latitude}, longitude:{longitude}}, {alerts}, {temperature}, {humidity}, {pressure}, {wind_speed})
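
Note: the new reading_date binding swaps a days-based Uniform range for hashed epoch milliseconds, and the span constant matches the old bounds: 31,536,000,000 ms = 365 * 24 * 3,600 * 1,000, i.e. the 365 days of 2022 that Uniform(<<minDate:20220101>>,<<maxDate:20221231>>) covered (give or take a day at the boundary, depending on whether AddHashRange treats its upper bound as inclusive). A quick standalone check in plain Java (not nosqlbench code):

    import java.time.Instant;
    import java.time.ZoneOffset;

    public class ReadingDateRange {
        public static void main(String[] args) {
            long base = Instant.parse("2022-01-01T00:00:00Z").toEpochMilli();
            long span = 31_536_000_000L; // 365 * 24 * 3600 * 1000 ms

            // endpoints of the [base, base + span) window
            System.out.println(Instant.ofEpochMilli(base)
                    .atZone(ZoneOffset.UTC).toLocalDate());      // 2022-01-01
            System.out.println(Instant.ofEpochMilli(base + span - 1)
                    .atZone(ZoneOffset.UTC).toLocalDate());      // 2022-12-31
        }
    }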

PERF/testing.txt

Lines changed: 64 additions & 30 deletions
@@ -1,6 +1,7 @@
 
-1. Created AWS t2.2xlarge instance running AWS Linux
+1. Created AWS t2.2xlarge instance running AWS Linux (8 vCPU, 16 GB memory)
 2. scp -i ~/.ssh/aws-oregon-phil-miesle.pem ./perf-iot.yaml [email protected]:perf-iot.yaml
+   scp -i ~/.ssh/aws-oregon-phil-miesle.pem ../target/cassandra-data-migrator-4.0.0-SNAPSHOT.jar [email protected]:cassandra-data-migrator-4.0.0-SNAPSHOT.jar
    scp -i ~/.ssh/aws-oregon-phil-miesle.pem ./cdm-v3.properties [email protected]:cdm-v3.properties
    scp -i ~/.ssh/aws-oregon-phil-miesle.pem ./cdm-v4.properties [email protected]:cdm-v4.properties
 3. ssh -i ~/.ssh/aws-oregon-phil-miesle.pem [email protected]
@@ -19,55 +20,88 @@
       ln -s squashfs-root/AppRun ./nb5
       ./nb5 --version
    f. ./nb5 perf-iot --progress console:5s load-cycles=1000000 host=localhost port=9042 localdc=datacenter1
-      ./nb5 perf-iot default.schema host=localhost port=9043 localdc=datacenter1
 
       cqlsh> select count(*) from devices.sensor_data;
 
        count
       --------
-      999417
+      999996
 
       (1 rows)
 
+      ./nb5 perf-iot default.schema host=localhost port=9043 localdc=datacenter1
+
    g. docker run --name cdm --network cassandra --ip 172.16.242.4 -d datastax/cassandra-data-migrator:3.4.4
-      docker cp cdm-v3.properties cdm:cdm-v3.properties
 
-   h. docker exec -it cdm bash
+4. ssh -i ~/.ssh/aws-oregon-phil-miesle.pem [email protected]
+   a. docker cp cdm-v3.properties cdm:cdm-v3.properties
+   b. docker exec -it cdm bash
       cqlsh cass-target -e 'TRUNCATE TABLE devices.sensor_data'
 
       time spark-submit --properties-file cdm-v3.properties \
-        --conf spark.executor.cores=4 \
+        --conf spark.executor.cores=2 \
         --conf spark.origin.keyspaceTable="devices.sensor_data" \
         --master "local[*]" --class datastax.astra.migrate.Migrate \
         /assets/cassandra-data-migrator-3.4.4.jar &> cdm-v3-Migrate_$(date +%Y%m%d_%H_%M).txt
 
-      real 1m2.661s
-      user 1m50.624s
-      sys 0m15.862s
-
-      ################################################################################################
-      ThreadID: 1 Final Read Record Count: 999417
-      ThreadID: 1 Final Skipped Record Count: 0
-      ThreadID: 1 Final Write Record Count: 999417
-      ThreadID: 1 Final Error Record Count: 0
-      ################################################################################################
+      23/05/26 13:05:15 INFO DAGScheduler: Job 1 finished: foreach at Migrate.scala:26, took 201.858469 s
+      23/05/26 13:05:15 INFO CopyJobSession: ################################################################################################
+      23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Read Record Count: 999996
+      23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Skipped Record Count: 0
+      23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Write Record Count: 999996
+      23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Error Record Count: 0
+      23/05/26 13:05:15 INFO CopyJobSession: ################################################################################################
 
       time spark-submit --properties-file cdm-v3.properties \
-        --conf spark.executor.cores=4 \
+        --conf spark.executor.cores=2 \
         --conf spark.origin.keyspaceTable="devices.sensor_data" \
         --master "local[*]" --class datastax.astra.migrate.DiffData \
         /assets/cassandra-data-migrator-3.4.4.jar &> cdm-v3-DiffData_$(date +%Y%m%d_%H_%M).txt
 
-      real 1m3.673s
-      user 2m14.517s
-      sys 0m36.715s
-
-      ################################################################################################
-      ThreadID: 1 Final Read Record Count: 999417
-      ThreadID: 1 Final Mismatch Record Count: 0
-      ThreadID: 1 Final Corrected Mismatch Record Count: 0
-      ThreadID: 1 Final Missing Record Count: 0
-      ThreadID: 1 Final Corrected Missing Record Count: 0
-      ThreadID: 1 Final Valid Record Count: 977053
-      ThreadID: 1 Final Skipped Record Count: 0
-      ################################################################################################
+      23/05/26 13:11:51 INFO DAGScheduler: Job 1 finished: foreach at DiffData.scala:24, took 201.887353 s
+      23/05/26 13:11:51 INFO DiffJobSession: ################################################################################################
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Read Record Count: 999996
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Mismatch Record Count: 0
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Corrected Mismatch Record Count: 0
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Missing Record Count: 0
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Corrected Missing Record Count: 0
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Valid Record Count: 999996
+      23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Skipped Record Count: 0
+      23/05/26 13:11:51 INFO DiffJobSession: ################################################################################################
+
+
+   c. docker cp cdm-v4.properties cdm:cdm-v4.properties
+      docker cp cassandra-data-migrator-4.0.0-SNAPSHOT.jar cdm:cassandra-data-migrator-4.0.0-SNAPSHOT.jar
+
+   d. docker exec -it cdm bash
+      cqlsh cass-target -e 'TRUNCATE TABLE devices.sensor_data'
+
+      time spark-submit --properties-file cdm-v4.properties \
+        --conf spark.executor.cores=2 \
+        --master "local[*]" --class com.datastax.cdm.job.Migrate \
+        /cassandra-data-migrator-4.0.0-SNAPSHOT.jar &> cdm-v4-Migrate_$(date +%Y%m%d_%H_%M).txt
+
+      23/05/26 16:46:52 INFO DAGScheduler: Job 1 finished: foreach at Migrate.scala:9, took 200.206043 s
+      23/05/26 16:46:52 INFO CopyJobSession: ################################################################################################
+      23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Read Record Count: 999996
+      23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Skipped Record Count: 0
+      23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Write Record Count: 999996
+      23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Error Record Count: 0
+      23/05/26 16:46:52 INFO CopyJobSession: ################################################################################################
+
+      time spark-submit --properties-file cdm-v4.properties \
+        --conf spark.executor.cores=2 \
+        --master "local[*]" --class com.datastax.cdm.job.DiffData \
+        /cassandra-data-migrator-4.0.0-SNAPSHOT.jar &> cdm-v4-DiffData_$(date +%Y%m%d_%H_%M).txt
+
+      23/05/26 16:51:33 INFO DAGScheduler: Job 1 finished: foreach at DiffData.scala:9, took 200.214341 s
+      23/05/26 16:51:33 INFO DiffJobSession: ################################################################################################
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Read Record Count: 999996
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Mismatch Record Count: 0
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Corrected Mismatch Record Count: 0
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Missing Record Count: 0
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Corrected Missing Record Count: 0
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Valid Record Count: 999992
+      23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Skipped Record Count: 0
+      23/05/26 16:51:33 INFO DiffJobSession: ################################################################################################
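
One observation on the timings (my arithmetic, not from the logs): 999,996 rows at the 5,000 rows/second cap works out to 999,996 / 5,000 = 200 s, and all four jobs report 200-202 s. At these settings both v3 and v4 appear limiter-bound rather than CPU-bound, which is presumably why the rate limits were lowered to equal values: it makes the comparison independent of raw throughput differences between the versions.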

src/main/java/com/datastax/cdm/feature/WritetimeTTL.java

Lines changed: 3 additions & 3 deletions
@@ -213,7 +213,7 @@ private void validateTTLColumns(CqlTable originTable) {
             }
         }
 
-        newColumnNames.add("TTL(" + ttlName + ")");
+        newColumnNames.add("TTL(" + CqlTable.formatName(ttlName) + ")");
         newColumnDataTypes.add(DataTypes.INT);
     }
 
@@ -225,7 +225,7 @@ private void validateTTLColumns(CqlTable originTable) {
     }
 
     private void validateWritetimeColumns(CqlTable originTable) {
-        if (writetimeNames == null || writetimeNames.isEmpty()) {
+        if (writetimeNames == null || writetimeNames.isEmpty() || customWritetime > 0) {
             return;
         }
 
@@ -245,7 +245,7 @@ private void validateWritetimeColumns(CqlTable originTable) {
             }
         }
 
-        newColumnNames.add("WRITETIME(" + writetimeName + ")");
+        newColumnNames.add("WRITETIME(" + CqlTable.formatName(writetimeName) + ")");
         newColumnDataTypes.add(DataTypes.BIGINT);
     }
 
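
Both hunks above wrap the user-supplied column name in CqlTable.formatName(...) before building the TTL(...) and WRITETIME(...) projections, and writetime validation is now skipped when a custom writetime is configured (customWritetime > 0). CqlTable.formatName() itself is not part of this commit; a minimal sketch of the kind of quoting such a helper has to perform, given that unquoted CQL identifiers are case-folded to lower case (illustrative only, the real rules may differ):

    public class FormatNameSketch {
        // Quote anything that is not a plain lower-case identifier, doubling
        // any embedded quotes, so mixed-case column names survive in CQL.
        static String formatName(String name) {
            boolean plain = name.matches("[a-z][a-z0-9_]*");
            return plain ? name : "\"" + name.replace("\"", "\"\"") + "\"";
        }

        public static void main(String[] args) {
            System.out.println("TTL(" + formatName("temperature") + ")");   // TTL(temperature)
            System.out.println("TTL(" + formatName("WriteTime_Col") + ")"); // TTL("WriteTime_Col")
        }
    }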

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 9 additions & 10 deletions
@@ -34,8 +34,6 @@ public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition
     private Integer batchSize;
     private final Integer fetchSize;
 
-    private BatchStatement batch;
-
     protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);
 
@@ -44,8 +42,6 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Spa
         fetchSize = this.originSession.getCqlTable().getFetchSizeInRows();
         batchSize = this.originSession.getCqlTable().getBatchSize();
 
-        batch = BatchStatement.newInstance(BatchType.UNLOGGED);
-
         logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
         logger.info("CQL -- target select: {}",this.targetSession.getTargetSelectByPKStatement().getCQL());
         logger.info("CQL -- target upsert: {}",this.targetSession.getTargetUpsertStatement().getCQL());
@@ -59,6 +55,7 @@ public void processSlice(SplitPartitions.Partition slice) {
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min,max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
+        BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
         boolean done = false;
         int maxAttempts = maxRetries + 1;
         String guardrailCheck;
@@ -105,18 +102,18 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                     }
 
                     targetLimiter.acquire(1);
-                    writeAsync(writeResults, boundUpsert);
+                    batch = writeAsync(batch, writeResults, boundUpsert);
                     unflushedWrites++;
 
                     if (unflushedWrites > fetchSize) {
-                        flushAndClearWrites(writeResults);
+                        flushAndClearWrites(batch, writeResults);
                         flushedWriteCnt += unflushedWrites;
                         unflushedWrites = 0;
                     }
                 }
             }
 
-            flushAndClearWrites(writeResults);
+            flushAndClearWrites(batch, writeResults);
             flushedWriteCnt += unflushedWrites;
 
             readCounter.addAndGet(readCnt);
@@ -155,7 +152,7 @@ public synchronized void printCounts(boolean isFinal) {
         }
     }
 
-    private void flushAndClearWrites(Collection<CompletionStage<AsyncResultSet>> writeResults) throws Exception {
+    private void flushAndClearWrites(BatchStatement batch, Collection<CompletionStage<AsyncResultSet>> writeResults) throws Exception {
         if (batch.size() > 0) {
             writeResults.add(targetUpsertStatement.executeAsync(batch));
         }
@@ -177,16 +174,18 @@ private BoundStatement bind(Record r) {
         return targetUpsertStatement.bindRecord(r);
     }
 
-    private void writeAsync(Collection<CompletionStage<AsyncResultSet>> writeResults, BoundStatement boundUpsert) {
+    private BatchStatement writeAsync(BatchStatement batch, Collection<CompletionStage<AsyncResultSet>> writeResults, BoundStatement boundUpsert) {
         if (batchSize > 1) {
             batch = batch.add(boundUpsert);
             if (batch.size() >= batchSize) {
                 writeResults.add(targetUpsertStatement.executeAsync(batch));
-                batch = BatchStatement.newInstance(BatchType.UNLOGGED);
+                return BatchStatement.newInstance(BatchType.UNLOGGED);
             }
+            return batch;
         }
         else {
             writeResults.add(targetUpsertStatement.executeAsync(boundUpsert));
+            return batch;
         }
     }
 
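
The refactor above, the site of this merge's conflict, moves the BatchStatement from a shared instance field into a local variable of getDataAndInsert, so threads processing different slices through the same session no longer share one batch. It relies on the 4.x Java driver's BatchStatement being immutable by default: add() returns a new statement instead of mutating the receiver, which is why writeAsync now takes and returns the current batch. A standalone sketch of that driver behavior:

    import com.datastax.oss.driver.api.core.cql.BatchStatement;
    import com.datastax.oss.driver.api.core.cql.BatchType;
    import com.datastax.oss.driver.api.core.cql.SimpleStatement;

    public class BatchImmutabilitySketch {
        public static void main(String[] args) {
            BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
            SimpleStatement stmt =
                    SimpleStatement.newInstance("INSERT INTO ks.t (k) VALUES (1)");

            batch.add(stmt);                  // return value discarded: batch unchanged
            System.out.println(batch.size()); // 0

            batch = batch.add(stmt);          // reassign, as CopyJobSession now does
            System.out.println(batch.size()); // 1
        }
    }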

src/main/java/com/datastax/cdm/schema/BaseTable.java

Lines changed: 51 additions & 14 deletions

@@ -1,27 +1,28 @@
 package com.datastax.cdm.schema;
 
-import com.datastax.oss.driver.api.core.type.DataType;
 import com.datastax.cdm.data.CqlConversion;
+import com.datastax.cdm.properties.IPropertyHelper;
 import com.datastax.cdm.properties.KnownProperties;
-import com.datastax.cdm.properties.PropertyHelper;
+import com.datastax.oss.driver.api.core.type.DataType;
+import org.apache.commons.lang3.StringUtils;
 
+import javax.validation.constraints.NotNull;
 import java.util.List;
 
 public class BaseTable implements Table {
-    protected final PropertyHelper propertyHelper;
+    protected final IPropertyHelper propertyHelper;
     protected final boolean isOrigin;
-
     protected String keyspaceName;
     protected String tableName;
     protected List<String> columnNames;
     protected List<DataType> columnCqlTypes;
     protected List<CqlConversion> cqlConversions;
 
-    public BaseTable(PropertyHelper propertyHelper, boolean isOrigin) {
+    public BaseTable(IPropertyHelper propertyHelper, boolean isOrigin) {
         this.propertyHelper = propertyHelper;
         this.isOrigin = isOrigin;
 
-        String keyspaceTableString = (this.isOrigin ? propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) : propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE)).trim();
+        String keyspaceTableString = getKeyspaceTableAsString(propertyHelper, isOrigin);
         if (keyspaceTableString.contains(".")) {
             String[] keyspaceTable = keyspaceTableString.split("\\.");
             this.keyspaceName = keyspaceTable[0];
@@ -32,11 +33,47 @@ public BaseTable(PropertyHelper propertyHelper, boolean isOrigin) {
         }
     }
 
-    public String getKeyspaceName() {return this.keyspaceName;}
-    public String getTableName() {return this.tableName;}
-    public String getKeyspaceTable() {return this.keyspaceName + "." + this.tableName;}
-    public List<String> getColumnNames(boolean format) { return this.columnNames; }
-    public List<DataType> getColumnCqlTypes() {return this.columnCqlTypes;}
-    public List<CqlConversion> getConversions() {return this.cqlConversions;}
-    public boolean isOrigin() {return this.isOrigin;}
-}
+    @NotNull
+    private String getKeyspaceTableAsString(IPropertyHelper propertyHelper, boolean isOrigin) {
+        String keyspaceTableString = (isOrigin ? propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) :
+                propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE));
+
+        // Use origin keyspaceTable property if target not specified
+        if (!isOrigin && StringUtils.isBlank(keyspaceTableString)) {
+            keyspaceTableString = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
+        }
+        if (StringUtils.isBlank(keyspaceTableString)) {
+            throw new RuntimeException("Value for required property " + KnownProperties.ORIGIN_KEYSPACE_TABLE + " not provided!!");
+        }
+
+        return keyspaceTableString.trim();
+    }
+
+    public String getKeyspaceName() {
+        return this.keyspaceName;
+    }
+
+    public String getTableName() {
+        return this.tableName;
+    }
+
+    public String getKeyspaceTable() {
+        return this.keyspaceName + "." + this.tableName;
+    }
+
+    public List<String> getColumnNames(boolean format) {
+        return this.columnNames;
+    }
+
+    public List<DataType> getColumnCqlTypes() {
+        return this.columnCqlTypes;
+    }
+
+    public List<CqlConversion> getConversions() {
+        return this.cqlConversions;
+    }
+
+    public boolean isOrigin() {
+        return this.isOrigin;
+    }
+}
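
The extracted getKeyspaceTableAsString() changes behavior in two ways the old one-liner did not: a blank target keyspace/table property now falls back to the origin value, and a missing origin value fails with an explicit message instead of the NullPointerException the old .trim() on a null property would have thrown. A standalone sketch of the resolution logic (stubbed, not CDM code; the real property keys live in KnownProperties):

    public class KeyspaceTableFallbackSketch {
        static String resolve(String origin, String target, boolean isOrigin) {
            String value = isOrigin ? origin : target;
            if (!isOrigin && (value == null || value.trim().isEmpty())) {
                value = origin; // target not specified: reuse the origin table
            }
            if (value == null || value.trim().isEmpty()) {
                throw new RuntimeException("origin keyspace/table property not provided");
            }
            return value.trim();
        }

        public static void main(String[] args) {
            // target property unset: resolves to the origin table instead of failing
            System.out.println(resolve("devices.sensor_data", null, false)); // devices.sensor_data
        }
    }
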
src/main/java/com/datastax/cdm/schema/Table.java

Lines changed: 4 additions & 0 deletions

@@ -1,11 +1,15 @@
 package com.datastax.cdm.schema;
 
 import com.datastax.oss.driver.api.core.type.DataType;
+
 import java.util.List;
 
 public interface Table {
     String getKeyspaceName();
+
     String getTableName();
+
     List<String> getColumnNames(boolean asCql);
+
     List<DataType> getColumnCqlTypes();
 }
