Skip to content

Commit e3161b6

Browse files
committed
Improve metadata exception handling
Patch by Yifan Cai; reviewed by Marcus Eriksson, Dinesh Joshi for CASSANDRA-16211
1 parent c3ce22c commit e3161b6

File tree

9 files changed

+327
-106
lines changed

9 files changed

+327
-106
lines changed

.gitignore

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
### Intellij ###
2+
*.iml
3+
*.ipr
4+
*.iws
5+
out/
6+
.idea/
7+
.idea_modules/
8+
9+
### Java ###
10+
# Compiled class file
11+
*.class
12+
13+
# Log file
14+
*.log
15+
16+
# Package Files #
17+
*.jar
18+
*.war
19+
*.nar
20+
*.ear
21+
*.zip
22+
*.tar.gz
23+
*.rar
24+
25+
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
26+
hs_err_pid*
27+
28+
### Maven ###
29+
target/
30+
pom.xml.tag
31+
pom.xml.releaseBackup
32+
pom.xml.versionsBackup
33+
pom.xml.next
34+
release.properties
35+
dependency-reduced-pom.xml
36+
buildNumber.properties
37+
.mvn/timing.properties
38+
.mvn/wrapper/maven-wrapper.jar
39+
.flattened-pom.xml

common/src/main/java/org/apache/cassandra/diff/ExponentialRetryStrategyProvider.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
120
package org.apache.cassandra.diff;
221

322
import java.util.concurrent.TimeUnit;

common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
120
package org.apache.cassandra.diff;
221

322
import java.io.Serializable;

common/src/main/java/org/apache/cassandra/diff/RetryStrategy.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
120
package org.apache.cassandra.diff;
221

322
import java.util.concurrent.Callable;
23+
import java.util.function.Function;
424

525
import org.slf4j.Logger;
626
import org.slf4j.LoggerFactory;
@@ -16,12 +36,28 @@ public abstract class RetryStrategy {
1636
protected abstract boolean shouldRetry();
1737

1838
public final <T> T retry(Callable<T> retryable) throws Exception {
39+
return retryIfNot(retryable);
40+
}
41+
42+
/**
43+
* Retry a retryable.
44+
* Rethrow the exception from retryable if no more retry is permitted or the thrown exception is in the exclude list.
45+
*/
46+
@SafeVarargs
47+
public final <T> T retryIfNot(Callable<T> retryable, Class<? extends Exception>... excludedExceptions) throws Exception {
48+
Function<Exception, Boolean> containsException = ex -> {
49+
for (Class<? extends Exception> xClass : excludedExceptions) {
50+
if (xClass.isInstance(ex))
51+
return true;
52+
}
53+
return false;
54+
};
1955
while (true) {
2056
try {
2157
return retryable.call();
2258
}
2359
catch (Exception exception) {
24-
if (!shouldRetry()) {
60+
if (containsException.apply(exception) || !shouldRetry()) {
2561
throw exception;
2662
}
2763
logger.warn("Retry with " + toString());

common/src/main/java/org/apache/cassandra/diff/RetryStrategyProvider.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
120
package org.apache.cassandra.diff;
221

22+
import java.io.Serializable;
23+
324
import org.slf4j.Logger;
425
import org.slf4j.LoggerFactory;
526

627
/**
728
* Provides new RetryStrategy instances.
829
* Use abstract class instead of interface in order to retain the referece to retryOptions;
930
*/
10-
public abstract class RetryStrategyProvider {
31+
public abstract class RetryStrategyProvider implements Serializable {
1132
protected final JobConfiguration.RetryOptions retryOptions;
1233

1334
public RetryStrategyProvider(JobConfiguration.RetryOptions retryOptions) {

common/src/test/java/org/apache/cassandra/diff/ExponentialRetryStrategyTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,40 @@ public void testOverflowPrevention() {
9191
}
9292
}
9393

94+
@Test
95+
public void testNotMatchAndRetryWithRetryIfNot() {
96+
AtomicInteger execCount = new AtomicInteger(0);
97+
ExponentialRetryStrategy strategy = new ExponentialRetryStrategy(1, 5);
98+
// run the code and retry since the thrown exception does not match with the exclude list
99+
try {
100+
strategy.retryIfNot(() -> {
101+
execCount.getAndIncrement();
102+
throw new IllegalStateException();
103+
}, IllegalArgumentException.class, UnsupportedOperationException.class);
104+
}
105+
catch (Exception ex) {
106+
Assert.assertSame(IllegalStateException.class, ex.getClass());
107+
Assert.assertEquals(4, execCount.get());
108+
}
109+
}
110+
111+
@Test
112+
public void testMatchAndRetryWithRetryIfNot() {
113+
AtomicInteger execCount = new AtomicInteger(0);
114+
ExponentialRetryStrategy strategy = new ExponentialRetryStrategy(1, 2);
115+
// run the code and not retry since the thrown exception matches the exclude list
116+
try {
117+
strategy.retryIfNot(() -> {
118+
execCount.getAndIncrement();
119+
throw new IllegalStateException();
120+
}, RuntimeException.class);
121+
}
122+
catch (Exception ex) {
123+
Assert.assertSame(IllegalStateException.class, ex.getClass());
124+
Assert.assertEquals(1, execCount.get());
125+
}
126+
}
127+
94128
private JobConfiguration.RetryOptions retryOptions(long baseDelayMs, long totalDelayMs) {
95129
return new JobConfiguration.RetryOptions() {{
96130
put(ExponentialRetryStrategy.BASE_DELAY_MS_KEY, String.valueOf(baseDelayMs));

spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,13 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
125125
try (Cluster metadataCluster = metadataProvider.getCluster();
126126
Session metadataSession = metadataCluster.connect()) {
127127

128+
RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions());
128129
MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
129-
JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions);
130+
JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider);
130131

131132
// Job params, which once a job is created cannot be modified in subsequent re-runs
132133
logger.info("Creating or retrieving job parameters");
133-
job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace);
134+
job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace, retryStrategyProvider);
134135
Params params = getJobParams(job, configuration, tablesToCompare);
135136
logger.info("Job Params: {}", params);
136137
if (null == params)
@@ -174,7 +175,8 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
174175
sourceProvider,
175176
targetProvider,
176177
metadataProvider,
177-
new TrackerProvider(configuration.metadataOptions().keyspace))
178+
new TrackerProvider(configuration.metadataOptions().keyspace),
179+
retryStrategyProvider)
178180
.run())
179181
.reduce(Differ::accumulate);
180182
// Publish results. This also removes the job from the currently running list

spark-job/src/main/java/org/apache/cassandra/diff/Differ.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ public Differ(JobConfiguration config,
9090
ClusterProvider sourceProvider,
9191
ClusterProvider targetProvider,
9292
ClusterProvider metadataProvider,
93-
DiffJob.TrackerProvider trackerProvider)
93+
DiffJob.TrackerProvider trackerProvider,
94+
RetryStrategyProvider retryStrategyProvider)
9495
{
9596
logger.info("Creating Differ for {}", split);
9697
this.jobId = params.jobId;
@@ -101,7 +102,7 @@ public Differ(JobConfiguration config,
101102
rateLimiter = RateLimiter.create(perExecutorRateLimit);
102103
this.reverseReadProbability = config.reverseReadProbability();
103104
this.specificTokens = config.specificTokens();
104-
this.retryStrategyProvider = RetryStrategyProvider.create(config.retryOptions());
105+
this.retryStrategyProvider = retryStrategyProvider;
105106
synchronized (Differ.class)
106107
{
107108
/*

0 commit comments

Comments
 (0)