Skip to content

Commit 5f8aafb

Browse files
committed
CDM-23 allowing bindInsert to return a collection
1 parent 840a062 commit 5f8aafb

File tree

5 files changed

+57
-25
lines changed

5 files changed

+57
-25
lines changed

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import java.math.BigInteger;
1111
import java.util.ArrayList;
12+
import java.util.List;
1213
import java.util.Collection;
1314
import java.util.concurrent.CompletionStage;
1415
import java.util.concurrent.atomic.AtomicLong;
@@ -95,15 +96,17 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
9596
targetRow = targetResultSet.one();
9697
}
9798

98-
BoundStatement bInsert = cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow);
99-
if (null == bInsert) {
99+
List<BoundStatement> bInsertList = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow);
100+
if (null == bInsertList || bInsertList.isEmpty()) {
100101
skipCnt++;
101102
continue;
102103
}
103-
CompletionStage<AsyncResultSet> targetWriteResultSet = cqlHelper.getTargetSession().executeAsync(bInsert);
104-
writeResults.add(targetWriteResultSet);
105-
if (writeResults.size() > cqlHelper.getFetchSizeInRows()) {
106-
writeCnt += iterateAndClearWriteResults(writeResults, 1);
104+
for (BoundStatement bInsert : bInsertList) {
105+
CompletionStage<AsyncResultSet> targetWriteResultSet = cqlHelper.getTargetSession().executeAsync(bInsert);
106+
writeResults.add(targetWriteResultSet);
107+
if (writeResults.size() > cqlHelper.getFetchSizeInRows()) {
108+
writeCnt += iterateAndClearWriteResults(writeResults, 1);
109+
}
107110
}
108111
}
109112

@@ -128,22 +131,24 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
128131
}
129132

130133
writeLimiter.acquire(1);
131-
BoundStatement bInsert = cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null);
132-
if (null == bInsert) {
134+
List<BoundStatement> bInsertList = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null);
135+
if (null == bInsertList || bInsertList.isEmpty()) {
133136
skipCnt++;
134137
continue;
135138
}
136-
batchStatement = batchStatement.add(bInsert);
137-
138-
// if batch threshold is met, send the writes and clear the batch
139-
if (batchStatement.size() >= cqlHelper.getBatchSize()) {
140-
CompletionStage<AsyncResultSet> writeResultSet = cqlHelper.getTargetSession().executeAsync(batchStatement);
141-
writeResults.add(writeResultSet);
142-
batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
143-
}
139+
for (BoundStatement bInsert : bInsertList) {
140+
batchStatement = batchStatement.add(bInsert);
141+
142+
// if batch threshold is met, send the writes and clear the batch
143+
if (batchStatement.size() >= cqlHelper.getBatchSize()) {
144+
CompletionStage<AsyncResultSet> writeResultSet = cqlHelper.getTargetSession().executeAsync(batchStatement);
145+
writeResults.add(writeResultSet);
146+
batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
147+
}
144148

145-
if (writeResults.size() * cqlHelper.getBatchSize() > cqlHelper.getFetchSizeInRows()) {
146-
writeCnt += iterateAndClearWriteResults(writeResults, cqlHelper.getBatchSize());
149+
if (writeResults.size() * cqlHelper.getBatchSize() > cqlHelper.getFetchSizeInRows()) {
150+
writeCnt += iterateAndClearWriteResults(writeResults, cqlHelper.getBatchSize());
151+
}
147152
}
148153
}
149154

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,15 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
5555
logger.error("Could not find row with primary-key: {}", row);
5656
return;
5757
}
58-
ResultSet targetWriteResultSet = cqlHelper.getTargetSession()
59-
.execute(cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), pkRow, null));
60-
writeCounter.incrementAndGet();
61-
if (readCounter.get() % printStatsAfter == 0) {
62-
printCounts(false);
58+
List<BoundStatement> boundInserts = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), pkRow, null);
59+
if (null != boundInserts) {
60+
for (BoundStatement bs : boundInserts) {
61+
ResultSet targetWriteResultSet = cqlHelper.getTargetSession().execute(bs);
62+
writeCounter.incrementAndGet();
63+
if (readCounter.get() % printStatsAfter == 0) {
64+
printCounts(false);
65+
}
66+
}
6367
}
6468
});
6569
}

src/main/java/datastax/astra/migrate/cql/CqlHelper.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,22 @@ public boolean initialize() {
9696
return validInit;
9797
}
9898

