Skip to content

Commit 3665573

Browse files
authored
Fixed connection issue caused when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB). (#334)
1 parent bcfff40 commit 3665573

File tree

8 files changed

+31
-22
lines changed

8 files changed

+31
-22
lines changed

RELEASE.md

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,7 @@
11
# Release Notes
2+
## [5.1.3] - 2024-11-27
3+
- Bug fix: Fixed connection issue caused when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB).
4+
25
## [5.1.2] - 2024-11-26
36
- Bug fix: SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause connection exception on that worker node. Added a static async SCB delete delay to address such issues.
47

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,10 +66,13 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
6666
// TODO: Remove this code block after a few releases, its only added for backward compatibility
6767
try {
6868
this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
69+
} catch (Exception e) { // ignore if column already exists
70+
logger.debug("Column 'status' already exists in table {}", cdmKsTabInfo);
71+
}
72+
try {
6973
this.session.execute("ALTER TABLE " + cdmKsTabDetails + " ADD run_info TEXT");
70-
} catch (Exception e) {
71-
// ignore if column already exists
72-
logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
74+
} catch (Exception e) { // ignore if column already exists
75+
logger.debug("Column 'run_info' already exists in table {}", cdmKsTabDetails);
7376
}
7477

7578
boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo

src/main/scala/com/datastax/cdm/job/BaseJob.scala

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ package com.datastax.cdm.job
1717

1818
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
1919
import com.datastax.spark.connector.cql.CassandraConnector
20-
import org.apache.spark.{SparkConf, SparkContext}
20+
import org.apache.spark.SparkContext
2121
import org.apache.spark.rdd.RDD
2222
import org.apache.spark.sql.SparkSession
2323
import org.slf4j.LoggerFactory
@@ -40,7 +40,6 @@ abstract class BaseJob[T: ClassTag] extends App {
4040

4141
var spark: SparkSession = _
4242
var sContext: SparkContext = _
43-
var sc: SparkConf = _
4443
var propertyHelper: PropertyHelper = _
4544

4645
var consistencyLevel: String = _
@@ -69,8 +68,7 @@ abstract class BaseJob[T: ClassTag] extends App {
6968
.appName(jobName)
7069
.getOrCreate()
7170
sContext = spark.sparkContext
72-
sc = sContext.getConf
73-
propertyHelper = PropertyHelper.getInstance(sc);
71+
propertyHelper = PropertyHelper.getInstance(sContext.getConf);
7472

7573
runId = propertyHelper.getLong(KnownProperties.RUN_ID)
7674
prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
@@ -79,9 +77,9 @@ abstract class BaseJob[T: ClassTag] extends App {
7977
runId = System.nanoTime();
8078
}
8179
consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
82-
connectionFetcher = new ConnectionFetcher(sc, propertyHelper)
83-
originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel, runId)
84-
targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel, runId)
80+
connectionFetcher = new ConnectionFetcher(propertyHelper)
81+
originConnection = connectionFetcher.getConnection(sContext.getConf, Side.ORIGIN, consistencyLevel, runId)
82+
targetConnection = connectionFetcher.getConnection(sContext.getConf, Side.TARGET, consistencyLevel, runId)
8583

8684
val hasRandomPartitioner: Boolean = {
8785
val partitionerName = originConnection.withSessionDo(_.getMetadata.getTokenMap.get().getPartitionerName)

src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ import com.datastax.cdm.data.DataUtility.generateSCB
2323
import com.datastax.cdm.data.PKFactory.Side
2424

2525
// TODO: CDM-31 - add localDC configuration support
26-
class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) extends Serializable {
26+
class ConnectionFetcher(propertyHelper: IPropertyHelper) extends Serializable {
2727
val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)
2828

2929
def getConnectionDetails(side: Side): ConnectionDetails = {
@@ -63,7 +63,7 @@ class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) exte
6363
}
6464
}
6565

