Skip to content

Commit b342e9b

Browse files
author
张文领
committed
Add support for dynamic refresh interval for table metadata refreshing in the TableRuntimeRefreshExecutor.
1 parent 5e3eddb commit b342e9b

File tree

7 files changed

+510
-25
lines changed

7 files changed

+510
-25
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java

Lines changed: 137 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,29 @@ protected boolean enabled(TableRuntime tableRuntime) {
5656
@Override
5757
protected long getNextExecutingTime(TableRuntime tableRuntime) {
5858
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
59+
60+
if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) {
61+
long newInterval = defaultTableRuntime.getLatestRefreshInterval();
62+
if (newInterval > 0) {
63+
return newInterval;
64+
}
65+
}
66+
5967
return Math.min(
6068
defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L / 5, interval);
6169
}
6270

6371
private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
6472
// only evaluate pending input when optimizing is enabled and in idle state
6573
OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
66-
if (optimizingConfig.isEnabled()
67-
&& tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
74+
boolean optimizingEnabled = optimizingConfig.isEnabled();
75+
if (optimizingEnabled && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
6876

6977
if (optimizingConfig.isMetadataBasedTriggerEnabled()
7078
&& !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
7179
optimizingConfig, table, tableRuntime.getLastPlanTime())) {
80+
tableRuntime.setLatestEvaluatedNeedOptimizing(false);
81+
7282
logger.debug(
7383
"{} optimizing is not necessary due to metadata based trigger",
7484
tableRuntime.getTableIdentifier());
@@ -87,8 +97,17 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTa
8797
tableRuntime.setPendingInput(pendingInput);
8898
} else {
8999
tableRuntime.optimizingNotNecessary();
100+
tableRuntime.setTableSummary(evaluator.getPendingInput());
90101
}
91-
tableRuntime.setTableSummary(evaluator.getPendingInput());
102+
} else if (!optimizingEnabled) {
103+
tableRuntime.setLatestEvaluatedNeedOptimizing(false);
104+
logger.debug(
105+
"{} optimizing is not enabled, skip evaluating pending input",
106+
tableRuntime.getTableIdentifier());
107+
} else {
108+
tableRuntime.setLatestEvaluatedNeedOptimizing(true);
109+
logger.debug(
110+
"{} optimizing is processing or is in preparation", tableRuntime.getTableIdentifier());
92111
}
93112
}
94113

