Skip to content

Commit 55543f6

Browse files
authored
Merge pull request #14 from Ankitp1342/feature/diff_only
Add new flag to DiffData to only show the diff and not insert into de…
2 parents 25d50dd + 6c1f5b6 commit 55543f6

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class DiffJobSession extends CopyJobSession {
3636
private AtomicLong skippedCounter = new AtomicLong(0);
3737

3838
protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
39+
protected Boolean isDiffOnly = false;
3940

4041

4142
public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
@@ -54,6 +55,7 @@ private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkC
5455
super(sourceSession, astraSession, sparkConf);
5556

5657
selectColTypes = getTypes(sparkConf.get("spark.migrate.diff.select.types"));
58+
isDiffOnly = Boolean.parseBoolean(sparkConf.get("spark.migrate.isDiffOnly", "false"));
5759
}
5860

5961
public void getDataAndDiff(BigInteger min, BigInteger max) {
@@ -120,9 +122,12 @@ private void diff(Row sourceRow, Row astraRow) {
120122
missingCounter.incrementAndGet();
121123
logger.error("Data is missing in Astra: " + getKey(sourceRow));
122124
//correct data
123-
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
124-
correctedMissingCounter.incrementAndGet();
125-
logger.error("Corrected missing data in Astra: " + getKey(sourceRow));
125+
126+
if (!isDiffOnly) {
127+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
128+
correctedMissingCounter.incrementAndGet();
129+
logger.error("Corrected missing data in Astra: " + getKey(sourceRow));
130+
}
126131
return;
127132
}
128133

src/resources/autoSparkConfig.py

100644100755
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@
5555
'inet': '0'
5656

5757
}
58+
system_keyspace = ['OpsCenter','dse_insights_local','solr_admin','test','dse_system','dse_analytics','system_auth','system_traces','system','dse_system_local','system_distributed','system_schema','dse_perf','dse_insights','dse_security','dse_system','killrvideo','dse_leases','dsefs_c4z','HiveMetaStore','dse_analytics','dsefs','spark_system']
59+
tp_tbl_data = {
60+
'Materialized Views':{},
61+
'Secondary Indexes':{},
62+
'Storage-Attached Indexes':{}
63+
}
5864

5965
def field_type_comment(tbl,fieldName,fieldType):
6066
if fieldType=='date':
@@ -99,6 +105,18 @@ def field_type_comment(tbl,fieldName,fieldType):
99105
cfg_array = {}
100106
schema_name = 'schema'
101107

108+
def add_tp_tbl(gr,ks,tbl,src_ks,src_tbl):
109+
if src_ks not in system_keyspace:
110+
try:
111+
type(tp_tbl_data[gr][src_ks])
112+
except:
113+
tp_tbl_data[gr][src_ks]={}
114+
try:
115+
type(tp_tbl_data[gr][src_ks][src_tbl])
116+
except:
117+
tp_tbl_data[gr][src_ks][src_tbl] = []
118+
if (ks+'.'+tbl) not in tp_tbl_data[gr][src_ks][src_tbl]:
119+
tp_tbl_data[gr][src_ks][src_tbl].append(ks+'.'+tbl)
102120

103121
def process_field(tbl,fieldName,fieldType,cql=''):
104122
if 'map<' in fieldType:
@@ -113,6 +131,12 @@ def process_field(tbl,fieldName,fieldType,cql=''):
113131
for mapType in mapData:
114132
fieldValue += '%' + field_type_array[mapType.strip()]
115133
field_type_comment(tbl,fieldName,mapType.strip())
134+
elif 'list<' in fieldType:
135+
mapData = fieldType.split('<')[1].split('>')[0].split(',')
136+
fieldValue = field_type_array['list']
137+
for mapType in mapData:
138+
fieldValue += '%' + field_type_array[mapType.strip()]
139+
field_type_comment(tbl,fieldName,mapType.strip())
116140
elif fieldType in field_types:
117141
fieldValue = field_type_array[fieldType]
118142
else:

0 commit comments

Comments
 (0)