Skip to content

Commit 150f402

Browse files
committed
added guardrail config element
1 parent 5ada164 commit 150f402

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ public class OriginCountJobSession extends BaseJobSession{
2424
protected String filterColName;
2525
protected String filterColType;
2626
protected Integer filterColIndex;
27+
protected Integer fieldGuardraillimitMB;
2728
protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
2829
public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkConf sparkConf) {
2930
if (originCountJobSession == null) {
@@ -57,6 +58,7 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
5758
filterColName = sparkConf.get("spark.origin.FilterColumn");
5859
filterColType = sparkConf.get("spark.origin.FilterColumnType");
5960
filterColIndex = Integer.parseInt(sparkConf.get("spark.origin.FilterColumnIndex", "0"));
61+
fieldGuardraillimitMB = Integer.parseInt(sparkConf.get("spark.fieldGuardraillimitMB", "0"));
6062

6163
String partionKey = sparkConf.get("spark.query.cols.partitionKey");
6264
idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
@@ -92,7 +94,7 @@ public void getData(BigInteger min, BigInteger max) {
9294
if(checkTableforColSize) {
9395
int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
9496
String result = "";
95-
if (rowColcnt > 1024 * 1024 * 10) {
97+
if (rowColcnt > fieldGuardraillimitMB) {
9698
for (int index = 0; index < checkTableforColSizeTypes.size(); index++) {
9799
MigrateDataType dataType = checkTableforColSizeTypes.get(index);
98100
Object colData = getData(dataType, index, sourceRow);
@@ -114,7 +116,7 @@ public void getData(BigInteger min, BigInteger max) {
114116
if(checkTableforColSize) {
115117
int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
116118
String result = "";
117-
if (rowColcnt > 1024 * 1024 * 10) {
119+
if (rowColcnt > fieldGuardraillimitMB) {
118120
for (int index = 0; index < checkTableforColSizeTypes.size(); index++) {
119121
MigrateDataType dataType = checkTableforColSizeTypes.get(index);
120122
Object colData = getData(dataType, index, sourceRow);
@@ -145,13 +147,13 @@ public void getData(BigInteger min, BigInteger max) {
145147
}
146148

147149
private int GetRowColumnLength(Row sourceRow, String filterColType, Integer filterColIndex) {
148-
int i = 0;
150+
int sizeInMB = 0;
149151
Object colData = getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
150152
byte[] colBytes = SerializationUtils.serialize((Serializable) colData);
151-
i = colBytes.length;
152-
if (i > 1024*1024*10)
153-
return i;
154-
return i;
153+
sizeInMB = colBytes.length;
154+
if (sizeInMB > fieldGuardraillimitMB)
155+
return sizeInMB;
156+
return sizeInMB;
155157
}
156158

157159
}

src/resources/sparkConf.properties

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ spark.splitSize 10000
2828
spark.batchSize 5
2929
spark.coveragePercent 100
3030
spark.printStatsAfter 100000
31+
spark.fieldGuardraillimitMB 10485760
3132

3233
spark.query.origin partition-key,clustering-key,order-date,amount
3334
spark.query.origin.partitionKey partition-key

0 commit comments

Comments (0)