Skip to content

Commit dd8843f

Browse files
authored
CDM-57 min/max partition defaults based on partitioner rather than co… (#150)
* CDM-57 min/max partition defaults based on partitioner rather than config setting * CDM-57 fixing feature post-merge --------- Co-authored-by: Phil Miesle <[email protected]>
1 parent 7a3fb8a commit dd8843f

File tree

6 files changed

+48
-86
lines changed

6 files changed

+48
-86
lines changed

src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatement.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,20 @@
1616
public class OriginSelectByPartitionRangeStatement extends OriginSelectStatement {
1717
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
1818

19-
private final Long defaultMinPartition;
20-
private final Long defaultMaxPartition;
21-
2219
public OriginSelectByPartitionRangeStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
2320
super(propertyHelper, session);
24-
25-
defaultMinPartition = propertyHelper.getLong(KnownProperties.PARTITION_MIN);
26-
defaultMaxPartition = propertyHelper.getLong(KnownProperties.PARTITION_MAX);
2721
}
2822

2923
@Override
3024
public BoundStatement bind(Object... binds) {
3125
if (null==binds
3226
|| binds.length != 2
33-
|| !(null==binds[0] || binds[0] instanceof BigInteger)
34-
|| !(null==binds[1] || binds[1] instanceof BigInteger))
35-
throw new RuntimeException("Expected 2 nullable bind of type BigInteger, got " + binds.length);
27+
|| !(binds[0] instanceof BigInteger)
28+
|| !(binds[1] instanceof BigInteger))
29+
throw new RuntimeException("Expected 2 not-null binds of type BigInteger, got " + binds.length);
3630

37-
BigInteger min = (null==binds[0]) ? BigInteger.valueOf(defaultMinPartition) : (BigInteger) binds[0];
38-
BigInteger max = (null==binds[1]) ? BigInteger.valueOf(defaultMaxPartition) : (BigInteger) binds[1];
31+
BigInteger min = (BigInteger) binds[0];
32+
BigInteger max = (BigInteger) binds[1];
3933

4034
PreparedStatement preparedStatement = prepareStatement();
4135
// random partitioner uses BigInteger, the normal partitioner uses long

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,8 @@ public enum PropertyType {
172172
public static final String PARTITION_MAX = "spark.cdm.filter.cassandra.partition.max";
173173
public static final String FILTER_CQL_WHERE_CONDITION = "spark.cdm.filter.cassandra.where.condition";
174174
static {
175-
types.put(PARTITION_MIN, PropertyType.NUMBER);
176-
defaults.put(PARTITION_MIN, "-9223372036854775808");
177-
types.put(PARTITION_MAX, PropertyType.NUMBER);
178-
defaults.put(PARTITION_MAX, "9223372036854775807");
175+
types.put(PARTITION_MIN, PropertyType.STRING);
176+
types.put(PARTITION_MAX, PropertyType.STRING);
179177
types.put(FILTER_CQL_WHERE_CONDITION, PropertyType.STRING);
180178
}
181179

src/main/scala/com/datastax/cdm/job/BaseJob.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,16 @@ abstract class BaseJob[T: ClassTag] extends App {
5757
propertyHelper = PropertyHelper.getInstance(sc);
5858

5959
consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
60-
minPartition = new BigInteger(propertyHelper.getAsString(KnownProperties.PARTITION_MIN))
61-
maxPartition = new BigInteger(propertyHelper.getAsString(KnownProperties.PARTITION_MAX))
60+
val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)
61+
originConnection = connectionFetcher.getConnection("ORIGIN", consistencyLevel)
62+
targetConnection = connectionFetcher.getConnection("TARGET", consistencyLevel)
63+
64+
val hasRandomPartitioner: Boolean = {
65+
val partitionerName = originConnection.withSessionDo(_.getMetadata.getTokenMap.get().getPartitionerName)
66+
partitionerName.endsWith("RandomPartitioner")
67+
}
68+
minPartition = getMinPartition(propertyHelper.getString(KnownProperties.PARTITION_MIN), hasRandomPartitioner)
69+
maxPartition = getMaxPartition(propertyHelper.getString(KnownProperties.PARTITION_MAX), hasRandomPartitioner)
6270
coveragePercent = propertyHelper.getInteger(KnownProperties.TOKEN_COVERAGE_PERCENT)
6371
numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)
6472
abstractLogger.info("PARAM -- Min Partition: " + minPartition)
@@ -71,9 +79,6 @@ abstract class BaseJob[T: ClassTag] extends App {
7179
abstractLogger.info("PARAM Calculated -- Total Partitions: " + parts.size())
7280
abstractLogger.info("Spark parallelize created : " + slices.count() + " slices!");
7381

74-
val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)
75-
originConnection = connectionFetcher.getConnection("ORIGIN", consistencyLevel)
76-
targetConnection = connectionFetcher.getConnection("TARGET", consistencyLevel)
7782
}
7883

