
Commit 7ce675e

mieslep and msmygit authored
Issue/cdm 61 (#157)
* CDM-57 min/max partition defaults based on partitioner rather than config setting
* CDM-57 fixing feature post-merge
* CDM-61: demonstrate performance equivalence
* CDM-61: fixing rate limits to match what was tested
* CDM-61: moving line to simplify merge from CDM-67

Co-authored-by: Phil Miesle <[email protected]>
Co-authored-by: Madhavan <[email protected]>
1 parent c7759d2 commit 7ce675e

5 files changed: +81 −51 lines


PERF/cdm-v3.properties

Lines changed: 2 additions & 2 deletions
@@ -18,8 +18,8 @@ spark.target.autocorrect.missing false
 spark.target.autocorrect.mismatch false
 
 # Read & Write rate-limits(rows/second). Higher value will improve performance and put more load on cluster
-spark.readRateLimit 20000
-spark.writeRateLimit 20000
+spark.readRateLimit 5000
+spark.writeRateLimit 5000
 
 # Used to split Cassandra token-range into slices and migrate random slices one at a time
 # 10K splits usually works for tables up to 100GB (uncompressed) with balanced token distribution
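
Note on how these limits behave at runtime: each row read or written consumes one permit from a rate limiter — the CopyJobSession diff below calls writeLimiter.acquire(1), which matches Guava's RateLimiter API. A minimal sketch of that throttling pattern, assuming Guava and purely illustrative numbers (this is not CDM's actual class):

import com.google.common.util.concurrent.RateLimiter;

public class RateLimitSketch {
    public static void main(String[] args) {
        // Illustrative stand-ins for spark.readRateLimit / spark.writeRateLimit (rows/second)
        RateLimiter readLimiter = RateLimiter.create(5000.0);
        RateLimiter writeLimiter = RateLimiter.create(5000.0);

        long start = System.nanoTime();
        for (int row = 0; row < 20_000; row++) {
            readLimiter.acquire(1);   // blocks until a read permit is available
            writeLimiter.acquire(1);  // blocks until a write permit is available
            // ... fetch one row from origin, upsert it into target ...
        }
        double secs = (System.nanoTime() - start) / 1e9;
        System.out.printf("20000 rows in %.1f s (~4 s expected at 5000 rows/s)%n", secs);
    }
}

The practical consequence: at 5,000 permits/second, a ~1M-row table cannot finish in much under 200 seconds no matter how much headroom the cluster has, which matters for the timings in PERF/testing.txt below.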

PERF/cdm-v4.properties

Lines changed: 2 additions & 2 deletions
@@ -172,8 +172,8 @@ spark.cdm.autocorrect.mismatch false
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.perfops.numParts 10000
 spark.cdm.perfops.batchSize 5
-spark.cdm.perfops.readRateLimit 20000
-spark.cdm.perfops.writeRateLimit 40000
+spark.cdm.perfops.readRateLimit 5000
+spark.cdm.perfops.writeRateLimit 5000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
 #spark.cdm.perfops.printStatsAfter 100000
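
For orientation, spark.cdm.perfops.numParts is the number of slices the token range is split into (the SplitPartitions.Partition type appears in the Java diff below), and this commit's CDM-57 change derives the min/max tokens from the partitioner rather than a config setting. A rough sketch of equal-width splitting over the Murmur3Partitioner range; the class and method names here are hypothetical, not CDM's implementation:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenRangeSplitter {
    // Murmur3Partitioner token range: [-2^63, 2^63 - 1]
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /** Split [MIN, MAX] into numParts contiguous [min, max] slices. */
    static List<BigInteger[]> split(int numParts) {
        BigInteger total = MAX.subtract(MIN).add(BigInteger.ONE);
        BigInteger step = total.divide(BigInteger.valueOf(numParts));
        List<BigInteger[]> slices = new ArrayList<>(numParts);
        BigInteger lo = MIN;
        for (int i = 0; i < numParts; i++) {
            BigInteger hi = (i == numParts - 1) ? MAX : lo.add(step).subtract(BigInteger.ONE);
            slices.add(new BigInteger[]{lo, hi});
            lo = hi.add(BigInteger.ONE);
        }
        return slices;
    }

    public static void main(String[] args) {
        List<BigInteger[]> slices = split(10000); // spark.cdm.perfops.numParts 10000
        System.out.println("slices: " + slices.size());
        System.out.println("first: " + slices.get(0)[0] + " .. " + slices.get(0)[1]);
    }
}

Each [min, max] slice then becomes one unit of work for a copy thread, as in getDataAndInsert(min, max) in the CopyJobSession diff below.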

PERF/perf-iot.yaml

Lines changed: 2 additions & 4 deletions
@@ -12,7 +12,7 @@ params:
 
 bindings:
   device_id: Mod(<<sources:10000>>); ToHashedUUID() -> java.util.UUID
-  reading_date: Uniform(<<minDate:20220101>>,<<maxDate:20221231>>);LongToLocalDateDays()
+  reading_date: StartingEpochMillis('2022-01-01 00:00:00'); AddHashRange(0L,31536000000L); EpochMillisToCqlLocalDate();
   reading_time: Uniform(0,86400) -> int
   settings: MapSized(3, Combinations('A-Z;0-9'), ToString(), ToString())
   alerts: ListSizedHashed(HashRange(1,5),ToString()));
@@ -66,6 +66,4 @@ blocks:
   insert-load: |
     insert into devices.sensor_data
     (device_id, reading_date, reading_time, settings, device_location, alerts, temperature, humidity, pressure, wind_speed)
-    values ({device_id}, {reading_date}, {reading_time}, {settings}, {latitude:{latitude}, longitude:{longitude}}, {alerts}, {temperature}, {humidity}, {pressure}, {wind_speed})
-
-
+    values ({device_id}, {reading_date}, {reading_time}, {settings}, {latitude:{latitude}, longitude:{longitude}}, {alerts}, {temperature}, {humidity}, {pressure}, {wind_speed})
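
The revised reading_date binding maps each cycle to a hashed millisecond offset within one year of 2022-01-01 (31,536,000,000 ms = 365 days) and converts it to a CQL date, rather than drawing a uniform day as before. A hedged Java sketch of the equivalent computation; the hash function here is a stand-in, not nosqlbench's:

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class ReadingDateDemo {
    public static void main(String[] args) {
        long start = Instant.parse("2022-01-01T00:00:00Z").toEpochMilli();
        long yearMillis = 31_536_000_000L; // 365 days * 86,400 s * 1,000 ms

        long cycle = 42L; // a nosqlbench cycle number
        // Stand-in for AddHashRange: map a hash of the cycle into [0, yearMillis)
        long hash = Long.rotateLeft(cycle * 0x9E3779B97F4A7C15L, 17);
        long offset = Math.floorMod(hash, yearMillis);

        LocalDate readingDate = Instant.ofEpochMilli(start + offset)
                                       .atZone(ZoneOffset.UTC).toLocalDate();
        System.out.println(readingDate); // deterministic date within 2022
    }
}

Because the offset is derived from the cycle number, re-running the workload over the same cycles should regenerate identical dates, which keeps the loaded data reproducible for the DiffData validation below.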

PERF/testing.txt

Lines changed: 64 additions & 30 deletions
@@ -1,6 +1,7 @@
 
-1. Created AWS t2.2xlarge instance running AWS Linux
+1. Created AWS t2.2xlarge instance running AWS Linux (8 vCPU, 16 GB memory)
 2. scp -i ~/.ssh/aws-oregon-phil-miesle.pem ./perf-iot.yaml [email protected]:perf-iot.yaml
+scp -i ~/.ssh/aws-oregon-phil-miesle.pem ../target/cassandra-data-migrator-4.0.0-SNAPSHOT.jar [email protected]:cassandra-data-migrator-4.0.0-SNAPSHOT.jar
 scp -i ~/.ssh/aws-oregon-phil-miesle.pem ./cdm-v3.properties [email protected]:cdm-v3.properties
 scp -i ~/.ssh/aws-oregon-phil-miesle.pem ./cdm-v4.properties [email protected]:cdm-v4.properties
 3. ssh -i ~/.ssh/aws-oregon-phil-miesle.pem [email protected]
@@ -19,55 +20,88 @@
 ln -s squashfs-root/AppRun ./nb5
 ./nb5 --version
 f. ./nb5 perf-iot --progress console:5s load-cycles=1000000 host=localhost port=9042 localdc=datacenter1
-./nb5 perf-iot default.schema host=localhost port=9043 localdc=datacenter1
 
 cqlsh> select count(*) from devices.sensor_data;
 
 count
 --------
-999417
+999996
 
 (1 rows)
 
+./nb5 perf-iot default.schema host=localhost port=9043 localdc=datacenter1
+
 g. docker run --name cdm --network cassandra --ip 172.16.242.4 -d datastax/cassandra-data-migrator:3.4.4
-docker cp cdm-v3.properties cdm:cdm-v3.properties
 
-h. docker exec -it cdm bash
+4. ssh -i ~/.ssh/aws-oregon-phil-miesle.pem [email protected]
+a. docker cp cdm-v3.properties cdm:cdm-v3.properties
+b. docker exec -it cdm bash
 cqlsh cass-target -e 'TRUNCATE TABLE devices.sensor_data'
 
 time spark-submit --properties-file cdm-v3.properties \
---conf spark.executor.cores=4 \
+--conf spark.executor.cores=2 \
 --conf spark.origin.keyspaceTable="devices.sensor_data" \
 --master "local[*]" --class datastax.astra.migrate.Migrate \
 /assets/cassandra-data-migrator-3.4.4.jar &> cdm-v3-Migrate_$(date +%Y%m%d_%H_%M).txt
 
-real 1m2.661s
-user 1m50.624s
-sys 0m15.862s
-
-################################################################################################
-ThreadID: 1 Final Read Record Count: 999417
-ThreadID: 1 Final Skipped Record Count: 0
-ThreadID: 1 Final Write Record Count: 999417
-ThreadID: 1 Final Error Record Count: 0
-################################################################################################
+23/05/26 13:05:15 INFO DAGScheduler: Job 1 finished: foreach at Migrate.scala:26, took 201.858469 s
+23/05/26 13:05:15 INFO CopyJobSession: ################################################################################################
+23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Read Record Count: 999996
+23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Skipped Record Count: 0
+23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Write Record Count: 999996
+23/05/26 13:05:15 INFO CopyJobSession: ThreadID: 1 Final Error Record Count: 0
+23/05/26 13:05:15 INFO CopyJobSession: ################################################################################################
 
 time spark-submit --properties-file cdm-v3.properties \
---conf spark.executor.cores=4 \
+--conf spark.executor.cores=2 \
 --conf spark.origin.keyspaceTable="devices.sensor_data" \
 --master "local[*]" --class datastax.astra.migrate.DiffData \
 /assets/cassandra-data-migrator-3.4.4.jar &> cdm-v3-DiffData_$(date +%Y%m%d_%H_%M).txt
 
-real 1m3.673s
-user 2m14.517s
-sys 0m36.715s
-
-################################################################################################
-ThreadID: 1 Final Read Record Count: 999417
-ThreadID: 1 Final Mismatch Record Count: 0
-ThreadID: 1 Final Corrected Mismatch Record Count: 0
-ThreadID: 1 Final Missing Record Count: 0
-ThreadID: 1 Final Corrected Missing Record Count: 0
-ThreadID: 1 Final Valid Record Count: 977053
-ThreadID: 1 Final Skipped Record Count: 0
-################################################################################################
+23/05/26 13:11:51 INFO DAGScheduler: Job 1 finished: foreach at DiffData.scala:24, took 201.887353 s
+23/05/26 13:11:51 INFO DiffJobSession: ################################################################################################
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Read Record Count: 999996
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Mismatch Record Count: 0
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Corrected Mismatch Record Count: 0
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Missing Record Count: 0
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Corrected Missing Record Count: 0
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Valid Record Count: 999996
+23/05/26 13:11:51 INFO DiffJobSession: ThreadID: 1 Final Skipped Record Count: 0
+23/05/26 13:11:51 INFO DiffJobSession: ################################################################################################
+
+
+c. docker cp cdm-v4.properties cdm:cdm-v4.properties
+docker cp cassandra-data-migrator-4.0.0-SNAPSHOT.jar cdm:cassandra-data-migrator-4.0.0-SNAPSHOT.jar
+
+d. docker exec -it cdm bash
+cqlsh cass-target -e 'TRUNCATE TABLE devices.sensor_data'
+
+time spark-submit --properties-file cdm-v4.properties \
+--conf spark.executor.cores=2 \
+--master "local[*]" --class com.datastax.cdm.job.Migrate \
+/cassandra-data-migrator-4.0.0-SNAPSHOT.jar &> cdm-v4-Migrate_$(date +%Y%m%d_%H_%M).txt
+
+23/05/26 16:46:52 INFO DAGScheduler: Job 1 finished: foreach at Migrate.scala:9, took 200.206043 s
+23/05/26 16:46:52 INFO CopyJobSession: ################################################################################################
+23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Read Record Count: 999996
+23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Skipped Record Count: 0
+23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Write Record Count: 999996
+23/05/26 16:46:52 INFO CopyJobSession: ThreadID: 1 Final Error Record Count: 0
+23/05/26 16:46:52 INFO CopyJobSession: ################################################################################################
+
+
+time spark-submit --properties-file cdm-v4.properties \
+--conf spark.executor.cores=2 \
+--master "local[*]" --class com.datastax.cdm.job.DiffData \
+/cassandra-data-migrator-4.0.0-SNAPSHOT.jar &> cdm-v4-DiffData_$(date +%Y%m%d_%H_%M).txt
+
+23/05/26 16:51:33 INFO DAGScheduler: Job 1 finished: foreach at DiffData.scala:9, took 200.214341 s
+23/05/26 16:51:33 INFO DiffJobSession: ################################################################################################
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Read Record Count: 999996
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Mismatch Record Count: 0
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Corrected Mismatch Record Count: 0
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Missing Record Count: 0
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Corrected Missing Record Count: 0
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Valid Record Count: 999992
+23/05/26 16:51:33 INFO DiffJobSession: ThreadID: 1 Final Skipped Record Count: 0
+23/05/26 16:51:33 INFO DiffJobSession: ################################################################################################
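
Worth noting when reading these timings: with the read/write rate limits set to 5,000 rows/s (see the properties diffs above), copying 999,996 rows has a floor of roughly 999,996 / 5,000 ≈ 200 s. Both the v3 jobs (~201.9 s) and the v4 jobs (~200.2 s) sit on that floor, which appears to be what CDM-61's "demonstrate performance equivalence" refers to: at matched limits, the two versions complete in essentially the same time.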

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 11 additions & 13 deletions
@@ -34,8 +34,6 @@ public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition
     private Integer batchSize;
     private final Integer fetchSize;
 
-    private BatchStatement batch;
-
     protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);
 
@@ -44,8 +42,6 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Spa
         fetchSize = this.originSession.getCqlTable().getFetchSizeInRows();
         batchSize = this.originSession.getCqlTable().getBatchSize();
 
-        batch = BatchStatement.newInstance(BatchType.UNLOGGED);
-
         logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
         logger.info("CQL -- target select: {}",this.targetSession.getTargetSelectByPKStatement().getCQL());
         logger.info("CQL -- target upsert: {}",this.targetSession.getTargetUpsertStatement().getCQL());
@@ -59,6 +55,7 @@ public void processSlice(SplitPartitions.Partition slice) {
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min,max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
+        BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
         boolean done = false;
         int maxAttempts = maxRetries + 1;
         String guardrailCheck;
@@ -97,27 +94,26 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                         continue;
                     }
                 }
-
-                writeLimiter.acquire(1);
-
+
                 BoundStatement boundUpsert = bind(r);
                 if (null == boundUpsert) {
                     skipCnt++; // TODO: this previously skipped, why not errCnt?
                     continue;
                 }
 
-                writeAsync(writeResults, boundUpsert);
+                writeLimiter.acquire(1);
+                batch = writeAsync(batch, writeResults, boundUpsert);
                 unflushedWrites++;
 
                 if (unflushedWrites > fetchSize) {
-                    flushAndClearWrites(writeResults);
+                    flushAndClearWrites(batch, writeResults);
                     flushedWriteCnt += unflushedWrites;
                     unflushedWrites = 0;
                 }
             }
         }
 
-        flushAndClearWrites(writeResults);
+        flushAndClearWrites(batch, writeResults);
         flushedWriteCnt += unflushedWrites;
 
         readCounter.addAndGet(readCnt);
@@ -156,7 +152,7 @@ public synchronized void printCounts(boolean isFinal) {
         }
     }
 
-    private void flushAndClearWrites(Collection<CompletionStage<AsyncResultSet>> writeResults) throws Exception {
+    private void flushAndClearWrites(BatchStatement batch, Collection<CompletionStage<AsyncResultSet>> writeResults) throws Exception {
         if (batch.size() > 0) {
             writeResults.add(targetUpsertStatement.executeAsync(batch));
         }
@@ -177,16 +173,18 @@ private BoundStatement bind(Record r) {
         return targetUpsertStatement.bindRecord(r);
    }
 
-    private void writeAsync(Collection<CompletionStage<AsyncResultSet>> writeResults, BoundStatement boundUpsert) {
+    private BatchStatement writeAsync(BatchStatement batch, Collection<CompletionStage<AsyncResultSet>> writeResults, BoundStatement boundUpsert) {
         if (batchSize > 1) {
             batch = batch.add(boundUpsert);
             if (batch.size() >= batchSize) {
                 writeResults.add(targetUpsertStatement.executeAsync(batch));
-                batch = BatchStatement.newInstance(BatchType.UNLOGGED);
+                return BatchStatement.newInstance(BatchType.UNLOGGED);
             }
+            return batch;
         }
         else {
             writeResults.add(targetUpsertStatement.executeAsync(boundUpsert));
+            return batch;
         }
     }
 
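The thread running through all six hunks: the shared BatchStatement field becomes a local variable in getDataAndInsert(), passed to and returned from writeAsync() and passed into flushAndClearWrites(), so concurrent slice-processing threads no longer mutate one batch. A minimal sketch of that pass-and-return pattern with simplified stand-in types (not the driver's actual BatchStatement):

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the driver's BatchStatement/BoundStatement.
class Batch {
    final List<String> rows = new ArrayList<>();
    Batch add(String row) { rows.add(row); return this; }
    int size() { return rows.size(); }
}

public class BatchThreadingDemo {
    static final int BATCH_SIZE = 5;

    // Like the new writeAsync(): takes the batch in, returns the batch to use next.
    static Batch writeAsync(Batch batch, List<Batch> executed, String row) {
        batch = batch.add(row);
        if (batch.size() >= BATCH_SIZE) {
            executed.add(batch);   // stands in for executeAsync(batch)
            return new Batch();    // fresh batch, like BatchStatement.newInstance(...)
        }
        return batch;
    }

    // Like getDataAndInsert(): the batch is local, so concurrent slices don't share it.
    public static void main(String[] args) {
        Batch batch = new Batch();
        List<Batch> executed = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            batch = writeAsync(batch, executed, "row-" + i);
        }
        if (batch.size() > 0) executed.add(batch); // final flush, like flushAndClearWrites()
        System.out.println("batches executed: " + executed.size()); // 3 (5 + 5 + 2)
    }
}

Returning the (possibly fresh) batch instead of reassigning a field is what makes the method reentrant; each call site must adopt the returned value, as the diff's batch = writeAsync(...) does.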
