Skip to content

Commit 902cb4e

Browse files
author
zhangyongxiang.alpha
committed
Refactor
1 parent b5e817f commit 902cb4e

File tree

7 files changed

+128
-119
lines changed

7 files changed

+128
-119
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java

Lines changed: 45 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -22,91 +22,81 @@
2222
import org.apache.amoro.IcebergActions;
2323
import org.apache.amoro.TableFormat;
2424
import org.apache.amoro.TableRuntime;
25-
import org.apache.amoro.config.ConfigHelpers;
25+
import org.apache.amoro.config.ConfigOption;
26+
import org.apache.amoro.config.ConfigOptions;
27+
import org.apache.amoro.config.Configurations;
28+
import org.apache.amoro.process.ExecuteEngine;
29+
import org.apache.amoro.process.LocalExecutionEngine;
2630
import org.apache.amoro.process.ProcessFactory;
2731
import org.apache.amoro.process.ProcessTriggerStrategy;
2832
import org.apache.amoro.process.RecoverProcessFailedException;
2933
import org.apache.amoro.process.TableProcess;
3034
import org.apache.amoro.process.TableProcessStore;
31-
import org.apache.amoro.server.process.DefaultTableProcessStore;
32-
import org.apache.amoro.server.process.TableProcessMeta;
33-
import org.apache.amoro.server.process.executor.LocalExecutionEngine;
34-
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
35+
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
36+
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
37+
import org.apache.commons.lang3.tuple.Pair;
3538

3639
import java.time.Duration;
37-
import java.util.Collections;
38-
import java.util.HashMap;
39-
import java.util.HashSet;
40+
import java.util.Collection;
41+
import java.util.List;
4042
import java.util.Map;
4143
import java.util.Optional;
4244
import java.util.Set;
45+
import java.util.stream.Collectors;
4346

4447
/** Default process factory for Iceberg-related maintenance actions in AMS. */
4548
public class IcebergProcessFactory implements ProcessFactory {
4649

4750
public static final String PLUGIN_NAME = "iceberg";
51+
public static final ConfigOption<Boolean> SNAPSHOT_EXPIRE_ENABLED =
52+
ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true);
4853

49-
private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator();
54+
public static final ConfigOption<Duration> SNAPSHOT_EXPIRE_INTERVAL =
55+
ConfigOptions.key("expire-snapshot.interval")
56+
.durationType()
57+
.defaultValue(Duration.ofHours(1));
5058

51-
private boolean expireSnapshotsEnabled = true;
52-
private int expireSnapshotsThreadCount = 10;
53-
private Duration expireSnapshotsInterval = Duration.ofHours(1);
59+
private ExecuteEngine localEngine;
60+
private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap();
61+
private final List<TableFormat> formats =
62+
Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE);
63+
64+
@Override
65+
public void availableExecuteEngines(Collection<ExecuteEngine> allAvailableEngines) {
66+
for (ExecuteEngine engine : allAvailableEngines) {
67+
if (engine instanceof LocalExecutionEngine) {
68+
this.localEngine = engine;
69+
}
70+
}
71+
}
5472

5573
@Override
5674
public Map<TableFormat, Set<Action>> supportedActions() {
57-
Set<Action> actions = new HashSet<>();
58-
actions.add(IcebergActions.EXPIRE_SNAPSHOTS);
59-
60-
Map<TableFormat, Set<Action>> supported = new HashMap<>();
61-
supported.put(TableFormat.ICEBERG, actions);
62-
supported.put(TableFormat.MIXED_ICEBERG, actions);
63-
supported.put(TableFormat.MIXED_HIVE, actions);
64-
return supported;
75+
return formats.stream()
76+
.map(f -> Pair.of(f, actions.keySet()))
77+
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
6578
}
6679

6780
@Override
6881
public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action) {
69-
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
70-
return new ProcessTriggerStrategy(
71-
expireSnapshotsInterval, false, Math.max(expireSnapshotsThreadCount, 1));
72-
}
73-
74-
return ProcessTriggerStrategy.METADATA_TRIGGER;
82+
return actions.getOrDefault(action, ProcessTriggerStrategy.METADATA_TRIGGER);
7583
}
7684

