Skip to content

Commit 4b656b8

Browse files
committed
[AMORO-4018] Add support for ExternalEventService
1 parent ac5cf1f commit 4b656b8

File tree

15 files changed

+481
-9
lines changed

15 files changed

+481
-9
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,13 @@ public class AmoroManagementConf {
192192
.defaultValue(100)
193193
.withDescription("Filters will not be used beyond that number of partitions.");
194194

195+
public static final ConfigOption<Duration> REFRESH_TABLES_MAX_INTERVAL =
196+
ConfigOptions.key("refresh-tables.max-interval")
197+
.durationType()
198+
.defaultValue(Duration.ofHours(1))
199+
.withDescription(
200+
"Maximum interval for refreshing table metadata. (Used as the fallback interval when enabling refreshes triggered by external events)");
201+
195202
public static final ConfigOption<Duration> BLOCKER_TIMEOUT =
196203
ConfigOptions.key("blocker.timeout")
197204
.durationType()

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ private void initHttpService() {
284284
new DashboardServer(
285285
serviceConfig, catalogManager, tableManager, optimizerManager, terminalManager);
286286
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);
287+
ExternalEventService externalEventService = new ExternalEventService();
287288

288289
httpServer =
289290
Javalin.create(
@@ -302,6 +303,7 @@ private void initHttpService() {
302303
() -> {
303304
dashboardServer.endpoints().addEndpoints();
304305
restCatalogService.endpoints().addEndpoints();
306+
externalEventService.endpoints().addEndpoints();
305307
});
306308

307309
httpServer.before(
@@ -318,6 +320,8 @@ private void initHttpService() {
318320
(e, ctx) -> {
319321
if (restCatalogService.needHandleException(ctx)) {
320322
restCatalogService.handleException(e, ctx);
323+
} else if (externalEventService.needHandleException(ctx)) {
324+
externalEventService.handleException(e, ctx);
321325
} else {
322326
dashboardServer.handleException(e, ctx);
323327
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server;
20+
21+
import static io.javalin.apibuilder.ApiBuilder.path;
22+
import static io.javalin.apibuilder.ApiBuilder.post;
23+
24+
import io.javalin.apibuilder.EndpointGroup;
25+
import io.javalin.http.Context;
26+
import io.javalin.http.HttpCode;
27+
import org.apache.amoro.exception.ForbiddenException;
28+
import org.apache.amoro.exception.SignatureCheckException;
29+
import org.apache.amoro.server.dashboard.response.ErrorResponse;
30+
import org.apache.amoro.server.dashboard.response.OkResponse;
31+
import org.apache.amoro.server.persistence.PersistentBase;
32+
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
33+
import org.apache.amoro.table.TableIdentifier;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
public class ExternalEventService extends PersistentBase {
38+
private static final Logger LOG = LoggerFactory.getLogger(ExternalEventService.class);
39+
40+
public static final String REFRESH_REST_API_PREFIX = "/api/ams/v1/refresh/";
41+
42+
public ExternalEventService() {}
43+
44+
public EndpointGroup endpoints() {
45+
return () -> {
46+
// for refresh rest api
47+
path(
48+
REFRESH_REST_API_PREFIX,
49+
() -> {
50+
post(
51+
"/catalog/{catalog}/database/{database}/table/{table}",
52+
this::receiveExternalTableRefreshEvent);
53+
});
54+
};
55+
}
56+
57+
public boolean needHandleException(Context ctx) {
58+
return ctx.req.getRequestURI().startsWith(REFRESH_REST_API_PREFIX);
59+
}
60+
61+
public void handleException(Exception e, Context ctx) {
62+
if (e instanceof ForbiddenException) {
63+
ctx.json(new ErrorResponse(HttpCode.FORBIDDEN, "Please check authentication", ""));
64+
65+
} else if (e instanceof SignatureCheckException) {
66+
ctx.json(new ErrorResponse(HttpCode.FORBIDDEN, "Signature check failed", ""));
67+
} else {
68+
ctx.json(new ErrorResponse(HttpCode.INTERNAL_SERVER_ERROR, e.getMessage(), ""));
69+
}
70+
LOG.error("Error when handle refresh event", e);
71+
}
72+
73+
/** POST /api/ams/refresh/catalog/{catalog}/database/{database}/table/{table} */
74+
public void receiveExternalTableRefreshEvent(Context ctx) {
75+
String catalog = ctx.pathParam("catalog").trim().replaceAll("^\"|\"$", "");
76+
String database = ctx.pathParam("database").trim().replaceAll("^\"|\"$", "");
77+
String table = ctx.pathParam("table").trim().replaceAll("^\"|\"$", "");
78+
79+
TableIdentifier tableIdentifier = TableIdentifier.of(catalog, database, table);
80+
boolean result =
81+
InlineTableExecutors.getInstance()
82+
.getTableRefreshingExecutor()
83+
.addTableToRefresh(tableIdentifier);
84+
if (result) {
85+
ctx.json(OkResponse.of("Table added to wait for refreshing"));
86+
}
87+
ctx.json(ErrorResponse.of("Table not managed by event trigger"));
88+
}
89+
}

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.amoro.exception.ForbiddenException;
3737
import org.apache.amoro.exception.SignatureCheckException;
3838
import org.apache.amoro.server.AmoroManagementConf;
39+
import org.apache.amoro.server.ExternalEventService;
3940
import org.apache.amoro.server.RestCatalogService;
4041
import org.apache.amoro.server.authentication.HttpAuthenticationFactory;
4142
import org.apache.amoro.server.catalog.CatalogManager;
@@ -467,7 +468,8 @@ public void handleException(Exception e, Context ctx) {
467468
"/swagger-docs",
468469
"/api/ams/v1/api/token/calculate/signature",
469470
"/api/ams/v1/api/token/calculate/encryptString",
470-
RestCatalogService.ICEBERG_REST_API_PREFIX + "/*"
471+
RestCatalogService.ICEBERG_REST_API_PREFIX + "/*",
472+
ExternalEventService.REFRESH_REST_API_PREFIX + "/*"
471473
};
472474

473475
private static boolean inWhiteList(String uri) {

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public void setup(TableService tableService, Configurations conf) {
7878
tableService,
7979
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
8080
conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(),
81-
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
81+
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS),
82+
conf.get(AmoroManagementConf.REFRESH_TABLES_MAX_INTERVAL).toMillis());
8283
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
8384
this.tagsAutoCreatingExecutor =
8485
new TagsAutoCreatingExecutor(

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.amoro.server.scheduler.inline;
2020

2121
import org.apache.amoro.AmoroTable;
22+
import org.apache.amoro.ServerTableIdentifier;
2223
import org.apache.amoro.TableRuntime;
2324
import org.apache.amoro.config.TableConfiguration;
2425
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
@@ -31,21 +32,61 @@
3132
import org.apache.amoro.server.utils.IcebergTableUtil;
3233
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
3334
import org.apache.amoro.table.MixedTable;
35+
import org.apache.amoro.table.TableIdentifier;
36+
37+
import java.util.Collections;
38+
import java.util.HashSet;
39+
import java.util.List;
40+
import java.util.Map;
41+
import java.util.Set;
42+
import java.util.concurrent.ConcurrentHashMap;
3443

3544
/** Executor that refreshes table runtimes and evaluates optimizing status periodically. */
3645
public class TableRuntimeRefreshExecutor extends PeriodicTableScheduler {
3746

3847
// 1 minutes
3948
private final long interval;
49+
// 1 hour
50+
private final long maxInterval;
4051
private final int maxPendingPartitions;
52+
// Tables configured to be triggered by events
53+
protected final Map<TableIdentifier, ServerTableIdentifier> managedEventTriggerTables =
54+
new ConcurrentHashMap<>();
55+
// Tables to be refreshed in the next execution schedule
56+
protected final Set<ServerTableIdentifier> pendingRefreshTables =
57+
Collections.synchronizedSet(new HashSet<>());
4158

4259
public TableRuntimeRefreshExecutor(
43-
TableService tableService, int poolSize, long interval, int maxPendingPartitions) {
60+
TableService tableService,
61+
int poolSize,
62+
long interval,
63+
int maxPendingPartitions,
64+
long maxInterval) {
4465
super(tableService, poolSize);
4566
this.interval = interval;
67+
this.maxInterval = maxInterval;
4668
this.maxPendingPartitions = maxPendingPartitions;
4769
}
4870

71+
@Override
72+
protected void initHandler(List<TableRuntime> tableRuntimeList) {
73+
tableRuntimeList.stream()
74+
.filter(this::enabled)
75+
.filter(
76+
tableRuntime ->
77+
tableRuntime
78+
.getTableConfiguration()
79+
.getOptimizingConfig()
80+
.isEventTriggeredRefresh())
81+
.forEach(
82+
tableRuntime -> {
83+
managedEventTriggerTables.put(
84+
tableRuntime.getTableIdentifier().getIdentifier(),
85+
tableRuntime.getTableIdentifier());
86+
});
87+
super.initHandler(tableRuntimeList);
88+
}
89+
4990
@Override
5091
protected boolean enabled(TableRuntime tableRuntime) {
5192
return tableRuntime instanceof DefaultTableRuntime;
@@ -91,6 +132,38 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
91132
optimizingProcess.close(false);
92133
}
93134
}
135+
// Add or remove managed event trigger table when the configuration changes
136+
if (defaultTableRuntime
137+
.getTableConfiguration()
138+
.getOptimizingConfig()
139+
.isEventTriggeredRefresh()) {
140+
addManagedEventTriggerTable(defaultTableRuntime);
141+
} else {
142+
removeManagedEventTriggerTable(defaultTableRuntime);
143+
}
144+
}
145+
146+
@Override
147+
public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) {
148+
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
149+
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
150+
if (tableRuntime.getTableConfiguration().getOptimizingConfig().isEventTriggeredRefresh()) {
151+
addManagedEventTriggerTable(defaultTableRuntime);
152+
}
153+
super.handleTableAdded(table, tableRuntime);
154+
}
155+
156+
@Override
157+
public void handleTableRemoved(TableRuntime tableRuntime) {
158+
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
159+
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
160+
if (defaultTableRuntime
161+
.getTableConfiguration()
162+
.getOptimizingConfig()
163+
.isEventTriggeredRefresh()) {
164+
removeManagedEventTriggerTable(defaultTableRuntime);
165+
}
166+
super.handleTableRemoved(tableRuntime);
94167
}
95168

96169
@Override
@@ -104,6 +177,18 @@ public void execute(TableRuntime tableRuntime) {
104177
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
105178
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
106179

180+
if (defaultTableRuntime.getOptimizingConfig().isEventTriggeredRefresh()) {
181+
if (!reachMaxInterval(defaultTableRuntime)
182+
&& !pendingRefreshTables.contains(defaultTableRuntime.getTableIdentifier())) {
183+
// If the table refresh is configured refreshing by event but has not been triggered, or
184+
// the interval between the
185+
// last refresh has not reached the maximum interval, skip refreshing
186+
return;
187+
}
188+
}
189+
// continue the following table refresh process and remove it from the pending refresh tables
190+
removeTableToRefresh(defaultTableRuntime.getTableIdentifier());
191+
107192
long lastOptimizedSnapshotId = defaultTableRuntime.getLastOptimizedSnapshotId();
108193
long lastOptimizedChangeSnapshotId = defaultTableRuntime.getLastOptimizedChangeSnapshotId();
109194
AmoroTable<?> table = loadTable(tableRuntime);
@@ -121,4 +206,40 @@ public void execute(TableRuntime tableRuntime) {
121206
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
122207
}
123208
}
209+
210+
private boolean reachMaxInterval(DefaultTableRuntime tableRuntime) {
211+
long currentTime = System.currentTimeMillis();
212+
long lastRefreshTime = tableRuntime.getLastRefreshTime();
213+
return currentTime - lastRefreshTime >= maxInterval;
214+
}
215+
216+
private void addManagedEventTriggerTable(DefaultTableRuntime tableRuntime) {
217+
managedEventTriggerTables.put(
218+
tableRuntime.getTableIdentifier().getIdentifier(), tableRuntime.getTableIdentifier());
219+
}
220+
221+
private void removeManagedEventTriggerTable(DefaultTableRuntime tableRuntime) {
222+
managedEventTriggerTables.remove(tableRuntime.getTableIdentifier().getIdentifier());
223+
removeTableToRefresh(tableRuntime.getTableIdentifier());
224+
}
225+
226+
public boolean addTableToRefresh(TableIdentifier tableIdentifier) {
227+
if (!managedEventTriggerTables.containsKey(tableIdentifier)) {
228+
logger.warn(
229+
"Table {} is not managed by event trigger, cannot add to refresh list.", tableIdentifier);
230+
return false;
231+
}
232+
pendingRefreshTables.add(managedEventTriggerTables.get(tableIdentifier));
233+
return true;
234+
}
235+
236+
public void addTableToRefresh(ServerTableIdentifier serverTableIdentifier) {
237+
this.pendingRefreshTables.add(serverTableIdentifier);
238+
logger.debug("Add table {} to refresh pending list.", serverTableIdentifier);
239+
}
240+
241+
public void removeTableToRefresh(ServerTableIdentifier serverTableIdentifier) {
242+
this.pendingRefreshTables.remove(serverTableIdentifier);
243+
logger.debug("Remove table {} from refresh pending list.", serverTableIdentifier);
244+
}
124245
}

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime
100100
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
101101
private final TableSummaryMetrics tableSummaryMetrics;
102102
private volatile long lastPlanTime;
103+
private volatile long lastRefreshTime;
103104
private volatile OptimizingProcess optimizingProcess;
104105
private final List<TaskRuntime.TaskQuota> taskQuotas = new CopyOnWriteArrayList<>();
105106

@@ -181,6 +182,14 @@ public void setLastPlanTime(long lastPlanTime) {
181182
this.lastPlanTime = lastPlanTime;
182183
}
183184

185+
public long getLastRefreshTime() {
186+
return lastRefreshTime;
187+
}
188+
189+
public void setLastRefreshTime(long lastRefreshTime) {
190+
this.lastRefreshTime = lastRefreshTime;
191+
}
192+
184193
public OptimizingStatus getOptimizingStatus() {
185194
return OptimizingStatus.ofCode(getStatusCode());
186195
}
@@ -334,6 +343,7 @@ public DefaultTableRuntime refresh(AmoroTable<?> table) {
334343
return s;
335344
})
336345
.commit();
346+
setLastRefreshTime(System.currentTimeMillis());
337347
return this;
338348
}
339349

amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,12 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
331331
PropertyUtil.propertyAsLong(
332332
properties,
333333
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL,
334-
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT));
334+
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT))
335+
.setEventTriggeredRefresh(
336+
PropertyUtil.propertyAsBoolean(
337+
properties,
338+
TableProperties.SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED,
339+
TableProperties.SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED_DEFAULT));
335340
}
336341

337342
/**

amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ protected void reboot() throws InterruptedException {
478478
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {
479479

480480
public TableRuntimeRefresher() {
481-
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
481+
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE);
482482
}
483483

484484
void refreshPending() {

amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ private void appendData(UnkeyedTable table, int id) {
108108

109109
void refreshPending() {
110110
TableRuntimeRefreshExecutor refresher =
111-
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
111+
new TableRuntimeRefreshExecutor(
112+
tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE);
112113
refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
113114
refresher.dispose();
114115
}

0 commit comments

Comments
 (0)