From 5dd5c4e70e10ee3cacc318f3575c9803845c024e Mon Sep 17 00:00:00 2001 From: "manson.li" Date: Sat, 17 Jan 2026 18:37:34 +0800 Subject: [PATCH] [AMORO-3951] Implement the table merge --- .../MixedAndIcebergTableDescriptor.java | 9 +- .../server/optimizing/OptimizingQueue.java | 51 +++++++- .../mapper/TableProcessMapper.java | 16 ++- .../amoro/server/process/ProcessService.java | 11 +- .../server/process/TableProcessMeta.java | 109 ++++++++++++++++-- .../TestIcebergServerTableDescriptor.java | 24 +++- .../java/org/apache/amoro/IcebergActions.java | 7 ++ 7 files changed, 206 insertions(+), 21 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java index 19627f86f0..cf45e67b5e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java @@ -671,10 +671,17 @@ public Pair, Integer> getOptimizingProcessesInfo( int pageNumber = (offset / limit) + 1; List processMetaList = Collections.emptyList(); try (Page ignored = PageHelper.startPage(pageNumber, limit, true)) { + org.apache.amoro.Action action = null; + if (type != null && !type.isEmpty()) { + action = + org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName( + type); + } + final org.apache.amoro.Action finalAction = action; processMetaList = getAs( TableProcessMapper.class, - mapper -> mapper.listProcessMeta(identifier.getId(), type, status)); + mapper -> mapper.listProcessMeta(identifier.getId(), finalAction, status)); PageInfo pageInfo = new PageInfo<>(processMetaList); total = (int) pageInfo.getTotal(); LOG.info( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index ec207705c7..844d0e0f6e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -490,7 +490,7 @@ public TableOptimizingProcess( OptimizingProcessState processState) { this.tableRuntime = tableRuntime; processId = tableRuntime.getProcessId(); - optimizingType = OptimizingType.valueOf(processMeta.getProcessType()); + optimizingType = getOptimizingTypeFromAction(processMeta.getAction()); targetSnapshotId = processState.getTargetSnapshotId(); targetChangeSnapshotId = processState.getTargetChangeSnapshotId(); planTime = processMeta.getCreateTime(); @@ -528,6 +528,53 @@ public OptimizingType getOptimizingType() { return optimizingType; } + /** + * Convert OptimizingType to corresponding Action. + * + * @param optimizingType optimizing type + * @return corresponding Action + */ + private org.apache.amoro.Action getOptimizingAction(OptimizingType optimizingType) { + switch (optimizingType) { + case MINOR: + return org.apache.amoro.IcebergActions.OPTIMIZING_MINOR; + case MAJOR: + return org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR; + case FULL: + return org.apache.amoro.IcebergActions.OPTIMIZING_FULL; + default: + throw new IllegalArgumentException("Unknown optimizing type: " + optimizingType); + } + } + + /** + * Convert Action to corresponding OptimizingType. + * + * @param action action + * @return corresponding OptimizingType + */ + private OptimizingType getOptimizingTypeFromAction(org.apache.amoro.Action action) { + if (action == null) { + throw new IllegalArgumentException("Action cannot be null"); + } + String actionName = action.getName(); + if (org.apache.amoro.IcebergActions.OPTIMIZING_MINOR.getName().equals(actionName)) { + return OptimizingType.MINOR; + } else if (org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR.getName().equals(actionName)) { + return OptimizingType.MAJOR; + } else if (org.apache.amoro.IcebergActions.OPTIMIZING_FULL.getName().equals(actionName)) { + return OptimizingType.FULL; + } else { + // Fallback to old behavior for backward compatibility + try { + return OptimizingType.valueOf(actionName.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Cannot convert action " + actionName + " to OptimizingType", e); + } + } + } + @Override public ProcessStatus getStatus() { return status; @@ -838,7 +885,7 @@ private void beginAndPersistProcess() { processId, "", status, - optimizingType.name().toUpperCase(), + getOptimizingAction(optimizingType), tableRuntime.getOptimizingStatus().name().toLowerCase(), "AMORO", 0, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java index ed61924522..bdf19d638d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java @@ -18,7 +18,9 @@ package org.apache.amoro.server.persistence.mapper; +import org.apache.amoro.Action; import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.server.persistence.converter.Action2StringConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.extension.InListExtendedLanguageDriver; @@ -46,7 +48,8 @@ public interface TableProcessMapper { "INSERT INTO table_process " + "(process_id, table_id, external_process_identifier, status, process_type, process_stage, execution_engine, retry_number, " + "create_time, process_parameters, summary) " - + "VALUES (#{processId}, #{tableId}, #{externalProcessIdentifier}, #{status}, #{processType}, #{processStage}, " + + "VALUES (#{processId}, #{tableId}, #{externalProcessIdentifier}, #{status}, " + + "#{action, typeHandler=org.apache.amoro.server.persistence.converter.Action2StringConverter}, #{processStage}, " + "#{executionEngine}, #{retryNumber}, " + "#{createTime, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, " + "#{processParameters, typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}, " @@ -56,7 +59,7 @@ void insertProcess( @Param("processId") long processId, @Param("externalProcessIdentifier") String externalProcessIdentifier, @Param("status") ProcessStatus status, - @Param("processType") String processType, + @Param("action") Action action, @Param("processStage") String processStage, @Param("executionEngine") String executionEngine, @Param("retryNumber") int retryNumber, @@ -97,7 +100,10 @@ void updateProcess( @Result(column = "table_id", property = "tableId"), @Result(column = "external_process_identifier", property = "externalProcessIdentifier"), @Result(column = "status", property = "status"), - @Result(column = "process_type", property = "processType"), + @Result( + column = "process_type", + property = "action", + typeHandler = Action2StringConverter.class), @Result(column = "process_stage", property = "processStage"), @Result(column = "execution_engine", property = "executionEngine"), @Result(column = "retry_number", property = "retryNumber"), @@ -123,14 +129,14 @@ void updateProcess( + "SELECT process_id, table_id, external_process_identifier, status, process_type, process_stage, execution_engine, retry_number, " + "create_time, finish_time, fail_message, process_parameters, summary " + "FROM table_process WHERE table_id = #{tableId} " - + " AND process_type = #{processType}" + + " AND process_type = #{action.name}" + " AND status = #{status}" + " ORDER BY process_id desc" + "") @ResultMap("tableProcessMap") List listProcessMeta( @Param("tableId") long tableId, - @Param("processType") String processType, + @Param("action") Action action, @Param("status") ProcessStatus optimizingStatus); @Select( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index ee9f136dc6..4f5041de52 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -192,7 +192,9 @@ public void recoverProcesses(List tableRuntimes) { processMeta -> { TableRuntime tableRuntime = tableIdToRuntimes.get(processMeta.getTableId()); ActionCoordinatorScheduler scheduler = - actionCoordinators.get(processMeta.getProcessType()); + processMeta.getAction() != null + ? actionCoordinators.get(processMeta.getAction().getName()) + : null; if (tableRuntime != null && scheduler != null) { scheduler.recover( tableRuntime, @@ -201,7 +203,7 @@ public void recoverProcesses(List tableRuntimes) { tableRuntime, processMeta, scheduler.getAction(), - scheduler.PROCESS_MAX_RETRY_NUMBER)); + ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER)); } }); } @@ -231,7 +233,8 @@ private void executeOrTraceProcess(TableProcess process) { actionCoordinators.get(process.store().getAction().getName()); if (scheduler != null && process.getStatus() == ProcessStatus.FAILED - && process.store().getRetryNumber() < scheduler.PROCESS_MAX_RETRY_NUMBER + && process.store().getRetryNumber() + < ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER && process.getTableRuntime() != null) { process .store() @@ -343,7 +346,7 @@ public TableProcessMeta persistTableProcess(TableProcess process) { processMeta.getProcessId(), processMeta.getExternalProcessIdentifier(), processMeta.getStatus(), - processMeta.getProcessType(), + processMeta.getAction(), processMeta.getProcessStage(), processMeta.getExecutionEngine(), processMeta.getRetryNumber(), diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java index 311b60e035..6ea3976f08 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.process; +import org.apache.amoro.Action; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.process.TableProcessState; import org.apache.amoro.process.TableProcessStore; @@ -30,7 +31,7 @@ public class TableProcessMeta { private long tableId; private volatile String externalProcessIdentifier; private ProcessStatus status; - private String processType; + private Action action; private String processStage; private String executionEngine; private int retryNumber; @@ -64,12 +65,57 @@ public void setStatus(ProcessStatus status) { this.status = status; } + /** + * Get the action of this process. + * + * @return action + */ + public Action getAction() { + return action; + } + + /** + * Set the action of this process. + * + * @param action action + */ + public void setAction(Action action) { + this.action = action; + } + + /** + * Get process type (action name) for backward compatibility. + * + * @return process type name + * @deprecated Use {@link #getAction()} instead + */ + @Deprecated public String getProcessType() { - return processType; + return action != null ? action.getName() : null; } + /** + * Set process type (action name) for backward compatibility. + * + * @param processType process type name + * @deprecated Use {@link #setAction(Action)} instead + */ + @Deprecated public void setProcessType(String processType) { - this.processType = processType; + // This method is kept for backward compatibility but should not be used + // Action should be set directly via setAction() + if (processType != null && action == null) { + // Try to find action by name from registry + org.apache.amoro.server.persistence.converter.Action2StringConverter.registerCustomAction( + new Action( + new org.apache.amoro.TableFormat[] { + org.apache.amoro.TableFormat.ICEBERG, + org.apache.amoro.TableFormat.MIXED_ICEBERG, + org.apache.amoro.TableFormat.MIXED_HIVE + }, + 0, + processType)); + } } public String getProcessStage() { @@ -154,7 +200,7 @@ public TableProcessMeta copy() { meta.setFinishTime(this.finishTime); meta.setExternalProcessIdentifier(this.externalProcessIdentifier); - meta.setProcessType(this.processType); + meta.setAction(this.action); meta.setProcessStage(this.processStage); meta.setExecutionEngine(this.executionEngine); meta.setFailMessage(this.failMessage); @@ -177,7 +223,7 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc tableProcessMeta.setTableId(tableProcessStore.getTableId()); tableProcessMeta.setExternalProcessIdentifier(tableProcessStore.getExternalProcessIdentifier()); tableProcessMeta.setStatus(tableProcessStore.getStatus()); - tableProcessMeta.setProcessType(tableProcessStore.getProcessType()); + tableProcessMeta.setAction(tableProcessStore.getAction()); tableProcessMeta.setProcessStage(tableProcessStore.getProcessStage()); tableProcessMeta.setExecutionEngine(tableProcessStore.getExecutionEngine()); tableProcessMeta.setRetryNumber(tableProcessStore.getRetryNumber()); @@ -196,7 +242,7 @@ public static TableProcessMeta fromTableProcessState(TableProcessState tableProc tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId()); tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier()); tableProcessMeta.setStatus(tableProcessState.getStatus()); - tableProcessMeta.setProcessType(tableProcessState.getAction().getName()); + tableProcessMeta.setAction(tableProcessState.getAction()); tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc()); tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine()); tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber()); @@ -208,10 +254,20 @@ public static TableProcessMeta fromTableProcessState(TableProcessState tableProc return tableProcessMeta; } + /** + * Create a TableProcessMeta with Action. + * + * @param processId process id + * @param tableId table id + * @param action action + * @param executionEngine execution engine + * @param processParameters process parameters + * @return TableProcessMeta instance + */ public static TableProcessMeta of( long processId, long tableId, - String actionName, + Action action, String executionEngine, Map processParameters) { TableProcessMeta tableProcessMeta = new TableProcessMeta(); @@ -219,7 +275,7 @@ public static TableProcessMeta of( tableProcessMeta.setTableId(tableId); tableProcessMeta.setExternalProcessIdentifier(""); tableProcessMeta.setStatus(ProcessStatus.UNKNOWN); - tableProcessMeta.setProcessType(actionName); + tableProcessMeta.setAction(action); tableProcessMeta.setProcessStage(ProcessStatus.UNKNOWN.name()); tableProcessMeta.setExecutionEngine(executionEngine); tableProcessMeta.setRetryNumber(0); @@ -230,4 +286,41 @@ public static TableProcessMeta of( tableProcessMeta.setSummary(new HashMap<>()); return tableProcessMeta; } + + /** + * Create a TableProcessMeta with action name (for backward compatibility). + * + * @param processId process id + * @param tableId table id + * @param actionName action name + * @param executionEngine execution engine + * @param processParameters process parameters + * @return TableProcessMeta instance + * @deprecated Use {@link #of(long, long, Action, String, Map)} instead + */ + @Deprecated + public static TableProcessMeta of( + long processId, + long tableId, + String actionName, + String executionEngine, + Map processParameters) { + // Try to find action from registry + Action action = + org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName( + actionName); + if (action == null) { + // Create a temporary action if not found + action = + new Action( + new org.apache.amoro.TableFormat[] { + org.apache.amoro.TableFormat.ICEBERG, + org.apache.amoro.TableFormat.MIXED_ICEBERG, + org.apache.amoro.TableFormat.MIXED_HIVE + }, + 0, + actionName); + } + return of(processId, tableId, action, executionEngine, processParameters); + } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java index 265a5700da..335c88f57d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java @@ -21,7 +21,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import org.apache.amoro.Action; import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.formats.AmoroCatalogTestHelper; @@ -387,6 +389,7 @@ public void insertOptimizingProcess( MetricsSummary summary, Map fromSequence, Map toSequence) { + Action action = getOptimizingAction(type); doAs( TableProcessMapper.class, mapper -> @@ -395,7 +398,7 @@ public void insertOptimizingProcess( processId, "", status, - type.name(), + action, type.name(), "AMORO", 0, @@ -413,5 +416,24 @@ public void insertOptimizingProcess( fromSequence, toSequence)); } + + /** + * Convert OptimizingType to corresponding Action. + * + * @param optimizingType optimizing type + * @return corresponding Action + */ + private Action getOptimizingAction(OptimizingType optimizingType) { + switch (optimizingType) { + case MINOR: + return IcebergActions.OPTIMIZING_MINOR; + case MAJOR: + return IcebergActions.OPTIMIZING_MAJOR; + case FULL: + return IcebergActions.OPTIMIZING_FULL; + default: + throw new IllegalArgumentException("Unknown optimizing type: " + optimizingType); + } + } } } diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 76f470d98c..bafca9c6ae 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -28,4 +28,11 @@ public class IcebergActions { public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2, "delete-orphans"); public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3, "sync-hive"); public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1, "expire-data"); + + // Optimizing Actions + public static final Action OPTIMIZING_MINOR = + new Action(DEFAULT_FORMATS, 100, "optimizing-minor"); + public static final Action OPTIMIZING_MAJOR = + new Action(DEFAULT_FORMATS, 200, "optimizing-major"); + public static final Action OPTIMIZING_FULL = new Action(DEFAULT_FORMATS, 300, "optimizing-full"); }