7984
def getParts(pieces: Int): util.Collection[T]
@@ -125,4 +130,15 @@ abstract class BaseJob[T: ClassTag] extends App {
125130
abstractLogger.info(bannerFill)
126131
}
127132

133+
/**
 * Resolves the lower (inclusive) partition bound to use for the job.
 *
 * An explicitly configured value always wins. Surrounding whitespace is
 * ignored, and a null or blank value is treated as "not configured" so that a
 * stray space in the properties file falls back to the default instead of
 * failing inside `new BigInteger(...)`.
 *
 * @param minPartition         configured lower bound as a decimal string; may be null or blank
 * @param hasRandomPartitioner whether the origin cluster reports a RandomPartitioner
 * @return the configured bound when present; otherwise 0 for RandomPartitioner
 *         and `Long.MinValue` (-2^63) for any other partitioner
 * @throws NumberFormatException if a non-blank value is not a valid decimal integer
 */
def getMinPartition(minPartition: String, hasRandomPartitioner: Boolean): BigInteger =
  // Option(...) maps a null property to None; trim + filter maps blank to None as well.
  Option(minPartition).map(_.trim).filter(_.nonEmpty) match {
    case Some(configured)             => new BigInteger(configured)
    case None if hasRandomPartitioner => BigInteger.ZERO
    case None                         => BigInteger.valueOf(Long.MinValue)
  }
138+
139+
/**
 * Resolves the upper (inclusive) partition bound to use for the job.
 *
 * An explicitly configured value always wins. Surrounding whitespace is
 * ignored, and a null or blank value is treated as "not configured" so that a
 * stray space in the properties file falls back to the default instead of
 * failing inside `new BigInteger(...)`.
 *
 * @param maxPartition         configured upper bound as a decimal string; may be null or blank
 * @param hasRandomPartitioner whether the origin cluster reports a RandomPartitioner
 * @return the configured bound when present; otherwise 2^127-1 for RandomPartitioner
 *         and `Long.MaxValue` (2^63-1) for any other partitioner
 * @throws NumberFormatException if a non-blank value is not a valid decimal integer
 */
def getMaxPartition(maxPartition: String, hasRandomPartitioner: Boolean): BigInteger =
  // Option(...) maps a null property to None; trim + filter maps blank to None as well.
  Option(maxPartition).map(_.trim).filter(_.nonEmpty) match {
    case Some(configured)             => new BigInteger(configured)
    // 2^127 - 1, built by shifting rather than parsing "2" and calling pow.
    case None if hasRandomPartitioner => BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE)
    case None                         => BigInteger.valueOf(Long.MaxValue)
  }
128144
}

src/resources/sparkConf.properties

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,10 @@ spark.cdm.perfops.writeRateLimit 40000
235235
#
236236
# spark.cdm.filter.cassandra
237237
# .partition
238-
# .min : Default is -9223372036854775808 (-2^63). Lower partition bound (inclusive).
239-
# .min : Default is 9223372036854775807 (2^63-1). Upper partition bound (inclusive).
238+
# .min : Default is 0 (when using RandomPartitioner) and -9223372036854775808 (-2^63)
239+
# otherwise. Lower partition bound (inclusive).
240+
# .max : Default is 2^127-1 (when using RandomPartitioner) and 9223372036854775807
241+
# (2^63-1) otherwise. Upper partition bound (inclusive).
240242
# .where.condition : CQL added to the WHERE clause of SELECTs from origin
241243
#-----------------------------------------------------------------------------------------------------------
242244
#spark.cdm.filter.cassandra.partition.min -9223372036854775808

src/test/java/com/datastax/cdm/cql/CommonMocks.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ public class CommonMocks {
109109
public List<String> targetColumnNames;
110110
public List<DataType> targetColumnTypes;
111111

112-
public Long minPartition;
113-
public Long maxPartition;
112+
public String minPartition;
113+
public String maxPartition;
114114
public ConsistencyLevel readCL;
115115
public Integer fetchSizeInRows;
116116

@@ -180,8 +180,8 @@ public void defaultClassVariables() {
180180
targetKeyspaceName = "target_ks";
181181
targetTableName = "table_name";
182182

183-
minPartition = -9876543L;
184-
maxPartition = 1234567L;
183+
minPartition = "-9876543";
184+
maxPartition = "1234567";
185185
readCL = ConsistencyLevel.LOCAL_QUORUM;
186186
fetchSizeInRows = 999;
187187

@@ -239,8 +239,8 @@ public void setPropertyHelperWhens() {
239239
when(propertyHelper.getAsString(KnownProperties.FILTER_COLUMN_NAME)).thenReturn("");
240240
when(propertyHelper.getString(KnownProperties.FILTER_COLUMN_VALUE)).thenReturn(null);
241241

242-
when(propertyHelper.getLong(KnownProperties.PARTITION_MIN)).thenReturn(minPartition);
243-
when(propertyHelper.getLong(KnownProperties.PARTITION_MAX)).thenReturn(maxPartition);
242+
when(propertyHelper.getString(KnownProperties.PARTITION_MIN)).thenReturn(minPartition);
243+
when(propertyHelper.getString(KnownProperties.PARTITION_MAX)).thenReturn(maxPartition);
244244

245245
when(propertyHelper.getStringList(KnownProperties.ORIGIN_COLUMN_NAMES_TO_TARGET)).thenReturn(originToTargetNameList);
246246
}

src/test/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatementTest.java

Lines changed: 10 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,14 @@ public void originFilterCondition() {
5555
String cql = originSelectByPartitionRangeStatement.getCQL();
5656
assertEquals(sb.toString(),cql);
5757
}
58+
5859
@Test
59-
public void bind_withNullBinds_usesDefaultPartitions() {
60-
originSelectByPartitionRangeStatement.bind(null, null);
60+
public void bind_withNullBinds() {
6161
assertAll(
62-
() -> verify(preparedStatement).bind(
63-
BigInteger.valueOf(minPartition).longValueExact(),
64-
BigInteger.valueOf(maxPartition).longValueExact()),
65-
() -> verify(boundStatement).setConsistencyLevel(readCL),
66-
() -> verify(boundStatement).setPageSize(fetchSizeInRows)
62+
() -> assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind(null,null), "two null"),
63+
() -> assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind(BigInteger.valueOf(20)), "missing second"),
64+
() -> assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind(BigInteger.valueOf(20),null), "null second"),
65+
() -> assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind(null,BigInteger.valueOf(20)), "null first")
6766
);
6867
}
6968

