Commit 7e8f61b

Merge pull request #27 from datastax/feature/refactor-count-logging
Refactor counts logging
2 parents e876ec9 + 224e8ac · commit 7e8f61b

File tree

11 files changed: +86 −73 lines

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.5</version>
+    <version>2.6</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 9 additions & 9 deletions
@@ -76,14 +76,14 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
            customWritetime = Long.parseLong(customWriteTimeStr);
        }
 
-        logger.info("PARAM -- Write Batch Size: " + batchSize);
-        logger.info("PARAM -- Source Keyspace Table: " + sourceKeyspaceTable);
-        logger.info("PARAM -- Destination Keyspace Table: " + astraKeyspaceTable);
-        logger.info("PARAM -- ReadRateLimit: " + readLimiter.getRate());
-        logger.info("PARAM -- WriteRateLimit: " + writeLimiter.getRate());
-        logger.info("PARAM -- TTLCols: " + ttlCols);
-        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
-        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);
+        logger.info("PARAM -- Write Batch Size: {}", batchSize);
+        logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
+        logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);
+        logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
+        logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
+        logger.info("PARAM -- TTLCols: {}", ttlCols);
+        logger.info("PARAM -- WriteTimestampFilterCols: {}", writeTimeStampCols);
+        logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
 
        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
        String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");

@@ -122,7 +122,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
            fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
        }
        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
-        logger.info("PARAM -- Query used: " + fullSelectQuery);
+        logger.info("PARAM -- Query used: {}", fullSelectQuery);
 
        astraSelectStatement = astraSession.prepare(
                "select " + insertCols + " from " + astraKeyspaceTable

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 24 additions & 11 deletions
@@ -17,6 +17,7 @@ public class CopyJobSession extends AbstractJobSession {
     private static CopyJobSession copyJobSession;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
+    protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
 
     protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {

@@ -36,7 +37,7 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
     }
 
     public void getDataAndInsert(BigInteger min, BigInteger max) {
-        logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
+        logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         int maxAttempts = maxRetries;
         for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
 

@@ -56,14 +57,15 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                     Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
                     if (sourceWriteTimeStamp < minWriteTimeStampFilter
                             || sourceWriteTimeStamp > maxWriteTimeStampFilter) {
+                        readCounter.incrementAndGet();
+                        skippedCounter.incrementAndGet();
                         continue;
                     }
                 }
 
                 writeLimiter.acquire(1);
                 if (readCounter.incrementAndGet() % printStatsAfter == 0) {
-                    logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: "
-                            + readCounter.get());
+                    printCounts(false);
                 }
                 Row astraRow = null;
                 if (isCounterTable) {

@@ -88,7 +90,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                     readLimiter.acquire(1);
                     writeLimiter.acquire(1);
                     if (readCounter.incrementAndGet() % printStatsAfter == 0) {
-                        logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
+                        printCounts(false);
                     }
                     batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
 

@@ -116,23 +118,34 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                     }
                 }
 
-                logger.info("TreadID: " + Thread.currentThread().getId() + " Final Read Record Count: " + readCounter.get());
-                logger.info("TreadID: " + Thread.currentThread().getId() + " Final Write Record Count: " + writeCounter.get());
                 retryCount = maxAttempts;
             } catch (Exception e) {
-                logger.error("Error occurred retry#: " + retryCount, e);
-                logger.error("Error with PartitionRange -- TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max + " -- Retry# " + retryCount);
+                logger.error("Error occurred retry#: {}", retryCount, e);
+                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
+                        Thread.currentThread().getId(), min, max, retryCount);
             }
         }
     }
 
+    public synchronized void printCounts(boolean isFinal) {
+        String msg = "ThreadID: " + Thread.currentThread().getId();
+        if (isFinal) {
+            msg += " Final";
+            logger.info("################################################################################################");
+        }
+        logger.info("{} Read Record Count: {}", msg, readCounter.get());
+        logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
+        logger.info("{} Write Record Count: {}", msg, writeCounter.get());
+        if (isFinal) {
+            logger.info("################################################################################################");
+        }
+    }
+
     private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
         for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
             //wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
             writeResult.toCompletableFuture().get().one();
-            if (writeCounter.addAndGet(incrementBy) % printStatsAfter == 0) {
-                logger.info("TreadID: " + Thread.currentThread().getId() + " Write Record Count: " + writeCounter.get());
-            }
+            writeCounter.addAndGet(incrementBy);
         }
         writeResults.clear();
     }
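
CopyJobSession now funnels all progress reporting through a single synchronized printCounts(boolean isFinal) over AtomicLong counters. A self-contained sketch of the pattern, with assumed names (CounterDemo is illustrative, not from the commit):

    import java.util.concurrent.atomic.AtomicLong;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CounterDemo {
        private static final Logger logger = LoggerFactory.getLogger(CounterDemo.class);
        // AtomicLong lets worker threads bump counters without explicit locking.
        private final AtomicLong readCounter = new AtomicLong(0);
        private final AtomicLong writeCounter = new AtomicLong(0);

        // synchronized keeps one thread's block of count lines from
        // interleaving with another thread's in the log output.
        public synchronized void printCounts(boolean isFinal) {
            String msg = "ThreadID: " + Thread.currentThread().getId();
            if (isFinal) {
                msg += " Final";
                logger.info("################");
            }
            logger.info("{} Read Record Count: {}", msg, readCounter.get());
            logger.info("{} Write Record Count: {}", msg, writeCounter.get());
            if (isFinal) {
                logger.info("################");
            }
        }

        public static void main(String[] args) throws InterruptedException {
            CounterDemo demo = new CounterDemo();
            Runnable worker = () -> {
                for (int i = 0; i < 10_000; i++) {
                    demo.readCounter.incrementAndGet();
                    demo.writeCounter.incrementAndGet();
                }
                demo.printCounts(false); // periodic snapshot, no banner
            };
            Thread t1 = new Thread(worker);
            Thread t2 = new Thread(worker);
            t1.start(); t2.start();
            t1.join(); t2.join();
            demo.printCounts(true); // final totals: 20000 reads and writes
        }
    }

The counters can still advance between the individual get() calls inside printCounts, so a snapshot is approximate while workers run; that is acceptable for progress reporting, and the final call happens after the worker threads have joined.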

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 4 additions & 4 deletions
@@ -51,7 +51,7 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
            Row pkRow = sourceSession.execute(bspk).one();
            if (null == pkRow) {
                missingCounter.incrementAndGet();
-                logger.error("Could not find row with primary-key: " + row);
+                logger.error("Could not find row with primary-key: {}", row);
                return;
            }
            ResultSet astraWriteResultSet = astraSession

@@ -70,9 +70,9 @@ public void printCounts(boolean isFinal) {
        if (isFinal) {
            logger.info("################################################################################################");
        }
-        logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " Read Missing Count: " + missingCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " Inserted Record Count: " + writeCounter.get());
+        logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
+        logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
+        logger.info("ThreadID: {} Inserted Record Count: {}", Thread.currentThread().getId(), writeCounter.get());
        if (isFinal) {
            logger.info("################################################################################################");
        }

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 28 additions & 30 deletions
@@ -36,10 +36,10 @@ private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkC
        super(sourceSession, astraSession, sc);
 
        autoCorrectMissing = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.target.autocorrect.missing", "false"));
-        logger.info("PARAM -- Autocorrect Missing: " + autoCorrectMissing);
+        logger.info("PARAM -- Autocorrect Missing: {}", autoCorrectMissing);
 
        autoCorrectMismatch = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.target.autocorrect.mismatch", "false"));
-        logger.info("PARAM -- Autocorrect Mismatch: " + autoCorrectMismatch);
+        logger.info("PARAM -- Autocorrect Mismatch: {}", autoCorrectMismatch);
    }
 
    public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {

@@ -55,7 +55,7 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
    }
 
    public void getDataAndDiff(BigInteger min, BigInteger max) {
-        logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
+        logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
        int maxAttempts = maxRetries;
        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
 

@@ -71,7 +71,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                    if (!(writeTimeStampFilter && (getLargestWriteTimeStamp(srcRow) < minWriteTimeStampFilter
                            || getLargestWriteTimeStamp(srcRow) > maxWriteTimeStampFilter))) {
                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
-                            printCounts("Current");
+                            printCounts(false);
                        }
 
                        CompletionStage<AsyncResultSet> targetRowFuture = astraSession

@@ -86,14 +86,11 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                    }
                });
                diffAndClear(srcToTargetRowMap);
-
-                printCounts("Final");
-
                retryCount = maxAttempts;
            } catch (Exception e) {
-                logger.error("Error occurred retry#: " + retryCount, e);
-                logger.error("Error with PartitionRange -- TreadID: " + Thread.currentThread().getId()
-                        + " Processing min: " + min + " max:" + max + " -- Retry# " + retryCount);
+                logger.error("Error occurred retry#: {}", retryCount, e);
+                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
+                        Thread.currentThread().getId(), min, max, retryCount);
            }
        }
 

@@ -105,39 +102,40 @@ private void diffAndClear(Map<Row, CompletionStage<AsyncResultSet>> srcToTargetR
                Row targetRow = srcToTargetRowMap.get(srcRow).toCompletableFuture().get().one();
                diff(srcRow, targetRow);
            } catch (Exception e) {
-                logger.error("Could not perform diff for Key: " + getKey(srcRow), e);
+                logger.error("Could not perform diff for Key: {}", getKey(srcRow), e);
            }
        }
        srcToTargetRowMap.clear();
    }
 
-    public void printCounts(String finalStr) {
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Record Count: "
-                + readCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Mismatch Count: "
-                + mismatchCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Mismatch Count: "
-                + correctedMismatchCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Missing Count: "
-                + missingCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Missing Count: "
-                + correctedMissingCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Valid Count: "
-                + validCounter.get());
-        logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Skipped Count: "
-                + skippedCounter.get());
+    public synchronized void printCounts(boolean isFinal) {
+        String msg = "ThreadID: " + Thread.currentThread().getId();
+        if (isFinal) {
+            msg += " Final";
+            logger.info("################################################################################################");
+        }
+        logger.info("{} Read Record Count: {}", msg, readCounter.get());
+        logger.info("{} Mismatch Record Count: {}", msg, mismatchCounter.get());
+        logger.info("{} Corrected Mismatch Record Count: {}", msg, correctedMismatchCounter.get());
+        logger.info("{} Missing Record Count: {}", msg, missingCounter.get());
+        logger.info("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
+        logger.info("{} Valid Record Count: {}", msg, validCounter.get());
+        logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
+        if (isFinal) {
+            logger.info("################################################################################################");
+        }
    }
 
    private void diff(Row sourceRow, Row astraRow) {
        if (astraRow == null) {
            missingCounter.incrementAndGet();
-            logger.error("Missing target row found for key: " + getKey(sourceRow));
+            logger.error("Missing target row found for key: {}", getKey(sourceRow));
            //correct data
 
            if (autoCorrectMissing) {
                astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
                correctedMissingCounter.incrementAndGet();
-                logger.error("Inserted missing row in target: " + getKey(sourceRow));
+                logger.error("Inserted missing row in target: {}", getKey(sourceRow));
            }
 
            return;

@@ -146,7 +144,7 @@ private void diff(Row sourceRow, Row astraRow) {
        String diffData = isDifferent(sourceRow, astraRow);
        if (!diffData.isEmpty()) {
            mismatchCounter.incrementAndGet();
-            logger.error("Mismatch row found for key: " + getKey(sourceRow) + " Mismatch: " + diffData);
+            logger.error("Mismatch row found for key: {} Mismatch: {}", getKey(sourceRow), diffData);
 
            if (autoCorrectMismatch) {
                if (isCounterTable) {

@@ -155,7 +153,7 @@ private void diff(Row sourceRow, Row astraRow) {
                    astraSession.execute(bindInsert(astraInsertStatement, sourceRow, null));
                }
                correctedMismatchCounter.incrementAndGet();
-                logger.error("Updated mismatch row in target: " + getKey(sourceRow));
+                logger.error("Updated mismatch row in target: {}", getKey(sourceRow));
            }
 
            return;
src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 8 additions & 8 deletions
@@ -79,7 +79,7 @@ public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkC
    }
 
    public void getData(BigInteger min, BigInteger max) {
-        logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
+        logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
        int maxAttempts = maxRetries;
        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
 

@@ -104,7 +104,7 @@ public void getData(BigInteger min, BigInteger max) {
                            String[] colName = checkTableforselectCols.split(",");
                            result = result + " - " + colName[index] + " : " + colData;
                        }
-                        logger.error("ThreadID: " + Thread.currentThread().getId() + result + " - " + filterColName + " length: " + rowColcnt);
+                        logger.error("ThreadID: {}{} - {} length: {}", Thread.currentThread().getId(), result, filterColName, rowColcnt);
                        continue;
                    }
                }

@@ -126,24 +126,24 @@ public void getData(BigInteger min, BigInteger max) {
                            String[] colName = checkTableforselectCols.split(",");
                            result = result + " - " + colName[index] + " : " + colData;
                        }
-                        logger.error("ThreadID: " + Thread.currentThread().getId() + result + " - " + filterColName + " length: " + rowColcnt);
+                        logger.error("ThreadID: {}{} - {} length: {}", Thread.currentThread().getId(), result, filterColName, rowColcnt);
                        continue;
                    }
                }
 
                if (readCounter.incrementAndGet() % 1000 == 0) {
-                    logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
+                    logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
                }
 
                }
            }
 
-
-            logger.info("TreadID: " + Thread.currentThread().getId() + " Final Read Record Count: " + readCounter.get());
+            logger.info("ThreadID: {} Final Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
                retryCount = maxAttempts;
            } catch (Exception e) {
-                logger.error("Error occurred retry#: " + retryCount, e);
-                logger.error("Error with PartitionRange -- TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max + " -- Retry# " + retryCount);
+                logger.error("Error occurred retry#: {}", retryCount, e);
+                logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
+                        Thread.currentThread().getId(), min, max, retryCount);
            }
        }
 