2
2
3
3
import com .datastax .oss .driver .api .core .ConsistencyLevel ;
4
4
import com .datastax .oss .driver .api .core .CqlSession ;
5
+ import com .datastax .oss .driver .api .core .cql .AsyncResultSet ;
5
6
import com .datastax .oss .driver .api .core .cql .ResultSet ;
6
7
import com .datastax .oss .driver .api .core .cql .Row ;
7
8
import org .apache .spark .SparkConf ;
8
9
import org .slf4j .Logger ;
9
10
import org .slf4j .LoggerFactory ;
10
11
11
12
import java .math .BigInteger ;
12
- import java .util .concurrent .ForkJoinPool ;
13
+ import java .util .HashMap ;
14
+ import java .util .Map ;
15
+ import java .util .concurrent .CompletionStage ;
13
16
import java .util .concurrent .atomic .AtomicLong ;
14
17
import java .util .stream .IntStream ;
15
18
import java .util .stream .StreamSupport ;
@@ -51,7 +54,6 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
51
54
}
52
55
53
56
public void getDataAndDiff (BigInteger min , BigInteger max ) {
54
- ForkJoinPool customThreadPool = new ForkJoinPool ();
55
57
logger .info ("TreadID: " + Thread .currentThread ().getId () + " Processing min: " + min + " max:" + max );
56
58
int maxAttempts = maxRetries ;
57
59
for (int retryCount = 1 ; retryCount <= maxAttempts ; retryCount ++) {
@@ -61,27 +63,30 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
61
63
ResultSet resultSet = sourceSession .execute (
62
64
sourceSelectStatement .bind (hasRandomPartitioner ? min : min .longValueExact (), hasRandomPartitioner ? max : max .longValueExact ()).setConsistencyLevel (ConsistencyLevel .LOCAL_QUORUM ));
63
65
64
- customThreadPool .submit (() -> {
65
- StreamSupport .stream (resultSet .spliterator (), true ).forEach (sRow -> {
66
- readLimiter .acquire (1 );
67
- // do not process rows less than writeTimeStampFilter
68
- if (!(writeTimeStampFilter && (getLargestWriteTimeStamp (sRow ) < minWriteTimeStampFilter
69
- || getLargestWriteTimeStamp (sRow ) > maxWriteTimeStampFilter ))) {
70
- if (readCounter .incrementAndGet () % printStatsAfter == 0 ) {
71
- printCounts ("Current" );
72
- }
73
-
74
- Row astraRow = astraSession
75
- .execute (selectFromAstra (astraSelectStatement , sRow )).one ();
76
- diff (sRow , astraRow );
77
- } else {
78
- readCounter .incrementAndGet ();
79
- skippedCounter .incrementAndGet ();
66
+ Map <Row , CompletionStage <AsyncResultSet >> srcToTargetRowMap = new HashMap <Row , CompletionStage <AsyncResultSet >>();
67
+ StreamSupport .stream (resultSet .spliterator (), false ).forEach (srcRow -> {
68
+ readLimiter .acquire (1 );
69
+ // do not process rows less than writeTimeStampFilter
70
+ if (!(writeTimeStampFilter && (getLargestWriteTimeStamp (srcRow ) < minWriteTimeStampFilter
71
+ || getLargestWriteTimeStamp (srcRow ) > maxWriteTimeStampFilter ))) {
72
+ if (readCounter .incrementAndGet () % printStatsAfter == 0 ) {
73
+ printCounts ("Current" );
80
74
}
81
- });
82
75
83
- printCounts ("Final" );
84
- }).get ();
76
+ CompletionStage <AsyncResultSet > targetRowFuture = astraSession
77
+ .executeAsync (selectFromAstra (astraSelectStatement , srcRow ));
78
+ srcToTargetRowMap .put (srcRow , targetRowFuture );
79
+ if (srcToTargetRowMap .size () > 1000 ) {
80
+ diffAndClear (srcToTargetRowMap );
81
+ }
82
+ } else {
83
+ readCounter .incrementAndGet ();
84
+ skippedCounter .incrementAndGet ();
85
+ }
86
+ });
87
+ diffAndClear (srcToTargetRowMap );
88
+
89
+ printCounts ("Final" );
85
90
86
91
retryCount = maxAttempts ;
87
92
} catch (Exception e ) {
@@ -91,7 +96,18 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
91
96
}
92
97
}
93
98
94
- customThreadPool .shutdownNow ();
99
+ }
100
+
101
+ private void diffAndClear (Map <Row , CompletionStage <AsyncResultSet >> srcToTargetRowMap ) {
102
+ for (Row srcRow : srcToTargetRowMap .keySet ()) {
103
+ try {
104
+ Row targetRow = srcToTargetRowMap .get (srcRow ).toCompletableFuture ().get ().one ();
105
+ diff (srcRow , targetRow );
106
+ } catch (Exception e ) {
107
+ logger .error ("Could not perform diff for Key: " + getKey (srcRow ), e );
108
+ }
109
+ }
110
+ srcToTargetRowMap .clear ();
95
111
}
96
112
97
113
public void printCounts (String finalStr ) {
@@ -150,15 +166,13 @@ private void diff(Row sourceRow, Row astraRow) {
150
166
private String isDifferent (Row sourceRow , Row astraRow ) {
151
167
StringBuffer diffData = new StringBuffer ();
152
168
IntStream .range (0 , selectColTypes .size ()).parallel ().forEach (index -> {
153
- if (!writeTimeStampCols .contains (index )) {
154
- MigrateDataType dataType = selectColTypes .get (index );
155
- Object source = getData (dataType , index , sourceRow );
156
- Object astra = getData (dataType , index , astraRow );
157
-
158
- boolean isDiff = dataType .diff (source , astra );
159
- if (isDiff ) {
160
- diffData .append (" (Index: " + index + " Source: " + source + " Astra: " + astra + " ) " );
161
- }
169
+ MigrateDataType dataType = selectColTypes .get (index );
170
+ Object source = getData (dataType , index , sourceRow );
171
+ Object astra = getData (dataType , index , astraRow );
172
+
173
+ boolean isDiff = dataType .diff (source , astra );
174
+ if (isDiff ) {
175
+ diffData .append (" (Index: " + index + " Source: " + source + " Astra: " + astra + " ) " );
162
176
}
163
177
});
164
178
0 commit comments