Skip to content

Commit 17a3af3

Browse files
authored
Fixed empty previous run issue (perform fresh run when previous run not found or not-started for any reason). (#336)
1 parent dcfe8d0 commit 17a3af3

File tree

5 files changed

+46
-8
lines changed

5 files changed

+46
-8
lines changed

RELEASE.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
# Release Notes
2+
3+
## [5.1.4] - 2024-12-04
4+
- Bug fix: Any run started with a `previousRunId` that is not found in the `cdm_run_info` table (for whatever reason), will be executed as a fresh new run instead of doing nothing.
5+
26
## [5.1.3] - 2024-11-27
37
- Bug fix: Fixed connection issue caused when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB).
48

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,13 @@ public Collection<PartitionRange> getPendingPartitions(long prevRunId, JobType j
101101
.execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
102102
Row cdmRunStatus = rsInfo.one();
103103
if (cdmRunStatus == null) {
104-
return Collections.emptyList();
104+
throw new RunNotStartedException(
105+
"###################### Run NOT FOUND for Previous RunId: " + prevRunId + ", starting new run!");
105106
} else {
106107
String status = cdmRunStatus.getString("status");
107108
if (TrackRun.RUN_STATUS.NOT_STARTED.toString().equals(status)) {
108-
throw new RunNotStartedException("Run not started for run_id: " + prevRunId);
109+
throw new RunNotStartedException("###################### Run NOT STARTED for Previous RunId: "
110+
+ prevRunId + ", starting new run!");
109111
}
110112
}
111113

src/main/scala/com/datastax/cdm/job/BaseJob.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
3333

3434
abstract class BaseJob[T: ClassTag] extends App {
3535

36-
private val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
36+
protected val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
3737

3838
private var jobName: String = _
3939
var jobFactory: IJobSessionFactory[T] = _

src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ abstract class BasePartitionJob extends BaseJob[PartitionRange] {
4242
try {
4343
trackRunFeature.getPendingPartitions(prevRunId, jobType)
4444
} catch {
45-
case e: RunNotStartedException => SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent, jobType)
45+
case e: RunNotStartedException => {
46+
abstractLogger.warn(e.getMessage)
47+
SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent, jobType)
48+
}
4649
}
4750
} else {
4851
SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent, jobType)

src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.mockito.Mock;
3333

3434
import com.datastax.cdm.cql.CommonMocks;
35+
import com.datastax.cdm.feature.TrackRun;
3536
import com.datastax.cdm.job.IJobSessionFactory.JobType;
3637
import com.datastax.cdm.job.PartitionRange;
3738
import com.datastax.cdm.job.RunNotStartedException;
@@ -72,15 +73,43 @@ public void setup() {
7273
}
7374

7475
@Test
75-
public void getPendingPartitions_nothingPending() throws RunNotStartedException {
76+
public void incorrectKsTable() {
77+
assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1"));
78+
}
79+
80+
@Test
81+
public void getPendingPartitions_noPrevRun() throws RunNotStartedException {
7682
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
7783
assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(0, JobType.MIGRATE));
78-
assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(1, JobType.MIGRATE));
7984
}
8085

8186
@Test
82-
public void incorrectKsTable() throws RunNotStartedException {
83-
assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1"));
87+
public void getPendingPartitions_noPrevRunFound() {
88+
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
89+
assertThrows(RunNotStartedException.class,
90+
() -> targetUpsertRunDetailsStatement.getPendingPartitions(1, JobType.MIGRATE));
91+
}
92+
93+
@Test
94+
public void getPendingPartitions_prevRunNotStarted() {
95+
when(rs.one()).thenReturn(row1);
96+
when(row1.getString("status")).thenReturn(TrackRun.RUN_STATUS.NOT_STARTED.toString());
97+
98+
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
99+
assertThrows(RunNotStartedException.class,
100+
() -> targetUpsertRunDetailsStatement.getPendingPartitions(123, JobType.MIGRATE));
101+
}
102+
103+
@Test
104+
public void getPendingPartitions_prevRunNoPartsPending() throws RunNotStartedException {
105+
when(rs.one()).thenReturn(row1);
106+
when(row1.getString("status")).thenReturn(TrackRun.RUN_STATUS.ENDED.toString());
107+
Iterator mockIterator = mock(Iterator.class);
108+
when(rs.iterator()).thenReturn(mockIterator);
109+
when(mockIterator.hasNext()).thenReturn(false);
110+
111+
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
112+
assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(123, JobType.MIGRATE));
84113
}
85114

86115
@Test

0 commit comments

Comments
 (0)