Skip to content

Commit 167f8f3

Browse files
author
zhangyongxiang.alpha
committed
check
1 parent 3f7310f commit 167f8f3

File tree

9 files changed

+84
-9
lines changed

9 files changed

+84
-9
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.amoro.process.RecoverProcessFailedException;
3333
import org.apache.amoro.process.TableProcess;
3434
import org.apache.amoro.process.TableProcessStore;
35+
import org.apache.amoro.server.table.DefaultTableRuntime;
3536
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
3637
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
3738
import org.apache.commons.lang3.tuple.Pair;
@@ -115,7 +116,14 @@ public void open(Map<String, String> properties) {
115116
}
116117

117118
private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) {
118-
if (localEngine == null) {
119+
if (localEngine == null || !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled()) {
120+
return Optional.empty();
121+
}
122+
123+
long lastExecuteTime =
124+
tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastSnapshotsExpiringTime();
125+
ProcessTriggerStrategy strategy = actions.get(IcebergActions.EXPIRE_SNAPSHOTS);
126+
if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) {
119127
return Optional.empty();
120128
}
121129

amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.amoro.process.TableProcess;
2929
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
3030
import org.apache.amoro.server.table.DefaultTableRuntime;
31-
import org.apache.amoro.server.table.cleanup.CleanupOperation;
3231
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
@@ -58,10 +57,9 @@ public void run() {
5857
} catch (Throwable t) {
5958
LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t);
6059
} finally {
61-
if (tableRuntime instanceof DefaultTableRuntime) {
62-
((DefaultTableRuntime) tableRuntime)
63-
.updateLastCleanTime(CleanupOperation.SNAPSHOTS_EXPIRING, System.currentTimeMillis());
64-
}
60+
tableRuntime.updateState(
61+
DefaultTableRuntime.CLEANUP_STATE_KEY,
62+
cleanUp -> cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis()));
6563
}
6664
}
6765

amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import org.apache.amoro.server.persistence.PersistentBase;
2626
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
2727
import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
28+
import org.apache.amoro.table.StateKey;
2829
import org.apache.amoro.table.TableRuntimeStore;
2930

3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.concurrent.locks.Lock;
3334
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.function.Function;
3436
import java.util.stream.Collectors;
3537

3638
public abstract class AbstractTableRuntime extends PersistentBase
@@ -62,6 +64,16 @@ public Map<String, String> getTableConfig() {
6264
return store().getTableConfig();
6365
}
6466

67+
@Override
68+
public <T> T getState(StateKey<T> key) {
69+
return store().getState(key);
70+
}
71+
72+
@Override
73+
public <T> void updateState(StateKey<T> key, Function<T, T> updater) {
74+
store().begin().updateState(key, updater).commit();
75+
}
76+
6577
@Override
6678
public List<TableProcessStore> getProcessStates() {
6779
return processContainerMap.values().stream()

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime {
7474
.jsonType(AbstractOptimizingEvaluator.PendingInput.class)
7575
.defaultValue(new AbstractOptimizingEvaluator.PendingInput());
7676

77-
private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
77+
public static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
7878
StateKey.stateKey("cleanup_state")
7979
.jsonType(TableRuntimeCleanupState.class)
8080
.defaultValue(new TableRuntimeCleanupState());

amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ public long getLastSnapshotsExpiringTime() {
5252
return lastSnapshotsExpiringTime;
5353
}
5454

55-
public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
55+
public TableRuntimeCleanupState setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
5656
this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime;
57+
return this;
5758
}
5859
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
119
org.apache.amoro.server.process.iceberg.IcebergProcessFactory

amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine

Lines changed: 0 additions & 1 deletion
This file was deleted.

amoro-common/src/main/java/org/apache/amoro/TableRuntime.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.amoro.metrics.MetricRegistry;
2323
import org.apache.amoro.process.ProcessFactory;
2424
import org.apache.amoro.process.TableProcessStore;
25+
import org.apache.amoro.table.StateKey;
2526

2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.function.Function;
2830

2931
/**
3032
* TableRuntime is the key interface for the AMS framework to interact with the table. Typically, it
@@ -81,6 +83,24 @@ public interface TableRuntime {
8183
*/
8284
Map<String, String> getTableConfig();
8385

86+
/**
87+
* Get the value of table-runtime state
88+
*
89+
* @param key the state key
90+
* @return value of the state
91+
* @param <T> state value type
92+
*/
93+
<T> T getState(StateKey<T> key);
94+
95+
/**
96+
* Update the state
97+
*
98+
* @param key key of state
99+
* @param updater value updater of state
100+
* @param <T> value type of state.
101+
*/
102+
<T> void updateState(StateKey<T> key, Function<T, T> updater);
103+
84104
/**
85105
* Register the metric of the table runtime.
86106
*
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
org.apache.amoro.process.LocalExecutionEngine

0 commit comments

Comments
 (0)