Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions spark-job/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,12 @@
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.5.10</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
35 changes: 26 additions & 9 deletions spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
JobMetadataDb.JobLifeCycle job = null;
UUID jobId = null;
try (Cluster metadataCluster = metadataProvider.getCluster();
Session metadataSession = metadataCluster.connect()) {
Cluster metadataCluster = null;
Session metadataSession = null;

try {
metadataCluster = metadataProvider.getCluster();
metadataSession = metadataCluster.connect();
RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions());
MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider);
Expand Down Expand Up @@ -197,18 +200,32 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
Differ.shutdown();
JobMetadataDb.ProgressTracker.resetStatements();
}
if (metadataCluster != null) {
metadataCluster.close();
}
if (metadataSession != null) {
metadataSession.close();
}

}
}

private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
@VisibleForTesting
static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
if (conf.jobId().isPresent()) {
return job.getJobParams(conf.jobId().get());
} else {
return new Params(UUID.randomUUID(),
keyspaceTables,
conf.buckets(),
conf.splits());
final Params jobParams = job.getJobParams(conf.jobId().get());
if(jobParams != null) {
// When job_id is passed as a config property for the first time, we will not have metadata associated
// with job_id in metadata table. we should return jobParams from the table only when jobParams is not null
// Otherwise return new jobParams with provided job_id
return jobParams;
}
}
final UUID jobId = conf.jobId().isPresent() ? conf.jobId().get() : UUID.randomUUID();
return new Params(jobId,
keyspaceTables,
conf.buckets(),
conf.splits());
}

private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void initializeJob(DiffJob.Params params,
metadataKeyspace, Schema.RUNNING_JOBS),
params.jobId);
if (!rs.one().getBool("[applied]")) {
logger.info("Aborting due to inability to mark job as running. " +
logger.info("Could not mark job as running. " +
"Did a previous run of job id {} fail non-gracefully?",
params.jobId);
throw new RuntimeException("Unable to mark job running, aborting");
Expand Down
55 changes: 55 additions & 0 deletions spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@
package org.apache.cassandra.diff;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DiffJobTest
{
Expand All @@ -39,6 +46,37 @@ public void testSplitsRandom()
splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner"));
}

@Test
public void testGetJobParamsWithJobIdProvidedShouldReturnNonNullConFigParams() {
final MockConfig mockConfig = new MockConfig();
final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
assertNotNull(params);
}

@Test
public void testGetJobParamsDuringRetryShouldReturnPreviousParams() {
final MockConfig mockConfig = new MockConfig();
final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
final DiffJob.Params mockParams = mock(DiffJob.Params.class);
when(mockJob.getJobParams(any())).thenAnswer(invocationOnMock -> mockParams);
final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
assertEquals(params, mockParams);
}

@Test
public void testGetJobParamsWithNoJobId() {
final MockConfig mockConfig = mock(MockConfig.class);
when(mockConfig.jobId()).thenReturn(Optional.empty());

final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
assertNotNull(params.jobId);
}

private void splitTestHelper(TokenHelper tokens)
{
List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens);
Expand All @@ -54,4 +92,21 @@ private void splitTestHelper(TokenHelper tokens)
for (int i = 0; i < splits.size(); i++)
assertEquals(i, splits.get(i).splitNumber);
}

private class MockConfig extends AbstractMockJobConfiguration {
@Override
public int splits() {
return 2;
}

@Override
public int buckets() {
return 2;
}

@Override
public Optional<UUID> jobId() {
return Optional.of(UUID.randomUUID());
}
}
}