99+
public List<BoundStatement> bindInsert(PreparedStatement insertStatement, Row originRow, Row targetRow) {
100+
List<BoundStatement> rtnList = new ArrayList<>();
101+
if (isFeatureEnabled(Featureset.EXPLODE_MAP)) {
102+
Feature explodeMapFeature = getFeature(Featureset.EXPLODE_MAP);
103+
Map maptoExplode = (Map) getData(explodeMapFeature.getMigrateDataType(ExplodeMap.Property.MAP_COLUMN_TYPE),
104+
explodeMapFeature.getInteger(ExplodeMap.Property.MAP_COLUMN_INDEX),
105+
originRow);
106+
for (Object key : maptoExplode.keySet()) {
107+
rtnList.add(bindInsertOneRow(insertStatement, originRow, targetRow, key, maptoExplode.get(key)));
108+
}
109+
} else {
110+
rtnList.add(bindInsertOneRow(insertStatement, originRow, targetRow, null, null));
111+
}
112+
return rtnList;
113+
}
114+
99115
public BoundStatement bindInsertOneRow(PreparedStatement insertStatement, Row originRow, Row targetRow) {
100116
return bindInsertOneRow(insertStatement, originRow, targetRow, null, null);
101117
}
@@ -119,7 +135,9 @@ public BoundStatement bindInsertOneRow(PreparedStatement insertStatement, Row or
119135
// This loops over the selected columns and binds each type to the boundInsertStatement
120136
Feature explodeMapFeature = getFeature(Featureset.EXPLODE_MAP);
121137
for (index = 0; index < originColTypesSize; index++) {
122-
if (mapKey != null && explodeMapFeature.isEnabled() && index == explodeMapFeature.getInteger(ExplodeMap.Property.MAP_COLUMN_INDEX)) {
138+
if (mapKey != null &&
139+
isFeatureEnabled(Featureset.EXPLODE_MAP) &&
140+
index == explodeMapFeature.getInteger(ExplodeMap.Property.MAP_COLUMN_INDEX)) {
123141
// This substitutes the map column with the key and value types of the map
124142
boundInsertStatement = boundInsertStatement.set(index, mapKey, explodeMapFeature.getMigrateDataType(ExplodeMap.Property.KEY_COLUMN_TYPE).getType());
125143
// Add an 'extra' column to the statement, which will also increase the loop limit

src/main/java/datastax/astra/migrate/cql/features/ExplodeMap.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class ExplodeMap extends AbstractFeature {
1616
public enum Property {
1717
MAP_COLUMN_NAME,
1818
MAP_COLUMN_INDEX,
19+
MAP_COLUMN_TYPE,
1920
KEY_COLUMN_NAME,
2021
KEY_COLUMN_TYPE,
2122
VALUE_COLUMN_NAME,
@@ -39,6 +40,7 @@ public boolean initialize(PropertyHelper helper) {
3940
putNumber(Property.MAP_COLUMN_INDEX, helper.getStringList(KnownProperties.ORIGIN_COLUMN_NAMES).indexOf(mapColumnName));
4041

4142
MigrateDataType columnMapDataType = getColumnMapDataType(helper);
43+
putMigrateDataType(Property.MAP_COLUMN_TYPE, columnMapDataType);
4244
putMigrateDataType(Property.KEY_COLUMN_TYPE, columnMapDataType.getSubTypeTypes().get(0));
4345
putMigrateDataType(Property.VALUE_COLUMN_TYPE, columnMapDataType.getSubTypeTypes().get(1));
4446
}

src/test/java/datastax/astra/migrate/cql/features/ExplodeMapTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void setup() {
3030
public void tearDown() {
3131
PropertyHelper.destroyInstance();
3232
validSparkConf = null;
33+
feature = null;
3334
}
3435

3536
private void setValidSparkConf() {
@@ -54,6 +55,7 @@ public void smokeTest_initialize() {
5455
() -> assertEquals("map_key", feature.getAsString(ExplodeMap.Property.KEY_COLUMN_NAME), "KEY_COLUMN_NAME"),
5556
() -> assertEquals("map_val", feature.getAsString(ExplodeMap.Property.VALUE_COLUMN_NAME), "VALUE_COLUMN_NAME"),
5657
() -> assertEquals(2, feature.getNumber(ExplodeMap.Property.MAP_COLUMN_INDEX), "MAP_COLUMN_INDEX"),
58+
() -> assertEquals(new MigrateDataType("5%0%3"), feature.getMigrateDataType(ExplodeMap.Property.MAP_COLUMN_TYPE), "MAP_COLUMN_TYPE"),
5759
() -> assertEquals(new MigrateDataType("0"), feature.getMigrateDataType(ExplodeMap.Property.KEY_COLUMN_TYPE), "KEY_COLUMN_TYPE"),
5860
() -> assertEquals(new MigrateDataType("3"), feature.getMigrateDataType(ExplodeMap.Property.VALUE_COLUMN_TYPE), "VALUE_COLUMN_TYPE")
5961
);
@@ -65,8 +67,9 @@ public void smokeTest_alterProperties() {
6567
helper.initializeSparkConf(validSparkConf);
6668
feature.initialize(helper);
6769
feature.alterProperties(helper);
70+
6871
assertAll(
69-
() -> assertTrue(feature.isEnabled()),
72+
() -> assertTrue(feature.isEnabled(), "isEnabled"),
7073
() -> assertEquals(Arrays.asList("key","map_key"), helper.getStringList(KnownProperties.TARGET_PRIMARY_KEY), "TARGET_PRIMARY_KEY"),
7174
() -> assertEquals(Arrays.asList(new MigrateDataType("4"),new MigrateDataType("0")), helper.getMigrationTypeList(KnownProperties.TARGET_PRIMARY_KEY_TYPES), "TARGET_PRIMARY_KEY_TYPES"),
7275
() -> assertEquals(helper.getStringList(KnownProperties.TARGET_PRIMARY_KEY).size(), helper.getMigrationTypeList(KnownProperties.TARGET_PRIMARY_KEY_TYPES).size(), "sizes match"),

0 commit comments

Comments (0)