Skip to content

Commit 79a459d

Browse files
committed
Refactor counts logging
1 parent e876ec9 commit 79a459d

File tree

7 files changed

+40
-32
lines changed

7 files changed

+40
-32
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>2.5</version>
6+
<version>2.6</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
6262

6363
writeLimiter.acquire(1);
6464
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
65-
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: "
66-
+ readCounter.get());
65+
printCounts(false);
6766
}
6867
Row astraRow = null;
6968
if (isCounterTable) {
@@ -88,7 +87,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
8887
readLimiter.acquire(1);
8988
writeLimiter.acquire(1);
9089
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
91-
logger.info("TreadID: " + Thread.currentThread().getId() + " Read Record Count: " + readCounter.get());
90+
printCounts(false);
9291
}
9392
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
9493

@@ -116,8 +115,6 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
116115
}
117116
}
118117

119-
logger.info("TreadID: " + Thread.currentThread().getId() + " Final Read Record Count: " + readCounter.get());
120-
logger.info("TreadID: " + Thread.currentThread().getId() + " Final Write Record Count: " + writeCounter.get());
121118
retryCount = maxAttempts;
122119
} catch (Exception e) {
123120
logger.error("Error occurred retry#: " + retryCount, e);
@@ -126,13 +123,24 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
126123
}
127124
}
128125

126+
public synchronized void printCounts(boolean isFinal) {
127+
String msg = "TreadID: " + Thread.currentThread().getId();
128+
if (isFinal) {
129+
msg += " Final";
130+
logger.info("################################################################################################");
131+
}
132+
logger.info(msg + " Read Record Count: " + readCounter.get());
133+
logger.info(msg + " Write Record Count: " + writeCounter.get());
134+
if (isFinal) {
135+
logger.info("################################################################################################");
136+
}
137+
}
138+
129139
private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
130140
for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
131141
//wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
132142
writeResult.toCompletableFuture().get().one();
133-
if (writeCounter.addAndGet(incrementBy) % printStatsAfter == 0) {
134-
logger.info("TreadID: " + Thread.currentThread().getId() + " Write Record Count: " + writeCounter.get());
135-
}
143+
writeCounter.addAndGet(incrementBy);
136144
}
137145
writeResults.clear();
138146
}

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
7171
if (!(writeTimeStampFilter && (getLargestWriteTimeStamp(srcRow) < minWriteTimeStampFilter
7272
|| getLargestWriteTimeStamp(srcRow) > maxWriteTimeStampFilter))) {
7373
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
74-
printCounts("Current");
74+
printCounts(false);
7575
}
7676

7777
CompletionStage<AsyncResultSet> targetRowFuture = astraSession
@@ -86,9 +86,6 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
8686
}
8787
});
8888
diffAndClear(srcToTargetRowMap);
89-
90-
printCounts("Final");
91-
9289
retryCount = maxAttempts;
9390
} catch (Exception e) {
9491
logger.error("Error occurred retry#: " + retryCount, e);
@@ -111,21 +108,22 @@ private void diffAndClear(Map<Row, CompletionStage<AsyncResultSet>> srcToTargetR
111108
srcToTargetRowMap.clear();
112109
}
113110

114-
public void printCounts(String finalStr) {
115-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Record Count: "
116-
+ readCounter.get());
117-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Mismatch Count: "
118-
+ mismatchCounter.get());
119-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Mismatch Count: "
120-
+ correctedMismatchCounter.get());
121-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Missing Count: "
122-
+ missingCounter.get());
123-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Missing Count: "
124-
+ correctedMissingCounter.get());
125-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Valid Count: "
126-
+ validCounter.get());
127-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Skipped Count: "
128-
+ skippedCounter.get());
111+
public synchronized void printCounts(boolean isFinal) {
112+
String msg = "TreadID: " + Thread.currentThread().getId();
113+
if (isFinal) {
114+
msg += " Final";
115+
logger.info("################################################################################################");
116+
}
117+
logger.info(msg + " Read Record Count: " + readCounter.get());
118+
logger.info(msg + " Read Mismatch Count: " + mismatchCounter.get());
119+
logger.info(msg + " Corrected Mismatch Count: " + correctedMismatchCounter.get());
120+
logger.info(msg + " Read Missing Count: " + missingCounter.get());
121+
logger.info(msg + " Corrected Missing Count: " + correctedMissingCounter.get());
122+
logger.info(msg + " Read Valid Count: " + validCounter.get());
123+
logger.info(msg + " Read Skipped Count: " + skippedCounter.get());
124+
if (isFinal) {
125+
logger.info("################################################################################################");
126+
}
129127
}
130128

131129
private void diff(Row sourceRow, Row astraRow) {

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ class AbstractJob extends BaseJob {
2626

2727
var config: SparkConf = sContext.getConf
2828
if ("true".equals(isAstra)) {
29-
abstractLogger.info(connType + ": Connected to Astra using SCB: " + scbPath);
29+
abstractLogger.info(connType + ": Connecting to Astra using SCB: " + scbPath);
3030

3131
return CassandraConnector(config
3232
.set("spark.cassandra.auth.username", username)
3333
.set("spark.cassandra.auth.password", password)
3434
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
3535
.set("spark.cassandra.connection.config.cloud.path", scbPath))
3636
} else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
37-
abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL host: " + host);
37+
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) with SSL host: " + host);
3838

3939
// Use defaults when not provided
4040
var enabledAlgorithmsVar = enabledAlgorithms
@@ -57,7 +57,7 @@ class AbstractJob extends BaseJob {
5757
.set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
5858
)
5959
} else {
60-
abstractLogger.info(connType + ": Connected to Cassandra (or DSE) host: " + host);
60+
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) host: " + host);
6161

6262
return CassandraConnector(config.set("spark.cassandra.auth.username", username)
6363
.set("spark.cassandra.auth.password", password)

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ object DiffData extends AbstractJob {
2727
.getDataAndDiff(part.getMin, part.getMax)))
2828
})
2929

30-
DiffJobSession.getInstance(null, null, sc).printCounts("Job Final");
30+
DiffJobSession.getInstance(null, null, sc).printCounts(true);
3131
}
3232

3333
}

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ object Migrate extends AbstractJob {
2929
.getDataAndInsert(part.getMin, part.getMax)))
3030
})
3131

32+
CopyJobSession.getInstance(null, null, sc).printCounts(true);
3233
}
3334

3435
}

src/main/scala/datastax/astra/migrate/MigratePartitionsFromFile.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ object MigratePartitionsFromFile extends AbstractJob {
2727
.getDataAndInsert(part.getMin, part.getMax)))
2828
})
2929

30+
CopyJobSession.getInstance(null, null, sc).printCounts(true);
3031
}
3132

3233
}

0 commit comments

Comments
 (0)