Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,17 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
int pageNumber = (offset / limit) + 1;
List<TableProcessMeta> processMetaList = Collections.emptyList();
try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
org.apache.amoro.Action action = null;
if (type != null && !type.isEmpty()) {
action =
org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName(
type);
}
final org.apache.amoro.Action finalAction = action;
processMetaList =
getAs(
TableProcessMapper.class,
mapper -> mapper.listProcessMeta(identifier.getId(), type, status));
mapper -> mapper.listProcessMeta(identifier.getId(), finalAction, status));
PageInfo<TableProcessMeta> pageInfo = new PageInfo<>(processMetaList);
total = (int) pageInfo.getTotal();
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public TableOptimizingProcess(
OptimizingProcessState processState) {
this.tableRuntime = tableRuntime;
processId = tableRuntime.getProcessId();
optimizingType = OptimizingType.valueOf(processMeta.getProcessType());
optimizingType = getOptimizingTypeFromAction(processMeta.getAction());
targetSnapshotId = processState.getTargetSnapshotId();
targetChangeSnapshotId = processState.getTargetChangeSnapshotId();
planTime = processMeta.getCreateTime();
Expand Down Expand Up @@ -528,6 +528,53 @@ public OptimizingType getOptimizingType() {
return optimizingType;
}

/**
* Convert OptimizingType to corresponding Action.
*
* @param optimizingType optimizing type
* @return corresponding Action
*/
private org.apache.amoro.Action getOptimizingAction(OptimizingType optimizingType) {
switch (optimizingType) {
case MINOR:
return org.apache.amoro.IcebergActions.OPTIMIZING_MINOR;
case MAJOR:
return org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR;
case FULL:
return org.apache.amoro.IcebergActions.OPTIMIZING_FULL;
default:
throw new IllegalArgumentException("Unknown optimizing type: " + optimizingType);
}
}

/**
* Convert Action to corresponding OptimizingType.
*
* @param action action
* @return corresponding OptimizingType
*/
private OptimizingType getOptimizingTypeFromAction(org.apache.amoro.Action action) {
if (action == null) {
throw new IllegalArgumentException("Action cannot be null");
}
String actionName = action.getName();
if (org.apache.amoro.IcebergActions.OPTIMIZING_MINOR.getName().equals(actionName)) {
return OptimizingType.MINOR;
} else if (org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR.getName().equals(actionName)) {
return OptimizingType.MAJOR;
} else if (org.apache.amoro.IcebergActions.OPTIMIZING_FULL.getName().equals(actionName)) {
return OptimizingType.FULL;
} else {
// Fallback to old behavior for backward compatibility
try {
return OptimizingType.valueOf(actionName.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Cannot convert action " + actionName + " to OptimizingType", e);
}
}
}

@Override
public ProcessStatus getStatus() {
return status;
Expand Down Expand Up @@ -838,7 +885,7 @@ private void beginAndPersistProcess() {
processId,
"",
status,
optimizingType.name().toUpperCase(),
getOptimizingAction(optimizingType),
tableRuntime.getOptimizingStatus().name().toLowerCase(),
"AMORO",
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.Action;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.persistence.converter.Action2StringConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
import org.apache.amoro.server.persistence.converter.Map2StringConverter;
import org.apache.amoro.server.persistence.extension.InListExtendedLanguageDriver;
Expand Down Expand Up @@ -46,7 +48,8 @@ public interface TableProcessMapper {
"INSERT INTO table_process "
+ "(process_id, table_id, external_process_identifier, status, process_type, process_stage, execution_engine, retry_number, "
+ "create_time, process_parameters, summary) "
+ "VALUES (#{processId}, #{tableId}, #{externalProcessIdentifier}, #{status}, #{processType}, #{processStage}, "
+ "VALUES (#{processId}, #{tableId}, #{externalProcessIdentifier}, #{status}, "
+ "#{action, typeHandler=org.apache.amoro.server.persistence.converter.Action2StringConverter}, #{processStage}, "
+ "#{executionEngine}, #{retryNumber}, "
+ "#{createTime, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
+ "#{processParameters, typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}, "
Expand All @@ -56,7 +59,7 @@ void insertProcess(
@Param("processId") long processId,
@Param("externalProcessIdentifier") String externalProcessIdentifier,
@Param("status") ProcessStatus status,
@Param("processType") String processType,
@Param("action") Action action,
@Param("processStage") String processStage,
@Param("executionEngine") String executionEngine,
@Param("retryNumber") int retryNumber,
Expand Down Expand Up @@ -97,7 +100,10 @@ void updateProcess(
@Result(column = "table_id", property = "tableId"),
@Result(column = "external_process_identifier", property = "externalProcessIdentifier"),
@Result(column = "status", property = "status"),
@Result(column = "process_type", property = "processType"),
@Result(
column = "process_type",
property = "action",
typeHandler = Action2StringConverter.class),
@Result(column = "process_stage", property = "processStage"),
@Result(column = "execution_engine", property = "executionEngine"),
@Result(column = "retry_number", property = "retryNumber"),
Expand All @@ -123,14 +129,14 @@ void updateProcess(
+ "SELECT process_id, table_id, external_process_identifier, status, process_type, process_stage, execution_engine, retry_number, "
+ "create_time, finish_time, fail_message, process_parameters, summary "
+ "FROM table_process WHERE table_id = #{tableId} "
+ " <if test='processType != null'> AND process_type = #{processType}</if>"
+ " <if test='action != null'> AND process_type = #{action.name}</if>"
+ " <if test='status != null'> AND status = #{status}</if>"
+ " ORDER BY process_id desc"
+ "</script>")
@ResultMap("tableProcessMap")
List<TableProcessMeta> listProcessMeta(
@Param("tableId") long tableId,
@Param("processType") String processType,
@Param("action") Action action,
@Param("status") ProcessStatus optimizingStatus);

@Select(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ public void recoverProcesses(List<TableRuntime> tableRuntimes) {
processMeta -> {
TableRuntime tableRuntime = tableIdToRuntimes.get(processMeta.getTableId());
ActionCoordinatorScheduler scheduler =
actionCoordinators.get(processMeta.getProcessType());
processMeta.getAction() != null
? actionCoordinators.get(processMeta.getAction().getName())
: null;
if (tableRuntime != null && scheduler != null) {
scheduler.recover(
tableRuntime,
Expand All @@ -201,7 +203,7 @@ public void recoverProcesses(List<TableRuntime> tableRuntimes) {
tableRuntime,
processMeta,
scheduler.getAction(),
scheduler.PROCESS_MAX_RETRY_NUMBER));
ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER));
}
});
}
Expand Down Expand Up @@ -231,7 +233,8 @@ private void executeOrTraceProcess(TableProcess process) {
actionCoordinators.get(process.store().getAction().getName());
if (scheduler != null
&& process.getStatus() == ProcessStatus.FAILED
&& process.store().getRetryNumber() < scheduler.PROCESS_MAX_RETRY_NUMBER
&& process.store().getRetryNumber()
< ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER
&& process.getTableRuntime() != null) {
process
.store()
Expand Down Expand Up @@ -343,7 +346,7 @@ public TableProcessMeta persistTableProcess(TableProcess process) {
processMeta.getProcessId(),
processMeta.getExternalProcessIdentifier(),
processMeta.getStatus(),
processMeta.getProcessType(),
processMeta.getAction(),
processMeta.getProcessStage(),
processMeta.getExecutionEngine(),
processMeta.getRetryNumber(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.server.process;

import org.apache.amoro.Action;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.TableProcessState;
import org.apache.amoro.process.TableProcessStore;
Expand All @@ -30,7 +31,7 @@ public class TableProcessMeta {
private long tableId;
private volatile String externalProcessIdentifier;
private ProcessStatus status;
private String processType;
private Action action;
private String processStage;
private String executionEngine;
private int retryNumber;
Expand Down Expand Up @@ -64,12 +65,57 @@ public void setStatus(ProcessStatus status) {
this.status = status;
}

/**
* Get the action of this process.
*
* @return action
*/
public Action getAction() {
return action;
}

/**
* Set the action of this process.
*
* @param action action
*/
public void setAction(Action action) {
this.action = action;
}

/**
* Get process type (action name) for backward compatibility.
*
* @return process type name
* @deprecated Use {@link #getAction()} instead
*/
@Deprecated
public String getProcessType() {
return processType;
return action != null ? action.getName() : null;
}

/**
* Set process type (action name) for backward compatibility.
*
* @param processType process type name
* @deprecated Use {@link #setAction(Action)} instead
*/
@Deprecated
public void setProcessType(String processType) {
this.processType = processType;
// This method is kept for backward compatibility but should not be used
// Action should be set directly via setAction()
if (processType != null && action == null) {
// Try to find action by name from registry
org.apache.amoro.server.persistence.converter.Action2StringConverter.registerCustomAction(
new Action(
new org.apache.amoro.TableFormat[] {
org.apache.amoro.TableFormat.ICEBERG,
org.apache.amoro.TableFormat.MIXED_ICEBERG,
org.apache.amoro.TableFormat.MIXED_HIVE
},
0,
processType));
}
}

public String getProcessStage() {
Expand Down Expand Up @@ -154,7 +200,7 @@ public TableProcessMeta copy() {
meta.setFinishTime(this.finishTime);

meta.setExternalProcessIdentifier(this.externalProcessIdentifier);
meta.setProcessType(this.processType);
meta.setAction(this.action);
meta.setProcessStage(this.processStage);
meta.setExecutionEngine(this.executionEngine);
meta.setFailMessage(this.failMessage);
Expand All @@ -177,7 +223,7 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc
tableProcessMeta.setTableId(tableProcessStore.getTableId());
tableProcessMeta.setExternalProcessIdentifier(tableProcessStore.getExternalProcessIdentifier());
tableProcessMeta.setStatus(tableProcessStore.getStatus());
tableProcessMeta.setProcessType(tableProcessStore.getProcessType());
tableProcessMeta.setAction(tableProcessStore.getAction());
tableProcessMeta.setProcessStage(tableProcessStore.getProcessStage());
tableProcessMeta.setExecutionEngine(tableProcessStore.getExecutionEngine());
tableProcessMeta.setRetryNumber(tableProcessStore.getRetryNumber());
Expand All @@ -196,7 +242,7 @@ public static TableProcessMeta fromTableProcessState(TableProcessState tableProc
tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId());
tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier());
tableProcessMeta.setStatus(tableProcessState.getStatus());
tableProcessMeta.setProcessType(tableProcessState.getAction().getName());
tableProcessMeta.setAction(tableProcessState.getAction());
tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc());
tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine());
tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber());
Expand All @@ -208,18 +254,28 @@ public static TableProcessMeta fromTableProcessState(TableProcessState tableProc
return tableProcessMeta;
}

/**
* Create a TableProcessMeta with Action.
*
* @param processId process id
* @param tableId table id
* @param action action
* @param executionEngine execution engine
* @param processParameters process parameters
* @return TableProcessMeta instance
*/
public static TableProcessMeta of(
long processId,
long tableId,
String actionName,
Action action,
String executionEngine,
Map<String, String> processParameters) {
TableProcessMeta tableProcessMeta = new TableProcessMeta();
tableProcessMeta.setProcessId(processId);
tableProcessMeta.setTableId(tableId);
tableProcessMeta.setExternalProcessIdentifier("");
tableProcessMeta.setStatus(ProcessStatus.UNKNOWN);
tableProcessMeta.setProcessType(actionName);
tableProcessMeta.setAction(action);
tableProcessMeta.setProcessStage(ProcessStatus.UNKNOWN.name());
tableProcessMeta.setExecutionEngine(executionEngine);
tableProcessMeta.setRetryNumber(0);
Expand All @@ -230,4 +286,41 @@ public static TableProcessMeta of(
tableProcessMeta.setSummary(new HashMap<>());
return tableProcessMeta;
}

/**
* Create a TableProcessMeta with action name (for backward compatibility).
*
* @param processId process id
* @param tableId table id
* @param actionName action name
* @param executionEngine execution engine
* @param processParameters process parameters
* @return TableProcessMeta instance
* @deprecated Use {@link #of(long, long, Action, String, Map)} instead
*/
@Deprecated
public static TableProcessMeta of(
long processId,
long tableId,
String actionName,
String executionEngine,
Map<String, String> processParameters) {
// Try to find action from registry
Action action =
org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName(
actionName);
if (action == null) {
// Create a temporary action if not found
action =
new Action(
new org.apache.amoro.TableFormat[] {
org.apache.amoro.TableFormat.ICEBERG,
org.apache.amoro.TableFormat.MIXED_ICEBERG,
org.apache.amoro.TableFormat.MIXED_HIVE
},
0,
actionName);
}
return of(processId, tableId, action, executionEngine, processParameters);
}
}
Loading
Loading