diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c573508 --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +### Intellij ### +*.iml +*.ipr +*.iws +out/ +.idea/ +.idea_modules/ + +### Java ### +# Compiled class file +*.class + +# Log file +*.log + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Maven ### +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar +.flattened-pom.xml diff --git a/common/src/main/java/org/apache/cassandra/diff/ExponentialRetryStrategyProvider.java b/common/src/main/java/org/apache/cassandra/diff/ExponentialRetryStrategyProvider.java index 639d14f..c70f3c3 100644 --- a/common/src/main/java/org/apache/cassandra/diff/ExponentialRetryStrategyProvider.java +++ b/common/src/main/java/org/apache/cassandra/diff/ExponentialRetryStrategyProvider.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.cassandra.diff; import java.util.concurrent.TimeUnit; diff --git a/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java index e705d3c..aadd1c9 100644 --- a/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java +++ b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.cassandra.diff; import java.io.Serializable; diff --git a/common/src/main/java/org/apache/cassandra/diff/RetryStrategy.java b/common/src/main/java/org/apache/cassandra/diff/RetryStrategy.java index b0cd7c6..3ccd73e 100644 --- a/common/src/main/java/org/apache/cassandra/diff/RetryStrategy.java +++ b/common/src/main/java/org/apache/cassandra/diff/RetryStrategy.java @@ -1,6 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.cassandra.diff; import java.util.concurrent.Callable; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,12 +36,28 @@ public abstract class RetryStrategy { protected abstract boolean shouldRetry(); public final T retry(Callable retryable) throws Exception { + return retryIfNot(retryable); + } + + /** + * Retry a retryable. + * Rethrow the exception from retryable if no more retry is permitted or the thrown exception is in the exclude list. + */ + @SafeVarargs + public final T retryIfNot(Callable retryable, Class... excludedExceptions) throws Exception { + Function containsException = ex -> { + for (Class xClass : excludedExceptions) { + if (xClass.isInstance(ex)) + return true; + } + return false; + }; while (true) { try { return retryable.call(); } catch (Exception exception) { - if (!shouldRetry()) { + if (containsException.apply(exception) || !shouldRetry()) { throw exception; } logger.warn("Retry with " + toString()); diff --git a/common/src/main/java/org/apache/cassandra/diff/RetryStrategyProvider.java b/common/src/main/java/org/apache/cassandra/diff/RetryStrategyProvider.java index ec29bb7..cff347f 100644 --- a/common/src/main/java/org/apache/cassandra/diff/RetryStrategyProvider.java +++ b/common/src/main/java/org/apache/cassandra/diff/RetryStrategyProvider.java @@ -1,5 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.cassandra.diff; +import java.io.Serializable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -7,7 +28,7 @@ * Provides new RetryStrategy instances. * Use abstract class instead of interface in order to retain the referece to retryOptions; */ -public abstract class RetryStrategyProvider { +public abstract class RetryStrategyProvider implements Serializable { protected final JobConfiguration.RetryOptions retryOptions; public RetryStrategyProvider(JobConfiguration.RetryOptions retryOptions) { diff --git a/common/src/test/java/org/apache/cassandra/diff/ExponentialRetryStrategyTest.java b/common/src/test/java/org/apache/cassandra/diff/ExponentialRetryStrategyTest.java index 26940d7..06d9088 100644 --- a/common/src/test/java/org/apache/cassandra/diff/ExponentialRetryStrategyTest.java +++ b/common/src/test/java/org/apache/cassandra/diff/ExponentialRetryStrategyTest.java @@ -91,6 +91,40 @@ public void testOverflowPrevention() { } } + @Test + public void testNotMatchAndRetryWithRetryIfNot() { + AtomicInteger execCount = new AtomicInteger(0); + ExponentialRetryStrategy strategy = new ExponentialRetryStrategy(1, 5); + // run the code and retry since the thrown exception does not match with the exclude list + try { + strategy.retryIfNot(() -> { + execCount.getAndIncrement(); + throw new IllegalStateException(); + }, IllegalArgumentException.class, UnsupportedOperationException.class); + } + catch (Exception ex) { + Assert.assertSame(IllegalStateException.class, ex.getClass()); + Assert.assertEquals(4, execCount.get()); + } + } + + @Test + public void testMatchAndRetryWithRetryIfNot() { + AtomicInteger execCount = new AtomicInteger(0); + ExponentialRetryStrategy strategy = new ExponentialRetryStrategy(1, 2); + // run the code and not retry since the thrown exception matches the exclude list + try { + strategy.retryIfNot(() -> { + execCount.getAndIncrement(); + throw new IllegalStateException(); + }, RuntimeException.class); + } + catch (Exception ex) { + Assert.assertSame(IllegalStateException.class, ex.getClass()); + Assert.assertEquals(1, execCount.get()); + } + } + private JobConfiguration.RetryOptions retryOptions(long baseDelayMs, long totalDelayMs) { return new JobConfiguration.RetryOptions() {{ put(ExponentialRetryStrategy.BASE_DELAY_MS_KEY, String.valueOf(baseDelayMs)); diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java index ff697d2..d744bff 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java @@ -125,12 +125,13 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) { try (Cluster metadataCluster = metadataProvider.getCluster(); Session metadataSession = metadataCluster.connect()) { + RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions()); MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions(); - JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions); + JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider); // Job params, which once a job is created cannot be modified in subsequent re-runs logger.info("Creating or retrieving job parameters"); - job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace); + job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace, retryStrategyProvider); Params params = getJobParams(job, configuration, tablesToCompare); logger.info("Job Params: {}", params); if (null == params) @@ -174,7 +175,8 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) { sourceProvider, targetProvider, metadataProvider, - new TrackerProvider(configuration.metadataOptions().keyspace)) + new TrackerProvider(configuration.metadataOptions().keyspace), + retryStrategyProvider) .run()) .reduce(Differ::accumulate); // Publish results. This also removes the job from the currently running list diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java index b2cb527..cf1c9a5 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java @@ -90,7 +90,8 @@ public Differ(JobConfiguration config, ClusterProvider sourceProvider, ClusterProvider targetProvider, ClusterProvider metadataProvider, - DiffJob.TrackerProvider trackerProvider) + DiffJob.TrackerProvider trackerProvider, + RetryStrategyProvider retryStrategyProvider) { logger.info("Creating Differ for {}", split); this.jobId = params.jobId; @@ -101,7 +102,7 @@ public Differ(JobConfiguration config, rateLimiter = RateLimiter.create(perExecutorRateLimit); this.reverseReadProbability = config.reverseReadProbability(); this.specificTokens = config.specificTokens(); - this.retryStrategyProvider = RetryStrategyProvider.create(config.retryOptions()); + this.retryStrategyProvider = retryStrategyProvider; synchronized (Differ.class) { /* diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java index a7247dd..bef173a 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Consumer; import java.util.stream.Collectors; import com.google.common.base.Throwables; @@ -37,6 +38,10 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.utils.UUIDs; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -44,6 +49,26 @@ public class JobMetadataDb { private static final Logger logger = LoggerFactory.getLogger(JobMetadataDb.class); + @FunctionalInterface + private interface QueryExecution { + ResultSet execute() throws DriverException; + } + + /** + * Ignore QueryConsistencyException (e.g. Read/Write timeout/failure) from the queryExecution. + * @param queryExecution + * @return resultSet. In the case of QueryConsistencyException, null is returned. + */ + private static ResultSet ignoreQueryException(QueryExecution queryExecution) { + try { + return queryExecution.execute(); + } + catch (QueryExecutionException queryException) { + logger.warn("Ignoring a failed query.", queryException); + return null; + } + } + static class ProgressTracker { private final UUID jobId; @@ -51,7 +76,7 @@ static class ProgressTracker { private final String startToken; private final String endToken; private final String metadataKeyspace; - private Session session; + private final Session session; private static PreparedStatement updateStmt; private static PreparedStatement mismatchStmt; @@ -155,24 +180,25 @@ public static void resetStatements() * @return */ public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) { - ResultSet rs = session.execute(String.format("SELECT last_token, " + - " matched_partitions, " + - " mismatched_partitions, " + - " partitions_only_in_source, " + - " partitions_only_in_target, " + - " matched_rows," + - " matched_values," + - " mismatched_values," + - " skipped_partitions " + - " FROM %s.%s " + - " WHERE job_id = ? " + - " AND bucket = ? " + - " AND qualified_table_name = ? " + - " AND start_token = ? " + - " AND end_token = ?", - metadataKeyspace, Schema.TASK_STATUS), - jobId, bucket, keyspaceTablePair.toCqlValueString(), startToken, endToken); - Row row = rs.one(); + ResultSet rs = ignoreQueryException( + () -> session.execute(String.format("SELECT last_token, " + + " matched_partitions, " + + " mismatched_partitions, " + + " partitions_only_in_source, " + + " partitions_only_in_target, " + + " matched_rows," + + " matched_values," + + " mismatched_values," + + " skipped_partitions " + + " FROM %s.%s " + + " WHERE job_id = ? " + + " AND bucket = ? " + + " AND qualified_table_name = ? " + + " AND start_token = ? " + + " AND end_token = ?", + metadataKeyspace, Schema.TASK_STATUS), + jobId, bucket, keyspaceTablePair.toCqlValueString(), startToken, endToken)); + Row row = (rs == null) ? null : rs.one(); if (null == row) return DiffJob.TaskStatus.EMPTY; @@ -197,7 +223,7 @@ public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) { * @param latestToken */ public void updateStatus(KeyspaceTablePair table, RangeStats diffStats, BigInteger latestToken) { - session.execute(bindUpdateStatement(table, diffStats, latestToken)); + ignoreQueryException(() -> session.execute(bindUpdateStatement(table, diffStats, latestToken))); } public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigInteger token) { @@ -206,7 +232,7 @@ public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigIntege ? " different in source and target clusters" : type == MismatchType.ONLY_IN_SOURCE ? "only present in source cluster" : "only present in target cluster"); - session.execute(bindMismatchesStatement(table, token, type.name())); + ignoreQueryException(() -> session.execute(bindMismatchesStatement(table, token, type.name()))); } /** @@ -230,7 +256,7 @@ public void recordError(KeyspaceTablePair table, BigInteger token, Throwable err } batch.add(bindErrorDetailStatement(table, token, exceptionSource)); batch.setIdempotent(true); - session.execute(batch); + ignoreQueryException(() -> session.execute(batch)); } /** @@ -241,10 +267,10 @@ public void recordError(KeyspaceTablePair table, BigInteger token, Throwable err public void finishTable(KeyspaceTablePair table, RangeStats stats, boolean updateCompletedCount) { logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table); // first flush out the last status. - session.execute(bindUpdateStatement(table, stats, endToken)); + ignoreQueryException(() -> session.execute(bindUpdateStatement(table, stats, endToken))); // then update the count of completed tasks if (updateCompletedCount) - session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString())); + ignoreQueryException(() -> session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString()))); } private Statement bindMismatchesStatement(KeyspaceTablePair table, BigInteger token, String type) { @@ -296,21 +322,24 @@ private static long getOrDefaultLong(Row row, String column) { static class JobLifeCycle { final Session session; final String metadataKeyspace; + final RetryStrategyProvider retryStrategyProvider; - public JobLifeCycle(Session session, String metadataKeyspace) { + public JobLifeCycle(Session session, String metadataKeyspace, RetryStrategyProvider retryStrategyProvider) { this.session = session; this.metadataKeyspace = metadataKeyspace; + this.retryStrategyProvider = retryStrategyProvider; } public DiffJob.Params getJobParams(UUID jobId) { - ResultSet rs = session.execute(String.format("SELECT qualified_table_names," + - " buckets," + - " total_tasks " + - "FROM %s.%s " + - "WHERE job_id = ?", - metadataKeyspace, Schema.JOB_SUMMARY), - jobId); - Row row = rs.one(); + ResultSet rs = ignoreQueryException( + () -> session.execute(String.format("SELECT qualified_table_names," + + " buckets," + + " total_tasks " + + "FROM %s.%s " + + "WHERE job_id = ?", + metadataKeyspace, Schema.JOB_SUMMARY), + jobId)); + Row row = (rs == null) ? null : rs.one(); if (null == row) return null; @@ -318,7 +347,7 @@ public DiffJob.Params getJobParams(UUID jobId) { List keyspaceTables = row.getList("qualified_table_names", String.class) .stream() .map(KeyspaceTablePair::new) - .collect(Collectors.toList());; + .collect(Collectors.toList()); return new DiffJob.Params(jobId, keyspaceTables, row.getInt("buckets"), @@ -331,7 +360,7 @@ public void initializeJob(DiffJob.Params params, String sourceClusterName, String sourceClusterDesc, String targetClusterName, - String targetClusterDesc) { + String targetClusterDesc) throws Exception { logger.info("Initializing job status"); // The job was previously run, so this could be a re-run to @@ -349,28 +378,33 @@ public void initializeJob(DiffJob.Params params, UUID timeUUID = UUIDs.timeBased(); DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC); - rs = session.execute(String.format("INSERT INTO %s.%s (" + - " job_id," + - " job_start_time," + - " buckets," + - " qualified_table_names," + - " source_cluster_name," + - " source_cluster_desc," + - " target_cluster_name," + - " target_cluster_desc," + - " total_tasks)" + - " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + - " IF NOT EXISTS", - metadataKeyspace, Schema.JOB_SUMMARY), - params.jobId, - timeUUID, - params.buckets, - params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()), - sourceClusterName, - sourceClusterDesc, - targetClusterName, - targetClusterDesc, - params.tasks); + Statement initJobStatusStatement = + new SimpleStatement(String.format("INSERT INTO %s.%s (" + + " job_id," + + " job_start_time," + + " buckets," + + " qualified_table_names," + + " source_cluster_name," + + " source_cluster_desc," + + " target_cluster_name," + + " target_cluster_desc," + + " total_tasks)" + + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + + " IF NOT EXISTS", + metadataKeyspace, Schema.JOB_SUMMARY), + params.jobId, + timeUUID, + params.buckets, + params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()), + sourceClusterName, + sourceClusterDesc, + targetClusterName, + targetClusterDesc, + params.tasks); + initJobStatusStatement.setIdempotent(true); + rs = retryStrategyProvider.get().retryIfNot(() -> session.execute(initJobStatusStatement), + NoHostAvailableException.class, + QueryValidationException.class); // This is a brand new job, index its details including start time if (rs.one().getBool("[applied]")) { @@ -388,47 +422,52 @@ public void initializeJob(DiffJob.Params params, "VALUES ('%s', ?, ?, ?)", metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")), startDateTime.getHourOfDay(), timeUUID, params.jobId)); - session.execute(batch); + batch.setIdempotent(true); + retryStrategyProvider.get().retryIfNot(() -> session.execute(batch), + NoHostAvailableException.class, + QueryValidationException.class); } } - public void finalizeJob(UUID jobId, Map results) { + public void finalizeJob(UUID jobId, Map results) throws Exception { logger.info("Finalizing job status"); markNotRunning(jobId); - BatchStatement batch = new BatchStatement(); for (Map.Entry result : results.entrySet()) { KeyspaceTablePair table = result.getKey(); RangeStats stats = result.getValue(); - session.execute(String.format("INSERT INTO %s.%s (" + - " job_id," + - " qualified_table_name," + - " matched_partitions," + - " mismatched_partitions," + - " partitions_only_in_source," + - " partitions_only_in_target," + - " matched_rows," + - " matched_values," + - " mismatched_values," + - " skipped_partitions) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - metadataKeyspace, Schema.JOB_RESULTS), - jobId, - table.toCqlValueString(), - stats.getMatchedPartitions(), - stats.getMismatchedPartitions(), - stats.getOnlyInSource(), - stats.getOnlyInTarget(), - stats.getMatchedRows(), - stats.getMatchedValues(), - stats.getMismatchedValues(), - stats.getSkippedPartitions()); + Statement jobResultUpdateStatement = + new SimpleStatement(String.format("INSERT INTO %s.%s (" + + " job_id," + + " qualified_table_name," + + " matched_partitions," + + " mismatched_partitions," + + " partitions_only_in_source," + + " partitions_only_in_target," + + " matched_rows," + + " matched_values," + + " mismatched_values," + + " skipped_partitions) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + metadataKeyspace, Schema.JOB_RESULTS), + jobId, + table.toCqlValueString(), + stats.getMatchedPartitions(), + stats.getMismatchedPartitions(), + stats.getOnlyInSource(), + stats.getOnlyInTarget(), + stats.getMatchedRows(), + stats.getMatchedValues(), + stats.getMismatchedValues(), + stats.getSkippedPartitions()); + jobResultUpdateStatement.setIdempotent(true); + // also retry with NoHostAvailableException + retryStrategyProvider.get().retryIfNot(() -> session.execute(jobResultUpdateStatement), + QueryValidationException.class); } - session.execute(batch); } - public void markNotRunning(UUID jobId) { try { @@ -580,24 +619,35 @@ static class Schema { private static final String KEYSPACE_SCHEMA = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s"; - public static void maybeInitialize(Session session, MetadataKeyspaceOptions options) { + public static void maybeInitialize(Session session, MetadataKeyspaceOptions options, RetryStrategyProvider retryStrategyProvider) { if (!options.should_init) return; + Consumer retryQuery = query -> { + try { + retryStrategyProvider.get().retryIfNot(() -> session.execute(query), + NoHostAvailableException.class, + QueryValidationException.class); + } + catch (Exception exception) { + Throwables.propagate(exception); + } + }; + logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace); - session.execute(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication)); - session.execute(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl)); - session.execute(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS)); - session.execute(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl)); - session.execute(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl)); - session.execute(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl)); - session.execute(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl)); - session.execute(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl)); - session.execute(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl)); - session.execute(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl)); - session.execute(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl)); - session.execute(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl)); - session.execute(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl)); + retryQuery.accept(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication)); + retryQuery.accept(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl)); + retryQuery.accept(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS)); + retryQuery.accept(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl)); + retryQuery.accept(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl)); + retryQuery.accept(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl)); + retryQuery.accept(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl)); + retryQuery.accept(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl)); + retryQuery.accept(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl)); + retryQuery.accept(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl)); + retryQuery.accept(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl)); + retryQuery.accept(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl)); + retryQuery.accept(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl)); logger.info("Schema initialized"); } }