@@ -39,7 +39,7 @@ public static CopyPKJobSession getInstance(CqlSession sourceSession, CqlSession
39
39
40
40
public void getRowAndInsert (List <SplitPartitions .PKRows > rowsList ) {
41
41
for (SplitPartitions .PKRows rows : rowsList ) {
42
- for ( String row : rows .pkRows ) {
42
+ rows .pkRows . parallelStream (). forEach ( row -> {
43
43
readCounter .incrementAndGet ();
44
44
String [] pkFields = row .split (" %% " );
45
45
int idx = 0 ;
@@ -52,20 +52,30 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
52
52
if (null == pkRow ) {
53
53
missingCounter .incrementAndGet ();
54
54
logger .error ("Could not find row with primary-key: " + row );
55
- continue ;
55
+ return ;
56
56
}
57
57
ResultSet astraWriteResultSet = astraSession
58
58
.execute (bindInsert (astraInsertStatement , pkRow , null ));
59
59
writeCounter .incrementAndGet ();
60
-
61
- }
60
+ if (readCounter .get () % printStatsAfter == 0 ) {
61
+ printCounts (false );
62
+ }
63
+ });
62
64
}
63
65
64
- logger .info ("################################################################################################" );
66
+ printCounts (true );
67
+ }
68
+
69
+ public void printCounts (boolean isFinal ) {
70
+ if (isFinal ) {
71
+ logger .info ("################################################################################################" );
72
+ }
65
73
logger .info ("TreadID: " + Thread .currentThread ().getId () + " Read Record Count: " + readCounter .get ());
66
74
logger .info ("TreadID: " + Thread .currentThread ().getId () + " Read Missing Count: " + missingCounter .get ());
67
75
logger .info ("TreadID: " + Thread .currentThread ().getId () + " Inserted Record Count: " + writeCounter .get ());
68
- logger .info ("################################################################################################" );
76
+ if (isFinal ) {
77
+ logger .info ("################################################################################################" );
78
+ }
69
79
}
70
80
71
81
private Object convert (Class <?> targetType , String text ) {
0 commit comments