Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,9 @@ public class AmoroManagementConf {
public static final ConfigOption<Duration> OPTIMIZER_TASK_EXECUTE_TIMEOUT =
ConfigOptions.key("optimizer.task-execute-timeout")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription("Timeout duration for task execution, default to 1 hour.");

.defaultValue(Duration.ofSeconds(Integer.MAX_VALUE))
.withDescription(
"Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days).");
public static final ConfigOption<Integer> OPTIMIZER_MAX_PLANNING_PARALLELISM =
ConfigOptions.key("optimizer.max-planning-parallelism")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.amoro.api.AmoroTableMetastore;
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.ConfigurationException;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
Expand Down Expand Up @@ -140,14 +141,17 @@ public static void main(String[] args) {
service.transitionToLeader();
// Used to block AMS instances that have acquired leadership
service.waitFollowerShip();
} catch (ConfigurationException e) {
LOG.error("AMS will exit...", e);
System.exit(1);
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.transitionToFollower();
}
}
} catch (Throwable t) {
LOG.error("AMS encountered an unknown exception, will exist", t);
LOG.error("AMS encountered an unknown exception, will exit...", t);
System.exit(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ public DefaultOptimizingService(
OptimizerManager optimizerManager,
TableService tableService) {
this.optimizerTouchTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.taskExecuteTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT).toMillis();
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT);
this.refreshGroupInterval =
serviceConfig.get(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL).toMillis();
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.breakQuotaLimit =
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED);
this.tableService = tableService;
Expand Down Expand Up @@ -511,8 +511,7 @@ public void run() {
}

private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
if (task.getStatus() == TaskRuntime.Status.ACKED
&& task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis()) {
if (isTaskExecTimeout(task)) {
LOG.warn(
"Task {} has been suspended in ACK state for {} (start time: {}), put it to retry queue, optimizer {}. (Note: The task may have finished executing, but ams did not receive the COMPLETE message from the optimizer.)",
task.getTaskId(),
Expand Down Expand Up @@ -543,11 +542,16 @@ private Predicate<TaskRuntime<?>> buildSuspendingPredication(Set<String> activeT
&& task.getStatus() != TaskRuntime.Status.SUCCESS
|| task.getStatus() == TaskRuntime.Status.SCHEDULED
&& task.getStartTime() + taskAckTimeout < System.currentTimeMillis()
|| task.getStatus() == TaskRuntime.Status.ACKED
&& task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis();
|| isTaskExecTimeout(task);
}
}

private boolean isTaskExecTimeout(TaskRuntime<?> task) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for extracting the common logic

return task.getStatus() == TaskRuntime.Status.ACKED
&& taskExecuteTimeout > 0
&& task.getStartTime() + taskExecuteTimeout < System.currentTimeMillis();
}

private class OptimizingConfigWatcher implements Runnable {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

/**
* End-to-end test cases for configuration documentation.
Expand Down Expand Up @@ -74,6 +75,35 @@ public void testAmoroManagementConfDocumentation() throws Exception {
generateConfigurationMarkdown("ams-config.md", "AMS Configuration", 100, confInfoList);
}

@Test
public void testGetDurationInMillis() throws Exception {
Properties properties = new Properties();
properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(), "1h");
Configurations configuration = ConfigHelpers.createConfiguration(properties);
long durationInMillis =
configuration.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT);
Assertions.assertEquals(3600000, durationInMillis);

// default value test
properties = new Properties();
configuration = ConfigHelpers.createConfiguration(properties);
durationInMillis =
configuration.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT);
Assertions.assertEquals(Integer.MAX_VALUE * 1000L, durationInMillis);

properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(), Long.MAX_VALUE + "m");
final Configurations conf1 = ConfigHelpers.createConfiguration(properties);
Assertions.assertThrows(
ConfigurationException.class,
() -> conf1.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT));

properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(), "-1m");
final Configurations conf2 = ConfigHelpers.createConfiguration(properties);
Assertions.assertThrows(
ConfigurationException.class,
() -> conf2.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT));
}

/**
* Generate configuration documentation for multiple configuration classes.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.config;

/**
* Exception thrown when a configuration value causes an exception, typically when converting
* Duration to milliseconds. This exception can be caught to trigger process exit.
*/
public class ConfigurationException extends RuntimeException {

private static final long serialVersionUID = 1L;

private final String configKey;

public ConfigurationException(String configKey, String message) {
super(message);
this.configKey = configKey;
}

public ConfigurationException(String configKey, String message, Throwable cause) {
super(message, cause);
this.configKey = configKey;
}

/**
* Returns the configuration key that caused the overflow.
*
* @return the configuration key
*/
public String getConfigKey() {
return configKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -532,6 +533,21 @@ public <T> T get(ConfigOption<T> option) {
return getOptional(option).orElseGet(option::defaultValue);
}

public long getDurationInMillis(ConfigOption<Duration> option) {
long result;
try {
result = getOptional(option).orElseGet(option::defaultValue).toMillis();
} catch (Exception e) { // may be throw java.lang.ArithmeticException: long overflow
throw new ConfigurationException(
option.key(),
String.format(
"Exception when converting duration to millis for config option '%s': %s",
option.key(), e.getMessage()),
e);
}
return result;
}

public <T> Optional<T> getOptional(ConfigOption<T> option) {
Optional<Object> rawValue = getRawValueFromOption(option);
Class<?> clazz = option.getClazz();
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/ams-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| optimizer.max-planning-parallelism | 1 | Max planning parallelism in one optimizer group. |
| optimizer.polling-timeout | 3 s | Optimizer polling task timeout. |
| optimizer.task-ack-timeout | 30 s | Timeout duration for task acknowledgment. |
| optimizer.task-execute-timeout | 1 h | Timeout duration for task execution, default to 1 hour. |
| optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task execution, default to Integer.MAX_VALUE seconds(about 24,855 days). |
| overview-cache.max-size | 3360 | Max size of overview cache. |
| overview-cache.refresh-interval | 3 min | Interval for refreshing overview cache. |
| refresh-external-catalogs.interval | 3 min | Interval to refresh the external catalog. |
Expand Down