Skip to content

Commit f9e597e

Browse files
committed
Cleanup of initialize() method, ready for some Cql unit testing
1 parent 51435c5 commit f9e597e

File tree

7 files changed

+347
-338
lines changed

7 files changed

+347
-338
lines changed

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ public class AbstractJobSession extends BaseJobSession {
1111

1212
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
1313

14-
protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
15-
this(sourceSession, astraSession, sc, false);
14+
protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
15+
this(originSession, targetSession, sc, false);
1616
}
1717

1818
protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
8989
Row targetRow = null;
9090
if (cqlHelper.isCounterTable()) {
9191
ResultSet targetResultSet = cqlHelper.getTargetSession()
92-
.execute(cqlHelper.selectFromTargetByPK(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_SELECT_BY_PK), originRow));
92+
.execute(cqlHelper.selectFromTargetByPK(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_SELECT_ORIGIN_BY_PK), originRow));
9393
targetRow = targetResultSet.one();
9494
}
9595

src/main/java/datastax/astra/migrate/CqlHelper.java

Lines changed: 322 additions & 328 deletions
Large diffs are not rendered by default.

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
8080
printCounts(false);
8181
}
8282

83-
BoundStatement bSelect = cqlHelper.selectFromTargetByPK(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_SELECT_BY_PK), srcRow);
83+
BoundStatement bSelect = cqlHelper.selectFromTargetByPK(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_SELECT_ORIGIN_BY_PK), srcRow);
8484
if (null == bSelect) {
8585
skippedCounter.incrementAndGet();
8686
} else {

src/main/java/datastax/astra/migrate/properties/KnownProperties.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public enum PropertyType {
7070
public static final String ORIGIN_WRITETIME_COLS = "spark.query.writetime.cols"; // 2,3
7171
public static final String ORIGIN_IS_COUNTER = "spark.counterTable"; // false
7272
public static final String ORIGIN_COUNTER_CQL = "spark.counterTable.cql";
73-
public static final String ORIGIN_COUNTER_INDEX = "spark.counterTable.cql.index"; // 0
73+
public static final String ORIGIN_COUNTER_INDEXES = "spark.counterTable.cql.index"; // 0
7474
static {
7575
types.put(ORIGIN_KEYSPACE_TABLE, PropertyType.STRING);
7676
required.add(ORIGIN_KEYSPACE_TABLE);
@@ -85,8 +85,8 @@ public enum PropertyType {
8585
types.put(ORIGIN_IS_COUNTER, PropertyType.BOOLEAN);
8686
defaults.put(ORIGIN_IS_COUNTER, "false");
8787
types.put(ORIGIN_COUNTER_CQL, PropertyType.STRING);
88-
types.put(ORIGIN_COUNTER_INDEX, PropertyType.NUMBER_LIST);
89-
defaults.put(ORIGIN_COUNTER_INDEX, "0");
88+
types.put(ORIGIN_COUNTER_INDEXES, PropertyType.NUMBER_LIST);
89+
defaults.put(ORIGIN_COUNTER_INDEXES, "0");
9090

9191
}
9292

@@ -98,11 +98,10 @@ public enum PropertyType {
9898
public static final String ORIGIN_FILTER_WRITETS_MIN = "spark.origin.minWriteTimeStampFilter"; // 0
9999
public static final String ORIGIN_FILTER_WRITETS_MAX = "spark.origin.maxWriteTimeStampFilter"; // 4102444800000000
100100
public static final String ORIGIN_FILTER_COLUMN_ENABLED = "spark.origin.FilterData"; // false
101-
public static final String ORIGIN_FILTER_COLUMN_NAME = "spark.origin.FilterColumn"; // test
102101
public static final String ORIGIN_FILTER_COLUMN_INDEX = "spark.origin.FilterColumnIndex"; // 2
103102
public static final String ORIGIN_FILTER_COLUMN_TYPE = "spark.origin.FilterColumnType"; // 6%16
104103
public static final String ORIGIN_FILTER_COLUMN_VALUE = "spark.origin.FilterColumnValue"; // test
105-
public static final String ORIGIN_COVERAGE_PERCENT = "spark.coveragePercent"; // 100
104+
public static final String ORIGIN_COVERAGE_PERCENT = "spark.coveragePercent"; // 100
106105
public static final String ORIGIN_HAS_RANDOM_PARTITIONER = "spark.origin.hasRandomPartitioner"; // false
107106

108107
public static final String ORIGIN_CHECK_COLSIZE_ENABLED = "spark.origin.checkTableforColSize"; // false
@@ -119,7 +118,6 @@ public enum PropertyType {
119118
defaults.put(ORIGIN_FILTER_WRITETS_MAX, "0");
120119
types.put(ORIGIN_FILTER_COLUMN_ENABLED, PropertyType.BOOLEAN);
121120
defaults.put(ORIGIN_FILTER_COLUMN_ENABLED, "false");
122-
types.put(ORIGIN_FILTER_COLUMN_NAME, PropertyType.STRING);
123121
types.put(ORIGIN_FILTER_COLUMN_INDEX, PropertyType.NUMBER);
124122
defaults.put(ORIGIN_FILTER_COLUMN_INDEX, "0");
125123
types.put(ORIGIN_FILTER_COLUMN_TYPE, PropertyType.MIGRATION_TYPE);
@@ -140,6 +138,7 @@ public enum PropertyType {
140138
//==========================================================================
141139
public static final String TARGET_KEYSPACE_TABLE = "spark.target.keyspaceTable"; // test.a1
142140
public static final String TARGET_PRIMARY_KEY = "spark.query.target.id"; // comma-separated-partition-key,comma-separated-clustering-key
141+
public static final String TARGET_PRIMARY_KEY_TYPES = "spark.query.target.id.types"; // 9,1,4,3
143142
public static final String TARGET_COLUMN_NAMES = "spark.query.target";
144143
public static final String TARGET_CUSTOM_WRITETIME = "spark.target.custom.writeTime"; // 0
145144
public static final String TARGET_AUTOCORRECT_MISSING = "spark.target.autocorrect.missing"; // false
@@ -150,6 +149,7 @@ public enum PropertyType {
150149
required.add(TARGET_KEYSPACE_TABLE);
151150
types.put(TARGET_PRIMARY_KEY, PropertyType.STRING_LIST);
152151
required.add(TARGET_PRIMARY_KEY);
152+
types.put(TARGET_PRIMARY_KEY_TYPES, PropertyType.MIGRATION_TYPE_LIST);
153153
types.put(TARGET_COLUMN_NAMES, PropertyType.STRING_LIST);
154154
required.add(TARGET_COLUMN_NAMES); // we need this, though it should be defaulted with ORIGIN_COLUMN_NAMES value
155155
types.put(TARGET_CUSTOM_WRITETIME, PropertyType.NUMBER);

src/main/java/datastax/astra/migrate/properties/PropertyHelper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ protected void loadSparkConf() {
240240
setProperty(KnownProperties.TARGET_COLUMN_NAMES, get(KnownProperties.ORIGIN_COLUMN_NAMES));
241241
}
242242

243+
// Target column list defaults to the source column list
244+
if (null == get(KnownProperties.TARGET_PRIMARY_KEY_TYPES) || getAsString(KnownProperties.TARGET_PRIMARY_KEY_TYPES).isEmpty()) {
245+
if (null != getMigrationTypeList(KnownProperties.ORIGIN_COLUMN_TYPES) && !getMigrationTypeList(KnownProperties.ORIGIN_COLUMN_TYPES).isEmpty()) {
246+
List<MigrateDataType> targetPKTypes = getMigrationTypeList(KnownProperties.ORIGIN_COLUMN_TYPES).subList(0, getStringList(KnownProperties.TARGET_PRIMARY_KEY).size());
247+
logger.info("Setting known property [" + KnownProperties.TARGET_PRIMARY_KEY_TYPES + "] based on [" + KnownProperties.ORIGIN_COLUMN_TYPES + "], which is [" + getAsString(KnownProperties.ORIGIN_COLUMN_TYPES) + "]");
248+
setProperty(KnownProperties.TARGET_PRIMARY_KEY_TYPES, targetPKTypes);
249+
}
250+
}
251+
243252
if (fullyLoaded) {
244253
fullyLoaded = isValidConfig();
245254
}

src/test/java/datastax/astra/migrate/properties/PropertyHelperTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ public void getNumber() {
209209
assertEquals(4321, helper.getNumber(KnownProperties.TEST_NUMBER));
210210
}
211211

212+
@Test
213+
public void getNumber_NullValue() {
214+
helper.setProperty(KnownProperties.TEST_NUMBER, null);
215+
assertNull(helper.getNumber(KnownProperties.TEST_NUMBER));
216+
}
217+
212218
@Test
213219
public void getInteger_Integer() {
214220
helper.setProperty(KnownProperties.TEST_NUMBER, 1234);

0 commit comments

Comments
 (0)