7785
@Override
7886
public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action) {
79-
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)
80-
&& (!expireSnapshotsEnabled
81-
|| !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled())) {
87+
if (!actions.containsKey(action)) {
8288
return Optional.empty();
8389
}
8490

85-
long processId = idGenerator.generateId();
86-
TableProcessMeta meta =
87-
TableProcessMeta.of(
88-
processId,
89-
tableRuntime.getTableIdentifier().getId(),
90-
action.getName(),
91-
LocalExecutionEngine.ENGINE_NAME,
92-
Collections.emptyMap());
93-
94-
TableProcessStore store = new DefaultTableProcessStore(tableRuntime, meta, action);
95-
9691
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
97-
return Optional.of(new SnapshotsExpiringProcess(tableRuntime, store));
92+
return triggerExpireSnapshot(tableRuntime);
9893
}
99-
10094
return Optional.empty();
10195
}
10296

10397
@Override
10498
public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
10599
throws RecoverProcessFailedException {
106-
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(store.getAction())) {
107-
return new SnapshotsExpiringProcess(tableRuntime, store);
108-
}
109-
110100
throw new RecoverProcessFailedException(
111101
"Unsupported action for IcebergProcessFactory: " + store.getAction());
112102
}
@@ -116,42 +106,20 @@ public void open(Map<String, String> properties) {
116106
if (properties == null || properties.isEmpty()) {
117107
return;
118108
}
119-
120-
expireSnapshotsEnabled =
121-
parseBoolean(properties.get("expire-snapshots.enabled"), expireSnapshotsEnabled);
122-
expireSnapshotsThreadCount =
123-
parseInt(properties.get("expire-snapshots.thread-count"), expireSnapshotsThreadCount);
124-
expireSnapshotsInterval =
125-
parseDuration(properties.get("expire-snapshots.interval"), expireSnapshotsInterval);
126-
}
127-
128-
private boolean parseBoolean(String value, boolean defaultValue) {
129-
if (value == null) {
130-
return defaultValue;
109+
Configurations configs = Configurations.fromMap(properties);
110+
if (configs.getBoolean(SNAPSHOT_EXPIRE_ENABLED)) {
111+
Duration interval = configs.getDuration(SNAPSHOT_EXPIRE_INTERVAL);
112+
this.actions.put(
113+
IcebergActions.EXPIRE_SNAPSHOTS, ProcessTriggerStrategy.triggerAtFixRate(interval));
131114
}
132-
return Boolean.parseBoolean(value.trim());
133115
}
134116

135-
private int parseInt(String value, int defaultValue) {
136-
if (value == null) {
137-
return defaultValue;
138-
}
139-
try {
140-
return Integer.parseInt(value.trim());
141-
} catch (NumberFormatException e) {
142-
return defaultValue;
117+
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) {
118+
if (localEngine == null) {
119+
return Optional.empty();
143120
}
144-
}
145121

146-
private Duration parseDuration(String value, Duration defaultValue) {
147-
if (value == null) {
148-
return defaultValue;
149-
}
150-
try {
151-
return ConfigHelpers.TimeUtils.parseDuration(value);
152-
} catch (Exception e) {
153-
return defaultValue;
154-
}
122+
return Optional.of(new SnapshotsExpiringProcess(tableRuntime, localEngine));
155123
}
156124

157125
@Override

amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,35 @@
1818

1919
package org.apache.amoro.server.process.iceberg;
2020

21+
import org.apache.amoro.Action;
2122
import org.apache.amoro.AmoroTable;
23+
import org.apache.amoro.IcebergActions;
2224
import org.apache.amoro.TableRuntime;
2325
import org.apache.amoro.maintainer.TableMaintainer;
26+
import org.apache.amoro.process.ExecuteEngine;
2427
import org.apache.amoro.process.LocalProcess;
2528
import org.apache.amoro.process.TableProcess;
26-
import org.apache.amoro.process.TableProcessStore;
2729
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
28-
import org.apache.amoro.server.process.executor.LocalExecutionEngine;
2930
import org.apache.amoro.server.table.DefaultTableRuntime;
3031
import org.apache.amoro.server.table.cleanup.CleanupOperation;
32+
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

36+
import java.util.Map;
37+
3438
/** Local table process for expiring Iceberg snapshots. */
3539
public class SnapshotsExpiringProcess extends TableProcess implements LocalProcess {
3640

3741
private static final Logger LOG = LoggerFactory.getLogger(SnapshotsExpiringProcess.class);
3842

39-
public SnapshotsExpiringProcess(TableRuntime tableRuntime, TableProcessStore store) {
40-
super(tableRuntime, store);
43+
public SnapshotsExpiringProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
44+
super(tableRuntime, engine);
4145
}
4246

4347
@Override
4448
public String tag() {
45-
return LocalExecutionEngine.SNAPSHOTS_EXPIRING_POOL;
49+
return getAction().getName().toLowerCase();
4650
}
4751

4852
@Override
@@ -62,5 +66,17 @@ public void run() {
6266
}
6367

6468
@Override
65-
protected void closeInternal() {}
69+
public Action getAction() {
70+
return IcebergActions.EXPIRE_SNAPSHOTS;
71+
}
72+
73+
@Override
74+
public Map<String, String> getProcessParameters() {
75+
return Maps.newHashMap();
76+
}
77+
78+
@Override
79+
public Map<String, String> getSummary() {
80+
return Maps.newHashMap();
81+
}
6682
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
org.apache.amoro.server.process.executor.LocalExecutionEngine
1+
org.apache.amoro.process.LocalExecutionEngine

amoro-common/src/main/java/org/apache/amoro/config/Configurations.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,19 @@ public long getDurationInMillis(ConfigOption<Duration> option) {
548548
return result;
549549
}
550550

551+
public Duration getDuration(ConfigOption<Duration> option) {
552+
try {
553+
return getOptional(option).orElseGet(option::defaultValue);
554+
} catch (Exception e) { // may be throw java.lang.ArithmeticException: long overflow
555+
throw new ConfigurationException(
556+
option.key(),
557+
String.format(
558+
"Exception when converting duration for config option '%s': %s",
559+
option.key(), e.getMessage()),
560+
e);
561+
}
562+
}
563+
551564
public <T> Optional<T> getOptional(ConfigOption<T> option) {
552565
Optional<Object> rawValue = getRawValueFromOption(option);
553566
Class<?> clazz = option.getClazz();

amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java renamed to amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,44 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.amoro.server.process.executor;
19+
package org.apache.amoro.process;
2020

21-
import org.apache.amoro.process.LocalProcess;
22-
import org.apache.amoro.process.ProcessStatus;
23-
import org.apache.amoro.process.TableProcess;
21+
import org.apache.amoro.config.ConfigOption;
22+
import org.apache.amoro.config.ConfigOptions;
23+
import org.apache.amoro.config.Configurations;
24+
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
2425
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2528

2629
import java.util.Map;
30+
import java.util.Set;
2731
import java.util.UUID;
2832
import java.util.concurrent.ConcurrentHashMap;
2933
import java.util.concurrent.Future;
3034
import java.util.concurrent.LinkedBlockingQueue;
3135
import java.util.concurrent.ThreadPoolExecutor;
3236
import java.util.concurrent.TimeUnit;
37+
import java.util.stream.Collectors;
3338

3439
/**
35-
* Local execution engine that runs {@link LocalProcess} instances in AMS thread pools.
40+
* Local execution engine that runs {@link org.apache.amoro.process.LocalProcess} instances in AMS
41+
* thread pools.
3642
*
37-
* <p>The engine maintains multiple thread pools keyed by {@link LocalProcess#tag()}.
43+
* <p>The engine maintains multiple thread pools keyed by {@link
44+
* org.apache.amoro.process.LocalProcess#tag()}.
3845
*/
3946
public class LocalExecutionEngine implements ExecuteEngine {
47+
private static final Logger LOG = LoggerFactory.getLogger(LocalExecutionEngine.class);
4048

4149
public static final String ENGINE_NAME = "local";
4250
public static final String DEFAULT_POOL = "default";
43-
public static final String SNAPSHOTS_EXPIRING_POOL = "snapshots-expiring";
51+
public static final String POOL_CONFIG_PREFIX = "pool.";
52+
public static final String POOL_SIZE_SUFFIX = ".thread-count";
53+
public static final ConfigOption<Integer> DEFAULT_POOL_SIZE =
54+
ConfigOptions.key(POOL_CONFIG_PREFIX + DEFAULT_POOL + POOL_SIZE_SUFFIX)
55+
.intType()
56+
.defaultValue(10);
4457

4558
private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
4659
private final Map<String, Future<?>> activeInstances = new ConcurrentHashMap<>();
@@ -95,7 +108,7 @@ public String submitTableProcess(TableProcess tableProcess) {
95108
LocalProcess localProcess = (LocalProcess) tableProcess;
96109
String identifier = UUID.randomUUID().toString();
97110

98-
ThreadPoolExecutor executor = getOrCreatePool(localProcess.tag());
111+
ThreadPoolExecutor executor = getPool(localProcess.tag());
99112
Future<?> future =
100113
executor.submit(
101114
() -> {
@@ -136,16 +149,28 @@ public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String pro
136149

137150
@Override
138151
public void open(Map<String, String> properties) {
139-
String defaultSizeValue = properties == null ? null : properties.get("default.thread-count");
140-
int defaultSize = parseInt(defaultSizeValue, 10);
152+
Configurations configs = Configurations.fromMap(properties);
153+
int defaultSize = configs.getInteger(DEFAULT_POOL_SIZE);
141154
pools.put(DEFAULT_POOL, newFixedPool(DEFAULT_POOL, defaultSize));
142155

143-
String snapshotsExpiringSizeValue =
144-
properties == null ? null : properties.get("snapshots-expiring.thread-count");
145-
int snapshotsExpiringSize = parseInt(snapshotsExpiringSizeValue, defaultSize);
146-
pools.put(
147-
SNAPSHOTS_EXPIRING_POOL,
148-
newFixedPool(SNAPSHOTS_EXPIRING_POOL, Math.max(snapshotsExpiringSize, 1)));
156+
Set<String> customPools =
157+
properties.keySet().stream()
158+
.filter(key -> key.startsWith(POOL_CONFIG_PREFIX))
159+
.map(key -> key.substring(POOL_CONFIG_PREFIX.length()))
160+
.map(key -> key.substring(0, key.indexOf(".") + 1))
161+
.collect(Collectors.toSet());
162+
163+
customPools.forEach(
164+
name -> {
165+
ConfigOption<Integer> poolSizeOpt =
166+
ConfigOptions.key(POOL_CONFIG_PREFIX + name + POOL_SIZE_SUFFIX)
167+
.intType()
168+
.defaultValue(-1);
169+
int size = configs.getInteger(poolSizeOpt);
170+
Preconditions.checkArgument(size > 0, "Pool thread-count is not configured for %s", name);
171+
pools.put(name, newFixedPool(name, size));
172+
LOG.info("Initialize local execute pool:{} with size:{}", name, size);
173+
});
149174
}
150175

151176
@Override
@@ -161,12 +186,11 @@ public String name() {
161186
return ENGINE_NAME;
162187
}
163188

164-
private ThreadPoolExecutor getOrCreatePool(String tag) {
165-
if (tag == null || tag.isEmpty()) {
166-
tag = DEFAULT_POOL;
189+
private ThreadPoolExecutor getPool(String tag) {
190+
if (pools.containsKey(tag)) {
191+
return pools.get(tag);
167192
}
168-
169-
return pools.computeIfAbsent(tag, t -> newFixedPool(t, 10));
193+
return pools.get(DEFAULT_POOL);
170194
}
171195

172196
private ThreadPoolExecutor newFixedPool(String tag, int size) {
@@ -178,15 +202,4 @@ private ThreadPoolExecutor newFixedPool(String tag, int size) {
178202
new LinkedBlockingQueue<>(),
179203
new ThreadFactoryBuilder().setDaemon(false).setNameFormat("local-" + tag + "-%d").build());
180204
}
181-
182-
private int parseInt(String value, int defaultValue) {
183-
if (value == null) {
184-
return defaultValue;
185-
}
186-
try {
187-
return Integer.parseInt(value);
188-
} catch (NumberFormatException e) {
189-
return defaultValue;
190-
}
191-
}
192205
}

0 commit comments

Comments
 (0)