@@ -74,41 +73,12 @@ public void bind_withNonNullBinds_usesProvidedPartitions() {
7473

7574
originSelectByPartitionRangeStatement.bind(providedMin, providedMax);
7675
assertAll(
77-
() -> verify(preparedStatement).bind(
78-
providedMin.longValueExact(),
79-
providedMax.longValueExact()),
76+
() -> verify(preparedStatement).bind(providedMin.longValueExact(), providedMax.longValueExact()),
8077
() -> verify(boundStatement).setConsistencyLevel(readCL),
8178
() -> verify(boundStatement).setPageSize(fetchSizeInRows)
8279
);
8380
}
8481

85-
@Test
86-
public void bind_withMixedBinds_usesDefaultAndProvidedPartitions() {
87-
BigInteger providedMin = BigInteger.valueOf(12345L);
88-
89-
originSelectByPartitionRangeStatement.bind(providedMin, null);
90-
assertAll(
91-
() -> verify(preparedStatement).bind(
92-
providedMin.longValueExact(),
93-
BigInteger.valueOf(maxPartition).longValueExact()),
94-
() -> verify(boundStatement).setConsistencyLevel(readCL),
95-
() -> verify(boundStatement).setPageSize(fetchSizeInRows)
96-
);
97-
}
98-
99-
@Test
100-
public void bind_withNullBinds_usesDefaultPartitions_whenRandomPartitioner() {
101-
when(originTable.hasRandomPartitioner()).thenReturn(true);
102-
103-
originSelectByPartitionRangeStatement.bind(null, null);
104-
assertAll(
105-
() -> verify(preparedStatement).bind(
106-
BigInteger.valueOf(minPartition),
107-
BigInteger.valueOf(maxPartition)),
108-
() -> verify(boundStatement).setConsistencyLevel(readCL),
109-
() -> verify(boundStatement).setPageSize(fetchSizeInRows)
110-
);
111-
}
11282

11383
@Test
11484
public void bind_withNonNullBinds_usesProvidedPartitions_whenRandomPartitioner() {
@@ -128,29 +98,11 @@ public void bind_withNonNullBinds_usesProvidedPartitions_whenRandomPartitioner()
12898
}
12999

130100
@Test
131-
public void bind_withMixedBinds_usesDefaultAndProvidedPartitions_whenRandomPartitioner() {
132-
when(originTable.hasRandomPartitioner()).thenReturn(true);
133-
134-
BigInteger providedMax = BigInteger.valueOf(999999999L);
135-
136-
originSelectByPartitionRangeStatement.bind(null, providedMax);
101+
public void bind_withInvalidBindType_throwsException() {
137102
assertAll(
138-
() -> verify(preparedStatement).bind(
139-
BigInteger.valueOf(minPartition),
140-
providedMax),
141-
() -> verify(boundStatement).setConsistencyLevel(readCL),
142-
() -> verify(boundStatement).setPageSize(fetchSizeInRows)
103+
() -> assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind("invalidType", BigInteger.valueOf(20)), "invalid first"),
104+
() -> assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind(BigInteger.valueOf(20),"invalidType"), "invalid second")
143105
);
144106
}
145107

146-
@Test
147-
public void bind_withInvalidBindLength_throwsException() {
148-
assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind(null));
149-
}
150-
151-
@Test
152-
public void bind_withInvalidBindType_throwsException() {
153-
assertThrows(RuntimeException.class, () -> originSelectByPartitionRangeStatement.bind("invalidType", BigInteger.valueOf(20)));
154-
}
155-
156108
}

0 commit comments

Comments
 (0)