66-
def getConnection(side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
66+
def getConnection(config: SparkConf, side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
6767
val connectionDetails = getConnectionDetails(side)
6868

6969
logger.info("PARAM -- SSL Enabled: "+connectionDetails.sslEnabled);

src/main/scala/com/datastax/cdm/job/DiffData.scala

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ package com.datastax.cdm.job
1717

1818
import com.datastax.cdm.feature.TrackRun
1919
import com.datastax.cdm.data.PKFactory.Side
20-
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
20+
import com.datastax.cdm.properties.KnownProperties
2121
import com.datastax.cdm.job.IJobSessionFactory.JobType
2222

2323
object DiffData extends BasePartitionJob {
@@ -34,6 +34,8 @@ object DiffData extends BasePartitionJob {
3434
var ma = new CDMMetricsAccumulator(jobType)
3535
sContext.register(ma, "CDMMetricsAccumulator")
3636

37+
val bcOriginConfig = sContext.broadcast(sContext.getConf)
38+
val bcTargetConfig = sContext.broadcast(sContext.getConf)
3739
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
3840
val bcPropHelper = sContext.broadcast(propertyHelper)
3941
val bcJobFactory = sContext.broadcast(jobFactory)
@@ -42,8 +44,8 @@ object DiffData extends BasePartitionJob {
4244

4345
slices.foreach(slice => {
4446
if (null == originConnection) {
45-
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
46-
targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
47+
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
48+
targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
4749
trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value))
4850
}
4951
originConnection.withSessionDo(originSession =>

src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
package com.datastax.cdm.job
1717

1818
import com.datastax.cdm.data.PKFactory.Side
19-
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
19+
import com.datastax.cdm.properties.KnownProperties
2020
import com.datastax.cdm.job.IJobSessionFactory.JobType
2121

2222
object GuardrailCheck extends BasePartitionJob {
@@ -32,13 +32,14 @@ object GuardrailCheck extends BasePartitionJob {
3232
var ma = new CDMMetricsAccumulator(jobType)
3333
sContext.register(ma, "CDMMetricsAccumulator")
3434

35+
val bcOriginConfig = sContext.broadcast(sContext.getConf)
3536
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
3637
val bcPropHelper = sContext.broadcast(propertyHelper)
3738
val bcJobFactory = sContext.broadcast(jobFactory)
3839

3940
slices.foreach(slice => {
4041
if (null == originConnection) {
41-
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0)
42+
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0)
4243
}
4344
originConnection.withSessionDo(originSession =>
4445
bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value)

src/main/scala/com/datastax/cdm/job/Migrate.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ package com.datastax.cdm.job
1818
import com.datastax.cdm.feature.TrackRun
1919
import com.datastax.cdm.job.CDMMetricsAccumulator
2020
import com.datastax.cdm.data.PKFactory.Side
21-
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
21+
import com.datastax.cdm.properties.KnownProperties
2222
import com.datastax.cdm.job.IJobSessionFactory.JobType
2323

2424
object Migrate extends BasePartitionJob {
@@ -34,7 +34,9 @@ object Migrate extends BasePartitionJob {
3434
jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, jobType)))
3535
var ma = new CDMMetricsAccumulator(jobType)
3636
sContext.register(ma, "CDMMetricsAccumulator")
37-
37+
38+
val bcOriginConfig = sContext.broadcast(sContext.getConf)
39+
val bcTargetConfig = sContext.broadcast(sContext.getConf)
3840
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
3941
val bcPropHelper = sContext.broadcast(propertyHelper)
4042
val bcJobFactory = sContext.broadcast(jobFactory)
@@ -43,8 +45,8 @@ object Migrate extends BasePartitionJob {
4345

4446
slices.foreach(slice => {
4547
if (null == originConnection) {
46-
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
47-
targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
48+
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
49+
targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
4850
trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value))
4951
}
5052
originConnection.withSessionDo(originSession =>

src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ public void setup() {
4545
commonSetupWithoutDefaultClassVariables();
4646
MockitoAnnotations.openMocks(this);
4747

48-
cf = new ConnectionFetcher(conf, propertyHelper);
48+
cf = new ConnectionFetcher(propertyHelper);
4949
}
5050

5151
@Test

0 commit comments

Comments (0)