5
5
import com .datastax .oss .driver .api .core .cql .ResultSet ;
6
6
import com .datastax .oss .driver .api .core .cql .Row ;
7
7
import com .datastax .oss .driver .api .core .data .UdtValue ;
8
+ import datastax .astra .migrate .cql .EnhancedPK ;
8
9
import datastax .astra .migrate .cql .PKFactory ;
9
10
import datastax .astra .migrate .cql .Record ;
11
+ import datastax .astra .migrate .cql .features .ExplodeMap ;
12
+ import datastax .astra .migrate .cql .features .Feature ;
13
+ import datastax .astra .migrate .cql .features .FeatureFactory ;
14
+ import datastax .astra .migrate .cql .features .Featureset ;
10
15
import datastax .astra .migrate .cql .statements .OriginSelectByPartitionRangeStatement ;
11
16
import datastax .astra .migrate .cql .statements .TargetSelectByPKStatement ;
12
17
import datastax .astra .migrate .properties .KnownProperties ;
@@ -36,6 +41,10 @@ public class DiffJobSession extends CopyJobSession {
36
41
private final AtomicLong skippedCounter = new AtomicLong (0 );
37
42
38
43
private final boolean isCounterTable ;
44
+ private final List <Integer > targetToOriginColumnIndexes ;
45
+ private final List <MigrateDataType > targetColumnTypes ;
46
+ private final int explodeMapKeyIndex ;
47
+ private final int explodeMapValueIndex ;
39
48
40
49
private DiffJobSession (CqlSession originSession , CqlSession targetSession , SparkConf sc ) {
41
50
super (originSession , targetSession , sc );
@@ -46,7 +55,20 @@ private DiffJobSession(CqlSession originSession, CqlSession targetSession, Spark
46
55
autoCorrectMismatch = propertyHelper .getBoolean (KnownProperties .TARGET_AUTOCORRECT_MISMATCH );
47
56
logger .info ("PARAM -- Autocorrect Mismatch: {}" , autoCorrectMismatch );
48
57
49
- isCounterTable = cqlHelper .isCounterTable ();
58
+ this .isCounterTable = cqlHelper .isCounterTable ();
59
+ this .targetToOriginColumnIndexes = cqlHelper .getPKFactory ().getTargetToOriginColumnIndexes ();
60
+ this .targetColumnTypes = cqlHelper .getPKFactory ().getTargetColumnTypes ();
61
+
62
+ Feature explodeMapFeature = cqlHelper .getFeature (Featureset .EXPLODE_MAP );
63
+ if (FeatureFactory .isEnabled (explodeMapFeature )) {
64
+ List <String > targetColumnNames = propertyHelper .getStringList (KnownProperties .TARGET_COLUMN_NAMES );
65
+ this .explodeMapKeyIndex = targetColumnNames .indexOf (explodeMapFeature .getString (ExplodeMap .Property .KEY_COLUMN_NAME ));
66
+ this .explodeMapValueIndex = targetColumnNames .indexOf (explodeMapFeature .getString (ExplodeMap .Property .VALUE_COLUMN_NAME ));
67
+ }
68
+ else {
69
+ this .explodeMapKeyIndex = -1 ;
70
+ this .explodeMapValueIndex = -1 ;
71
+ }
50
72
}
51
73
52
74
public static DiffJobSession getInstance (CqlSession originSession , CqlSession targetSession , SparkConf sparkConf ) {
@@ -138,6 +160,7 @@ public synchronized void printCounts(boolean isFinal) {
138
160
}
139
161
140
162
private void diff (Record record ) {
163
+ EnhancedPK originPK = record .getPk ();
141
164
Row originRow = record .getOriginRow ();
142
165
Row targetRow = record .getTargetRow ();
143
166
@@ -156,7 +179,7 @@ private void diff(Record record) {
156
179
return ;
157
180
}
158
181
159
- String diffData = isDifferent (originRow , targetRow );
182
+ String diffData = isDifferent (originPK , originRow , targetRow );
160
183
if (!diffData .isEmpty ()) {
161
184
mismatchCounter .incrementAndGet ();
162
185
logger .error ("Mismatch row found for key: {} Mismatch: {}" , record .getPk (), diffData );
@@ -174,30 +197,27 @@ private void diff(Record record) {
174
197
}
175
198
}
176
199
177
- private String isDifferent (Row originRow , Row targetRow ) {
200
+ private String isDifferent (EnhancedPK pk , Row originRow , Row targetRow ) {
178
201
StringBuffer diffData = new StringBuffer ();
179
- IntStream .range (0 , cqlHelper .getOriginColTypes ().size ()).parallel ().forEach (index -> {
180
- MigrateDataType dataTypeObj = cqlHelper .getOriginColTypes ().get (index );
181
- Object origin = cqlHelper .getData (dataTypeObj , index , originRow );
182
- if (index < cqlHelper .getIdColTypes ().size ()) {
183
- Optional <Object > optionalVal = cqlHelper .handleBlankInPrimaryKey (index , origin , dataTypeObj .typeClass , originRow , false );
184
- if (optionalVal .isPresent ()) {
185
- origin = optionalVal .get ();
186
- }
187
- }
202
+ IntStream .range (0 , targetColumnTypes .size ()).parallel ().forEach (targetIndex -> {
203
+ MigrateDataType dataTypeObj = targetColumnTypes .get (targetIndex );
204
+ Object target = cqlHelper .getData (dataTypeObj , targetIndex , targetRow );
188
205
189
- Object target = cqlHelper .getData (dataTypeObj , index , targetRow );
206
+ Object origin ;
207
+ if (targetIndex == explodeMapKeyIndex ) origin = pk .getExplodeMapKey ();
208
+ else if (targetIndex == explodeMapValueIndex ) origin = pk .getExplodeMapValue ();
209
+ else origin = cqlHelper .getData (dataTypeObj , targetToOriginColumnIndexes .get (targetIndex ), originRow );
190
210
191
211
boolean isDiff = dataTypeObj .diff (origin , target );
192
212
if (isDiff ) {
193
- if (dataTypeObj .typeClass .equals (UdtValue .class )) {
213
+ if (dataTypeObj .getTypeClass () .equals (UdtValue .class )) {
194
214
String originUdtContent = ((UdtValue ) origin ).getFormattedContents ();
195
215
String targetUdtContent = ((UdtValue ) target ).getFormattedContents ();
196
216
if (!originUdtContent .equals (targetUdtContent )) {
197
- diffData .append ("(Index: " + index + " Origin: " + originUdtContent + " Target: " + targetUdtContent + ") " );
217
+ diffData .append ("(Target Index: " + targetIndex + " Origin: " + originUdtContent + " Target: " + targetUdtContent + ") " );
198
218
}
199
219
} else {
200
- diffData .append ("( Index: " + index + " Origin: " + origin + " Target: " + target + ") " );
220
+ diffData .append ("Target Index: " + targetIndex + " Origin: " + origin + " Target: " + target + ") " );
201
221
}
202
222
}
203
223
});
0 commit comments