@@ -129,9 +148,124 @@ public void execute(TableRuntime tableRuntime) {
129148
|| (mixedTable.isUnkeyedTable()
130149
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) {
131150
tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
151+
} else {
152+
logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier());
153+
defaultTableRuntime.setLatestEvaluatedNeedOptimizing(false);
154+
}
155+
156+
// Update adaptive interval according to evaluating result.
157+
if (defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) {
158+
long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
159+
defaultTableRuntime.setLatestRefreshInterval(newInterval);
132160
}
161+
133162
} catch (Throwable throwable) {
134163
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
135164
}
136165
}
166+
167+
/**
168+
* Calculate adaptive execution interval based on table optimization status.
169+
*
170+
* <p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm inspired by TCP congestion
171+
* control:
172+
*
173+
* <ul>
174+
* <li>If table does not need to be optimized: additive increase - gradually extend interval to
175+
* reduce resource consumption
176+
* <li>If table needs optimization: multiplicative decrease - rapidly reduce interval for quick
177+
* response
178+
* </ul>
179+
*
180+
* <p>Interval is bounded by [interval_min, interval_max] and kept in memory only (resets to
181+
* interval_min on restart).
182+
*
183+
* @param tableRuntime The table runtime information containing current status and configuration
184+
* @return The next execution interval in milliseconds
185+
*/
186+
private long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) {
187+
final long minInterval = interval;
188+
final long maxInterval =
189+
tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxInterval();
190+
long currentInterval = tableRuntime.getLatestRefreshInterval();
191+
192+
// Initialize interval on first run or after restart
193+
if (currentInterval == 0) {
194+
currentInterval = minInterval;
195+
}
196+
197+
// Determine whether table needs optimization
198+
boolean needOptimizing = tableRuntime.getLatestEvaluatedNeedOptimizing();
199+
200+
long nextInterval;
201+
if (needOptimizing) {
202+
nextInterval = decreaseInterval(currentInterval, minInterval);
203+
logger.debug(
204+
"Table {} needs optimization, decreasing interval from {}ms to {}ms",
205+
tableRuntime.getTableIdentifier(),
206+
currentInterval,
207+
nextInterval);
208+
} else {
209+
nextInterval = increaseInterval(tableRuntime, currentInterval, maxInterval);
210+
logger.debug(
211+
"Table {} does not need optimization, increasing interval from {}ms to {}ms",
212+
tableRuntime.getTableIdentifier(),
213+
currentInterval,
214+
nextInterval);
215+
}
216+
217+
return nextInterval;
218+
}
219+
220+
/**
221+
* Decrease interval when table needs optimization.
222+
*
223+
* <p>Uses multiplicative decrease (halving) inspired by TCP Fast Recovery algorithm for rapid
224+
* response to table health issues.
225+
*
226+
* @param currentInterval Current refresh interval in milliseconds
227+
* @param minInterval Minimum allowed interval in milliseconds
228+
* @return New interval after decrease.
229+
*/
230+
private long decreaseInterval(long currentInterval, long minInterval) {
231+
long newInterval = currentInterval / 2;
232+
long boundedInterval = Math.max(newInterval, minInterval);
233+
234+
if (newInterval < minInterval) {
235+
logger.debug(
236+
"Interval reached minimum boundary: attempted {}ms, capped at {}ms",
237+
newInterval,
238+
minInterval);
239+
}
240+
241+
return boundedInterval;
242+
}
243+
244+
/**
245+
* Increase interval when table does not need optimization.
246+
*
247+
* <p>Uses additive increase inspired by TCP Congestion Avoidance algorithm for gradual and stable
248+
* growth.
249+
*
250+
* @param tableRuntime The table runtime information containing configuration
251+
* @param currentInterval Current refresh interval in milliseconds
252+
* @param maxInterval Maximum allowed interval in milliseconds
253+
* @return New interval after increase.
254+
*/
255+
private long increaseInterval(
256+
DefaultTableRuntime tableRuntime, long currentInterval, long maxInterval) {
257+
long step = tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStep();
258+
long newInterval = currentInterval + step;
259+
long boundedInterval = Math.min(newInterval, maxInterval);
260+
261+
if (newInterval > maxInterval) {
262+
logger.debug(
263+
"Interval reached maximum boundary: currentInterval is {}ms, attempted {}ms, capped at {}ms",
264+
currentInterval,
265+
newInterval,
266+
maxInterval);
267+
}
268+
269+
return boundedInterval;
270+
}
137271
}

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ public class DefaultTableRuntime extends AbstractTableRuntime
100100
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
101101
private final TableSummaryMetrics tableSummaryMetrics;
102102
private volatile long lastPlanTime;
103+
private volatile long latestRefreshInterval = AmoroServiceConstants.INVALID_TIME;
104+
private volatile boolean latestEvaluatedNeedOptimizing = true;
103105
private volatile OptimizingProcess optimizingProcess;
104106
private final List<TaskRuntime.TaskQuota> taskQuotas = new CopyOnWriteArrayList<>();
105107

@@ -181,6 +183,22 @@ public void setLastPlanTime(long lastPlanTime) {
181183
this.lastPlanTime = lastPlanTime;
182184
}
183185

186+
public long getLatestRefreshInterval() {
187+
return latestRefreshInterval;
188+
}
189+
190+
public void setLatestRefreshInterval(long latestRefreshInterval) {
191+
this.latestRefreshInterval = latestRefreshInterval;
192+
}
193+
194+
public boolean getLatestEvaluatedNeedOptimizing() {
195+
return this.latestEvaluatedNeedOptimizing;
196+
}
197+
198+
public void setLatestEvaluatedNeedOptimizing(boolean latestEvaluatedNeedOptimizing) {
199+
this.latestEvaluatedNeedOptimizing = latestEvaluatedNeedOptimizing;
200+
}
201+
184202
public OptimizingStatus getOptimizingStatus() {
185203
return OptimizingStatus.ofCode(getStatusCode());
186204
}
@@ -288,6 +306,7 @@ public void setPendingInput(AbstractOptimizingEvaluator.PendingInput pendingInpu
288306
summary.setTotalFileCount(pendingFileCount);
289307
})
290308
.commit();
309+
this.latestEvaluatedNeedOptimizing = true;
291310
}
292311

293312
public void setTableSummary(AbstractOptimizingEvaluator.PendingInput tableSummary) {
@@ -462,6 +481,8 @@ public void optimizingNotNecessary() {
462481
})
463482
.commit();
464483
}
484+
485+
this.latestEvaluatedNeedOptimizing = false;
465486
}
466487

467488
public void beginCommitting() {

amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,22 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
341341
PropertyUtil.propertyAsLong(
342342
properties,
343343
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
344-
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
344+
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT))
345+
.setRefreshTableAdaptiveEnabled(
346+
PropertyUtil.propertyAsBoolean(
347+
properties,
348+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED,
349+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT))
350+
.setRefreshTableAdaptiveMaxInterval(
351+
PropertyUtil.propertyAsLong(
352+
properties,
353+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL,
354+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT))
355+
.setRefreshTableAdaptiveIncreaseStep(
356+
PropertyUtil.propertyAsLong(
357+
properties,
358+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP,
359+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_DEFAULT));
345360
}
346361

347362
/**

0 commit comments

Comments
 (0)