5 changes: 0 additions & 5 deletions pom.xml
@@ -782,11 +782,6 @@
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.14</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
@@ -22,7 +22,7 @@
import org.slf4j.MDC;

public abstract class RouteLogCallable<T> implements Callable<T> {

public static final String LOG_PATH_PATTERN = "%s/%s/%s/%s.log";
protected static Logger log = LogManager.getLogger(RouteLogCallable.class);
private final String workSpace;
private final String taskId;
@@ -27,6 +27,7 @@ public final class AlarmUtils {
* Base alarm message names
*/
public static final String CLUSTER_NAME = "Cluster";
public static final String INSTANCE_NAME = "instanceId";
public static final String TENANT_NAME = "Tenant";
public static final String ORGANIZATION_NAME = "OrganizationId";
public static final String MESSAGE_NAME = "Message";
@@ -42,6 +43,7 @@ public final class AlarmUtils {
public static final String RESOURCE_TYPE = "ResourceType";
public static final String TASK_TYPE_NAME = "TaskType";
public static final String SCHEDULE_ID_NAME = "ScheduleId";
public static final String FLOW_INSTANCE_ID_NAME = "FlowInstanceId";
public static final Collection<String> TASK_FRAMEWORK_ALARM_DIGEST_NAMES =
Arrays.asList(CLUSTER_NAME, TENANT_NAME, SCHEDULE_ID_NAME);

@@ -15,6 +15,8 @@
*/
package com.oceanbase.odc.server.web.controller.v2;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -25,7 +27,10 @@
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.schedule.export.ScheduleExportService;
import com.oceanbase.odc.service.schedule.export.model.FileExportResponse;
import com.oceanbase.odc.service.schedule.export.model.ScheduleExportListView;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTaskExportRequest;
import com.oceanbase.odc.service.state.model.StateName;
import com.oceanbase.odc.service.state.model.StatefulRoute;

@RequestMapping("/api/v2/export")
@RestController
@@ -34,8 +39,27 @@ public class ExportController {
@Autowired
private ScheduleExportService scheduleExportService;

@RequestMapping(value = "/exportScheduleTask", method = RequestMethod.POST)
public SuccessResponse<FileExportResponse> exportScheduleTask(@RequestBody ScheduleTaskExportRequest request) {
return Responses.success(scheduleExportService.export(request));
@RequestMapping(value = "getExportListView", method = RequestMethod.POST)
public SuccessResponse<List<ScheduleExportListView>> getExportListView(
@RequestBody ScheduleTaskExportRequest request) {
return Responses.success(scheduleExportService.getExportListView(request));
}

@RequestMapping(value = "/exportSchedule", method = RequestMethod.POST)
public SuccessResponse<String> exportScheduleTask2(@RequestBody ScheduleTaskExportRequest request) {
return Responses.success(scheduleExportService.startExport(request));
}

@RequestMapping(value = "/getExportResult", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#exportId")
public SuccessResponse<FileExportResponse> exportScheduleTask(String exportId) {
return Responses.success(scheduleExportService.getExportResult(exportId));
}

@RequestMapping(value = "/getExportLog", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#exportId")
public SuccessResponse<String> getExportLog(String exportId) {
return Responses.success(scheduleExportService.getExportLog(exportId));
}

}
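The controller above replaces the single synchronous /exportScheduleTask endpoint with an asynchronous start/poll flow, with @StatefulRoute pinning follow-up calls to the node that owns the export. A minimal client-side sketch of that flow; the host, port, and request-body field are illustrative, and it assumes the SuccessResponse envelope serializes its payload under a data field:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ScheduleExportClientSketch {

    private static final String BASE = "http://localhost:8989/api/v2/export"; // illustrative host/port

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // 1. Start the export; the body is a ScheduleTaskExportRequest as JSON (field name illustrative).
        HttpRequest start = HttpRequest.newBuilder(URI.create(BASE + "/exportSchedule"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"scheduleIds\":[1]}"))
                .build();
        String exportId = extractData(client.send(start, HttpResponse.BodyHandlers.ofString()).body());

        // 2. Poll the result; @StatefulRoute routes this call to the node holding the exportId state.
        HttpRequest result = HttpRequest.newBuilder(
                URI.create(BASE + "/getExportResult?exportId=" + exportId)).GET().build();
        System.out.println(client.send(result, HttpResponse.BodyHandlers.ofString()).body());

        // 3. Fetch the export log for troubleshooting.
        HttpRequest log = HttpRequest.newBuilder(
                URI.create(BASE + "/getExportLog?exportId=" + exportId)).GET().build();
        System.out.println(client.send(log, HttpResponse.BodyHandlers.ofString()).body());
    }

    // Naive extraction of the string payload; a real client would parse the JSON envelope properly.
    private static String extractData(String json) {
        int i = json.indexOf("\"data\":\"");
        if (i < 0) {
            return json;
        }
        int start = i + "\"data\":\"".length();
        return json.substring(start, json.indexOf('"', start));
    }
}
```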
@@ -16,6 +16,7 @@
package com.oceanbase.odc.server.web.controller.v2;

import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -49,6 +50,7 @@
import com.oceanbase.odc.service.common.util.WebResponseUtils;
import com.oceanbase.odc.service.flow.FlowInstanceService;
import com.oceanbase.odc.service.flow.FlowTaskInstanceService;
import com.oceanbase.odc.service.flow.model.BatchTerminateFlowResult;
import com.oceanbase.odc.service.flow.model.BinaryDataResult;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.model.FlowInstanceApprovalReq;
@@ -61,6 +63,8 @@
import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.session.model.SqlExecuteResult;
import com.oceanbase.odc.service.state.model.StateName;
import com.oceanbase.odc.service.state.model.StatefulRoute;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

import io.swagger.annotations.ApiOperation;
@@ -251,4 +255,24 @@ public SuccessResponse<PartitionPlanConfig> getPartitionPlan(@PathVariable Long
return Responses.ok(this.partitionPlanScheduleService.getPartitionPlanByFlowInstanceId(id));
}

@ApiOperation(value = "cancelFlowInstance", notes = "批量终止流程")
@RequestMapping(value = "/asyncCancel", method = RequestMethod.POST)
public SuccessResponse<String> batchCancelFlowInstance(@RequestBody Collection<Long> flowInstanceIds) {
return Responses.single(flowInstanceService.startBatchCancelFlowInstance(flowInstanceIds));
}

@ApiOperation(value = "getBatchCancelResult", notes = "获取批量终止结果")
@RequestMapping(value = "/asyncCancelResult", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<List<BatchTerminateFlowResult>> getBatchCancelResult(String terminateId) {
return Responses.single(flowInstanceService.getBatchCancelResult(terminateId));
}

@ApiOperation(value = "getBatchCancelLog", notes = "获取批量终止日志")
@RequestMapping(value = "/asyncCancelLog", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<String> getBatchCancelLog(String terminateId) {
return Responses.single(flowInstanceService.getBatchCancelLog(terminateId));
}

}
@@ -41,6 +41,8 @@
import com.oceanbase.odc.service.common.util.WebResponseUtils;
import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateCmd;
import com.oceanbase.odc.service.schedule.export.model.ScheduleTerminateResult;
import com.oceanbase.odc.service.schedule.model.ChangeScheduleResp;
import com.oceanbase.odc.service.schedule.model.CreateScheduleReq;
import com.oceanbase.odc.service.schedule.model.OperationType;
@@ -59,6 +61,8 @@
import com.oceanbase.odc.service.schedule.model.ScheduleTaskOverview;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.schedule.model.UpdateScheduleReq;
import com.oceanbase.odc.service.state.model.StateName;
import com.oceanbase.odc.service.state.model.StatefulRoute;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

@@ -286,6 +290,23 @@ public SuccessResponse<RateLimitConfiguration> updateLimiterConfig(@PathVariable
return Responses.single(scheduleService.updateDlmRateLimit(id, limiterConfig));
}

@RequestMapping(value = "/schedules/asyncTerminate", method = RequestMethod.POST)
public SuccessResponse<String> startTerminateScheduleAndTask(@RequestBody ScheduleTerminateCmd cmd) {
return Responses.ok(scheduleService.startTerminateScheduleAndTask(cmd));
}

@RequestMapping(value = "/schedules/asyncTerminateResult", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<List<ScheduleTerminateResult>> getTerminateScheduleResult(String terminateId) {
return Responses.ok(scheduleService.getTerminateScheduleResult(terminateId));
}

@RequestMapping(value = "/schedules/asyncTerminateLog", method = RequestMethod.GET)
@StatefulRoute(stateName = StateName.UUID_STATEFUL_ID, stateIdExpression = "#terminateId")
public SuccessResponse<String> getTerminateScheduleLog(String terminateId) {
return Responses.ok(scheduleService.getTerminateLog(terminateId));
}

@RequestMapping(value = "/schedules/stats", method = RequestMethod.GET)
public ListResponse<ScheduleStat> getScheduleStats(
@RequestParam(required = false, name = "types") Set<ScheduleType> types,
4 changes: 4 additions & 0 deletions server/odc-server/src/main/resources/log4j2.xml
@@ -918,6 +918,10 @@
<AppenderRef ref="RouteLogRunnableAppender"/>
</Logger>

<Logger name="com.oceanbase.odc.service.schedule.ScheduleService" level="INFO" additivity="true">
<AppenderRef ref="RouteLogRunnableAppender"/>
</Logger>

<!-- ODC程序日志输出,输出级别 INFO -->
<Logger name="com.oceanbase.odc" level="INFO" additivity="false">
<AppenderRef ref="OdcFileAppender"/>
@@ -311,8 +311,8 @@ public ThreadPoolTaskExecutor queryProfileMonitorExecutor() {
return executor;
}

@Bean(name = "scheduleImportExecutor")
public ThreadPoolTaskExecutor scheduleImportExecutor() {
@Bean(name = "commonAsyncTaskExecutor")
public ThreadPoolTaskExecutor commonAsyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int minPoolSize = Math.max(SystemUtils.availableProcessors(), 4);
executor.setCorePoolSize(minPoolSize);
@@ -324,7 +324,7 @@ public ThreadPoolTaskExecutor scheduleImportExecutor() {
executor.setTaskDecorator(new TraceDecorator<>());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
log.info("scheduleImportExecutor initialized");
log.info("commonAsyncTaskExecutor initialized");
return executor;
}
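Since the scheduleImportExecutor bean is renamed here, existing injection points need to reference the new bean name. A minimal consumer sketch; the AsyncTaskSubmitter class is illustrative and not part of this change:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service
public class AsyncTaskSubmitter { // illustrative consumer

    @Autowired
    @Qualifier("commonAsyncTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    public void submit(Runnable work) {
        // The pool is configured with AbortPolicy, so submission can throw
        // TaskRejectedException when the queue is saturated.
        executor.submit(work);
    }
}
```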

@@ -85,6 +85,7 @@
import com.oceanbase.odc.service.iam.ProjectPermissionValidator;
import com.oceanbase.odc.service.iam.ResourceRoleService;
import com.oceanbase.odc.service.iam.UserOrganizationService;
import com.oceanbase.odc.service.iam.UserService;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.auth.AuthorizationFacade;
import com.oceanbase.odc.service.iam.model.User;
@@ -164,7 +165,8 @@ public class ProjectService {
@Autowired
@Lazy
private FlowInstanceService flowInstanceService;

@Autowired
private UserService userService;
@Value("${odc.integration.bastion.enabled:false}")
private boolean bastionEnabled;

@@ -340,7 +342,11 @@ public Page<Project> list(@Valid QueryProjectParams params, @NotNull Pageable pa

@SkipAuthorize("odc internal usage")
public List<Project> listByIds(@NotEmpty Set<Long> ids) {
return repository.findAllById(ids).stream().map(this::entityToModel).collect(Collectors.toList());
List<Project> projects =
repository.findAllById(ids).stream().map(projectMapper::entityToModel).collect(Collectors.toList());
userService.assignInnerUserByCreatorId(projects, c -> c.getCreator().getId(), Project::setCreator);
userService.assignInnerUserByCreatorId(projects, c -> c.getLastModifier().getId(), Project::setCreator);
return projects;
}

private Page<ProjectEntity> innerList(@Valid QueryProjectParams params, @NotNull Pageable pageable,
@@ -32,6 +32,7 @@ public class FutureCache {

private final Cache<String, Future<?>> tempId2Future =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)
.maximumSize(1000L)
.removalListener((String key, Future<?> future, RemovalCause cause) -> {
if (future != null) {
future.cancel(true);
@@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -1299,4 +1300,18 @@ private void deleteDatabaseIfInstanceNotExists(Long connectionId, OrganizationTy
}

}

public <T> void assignDatabaseById(List<T> content, Function<T, Long> databaseIdProvider,
BiConsumer<T, Database> databaseSetter) {
List<Long> databaseIds = content.stream().map(databaseIdProvider).collect(
Collectors.toList());
List<DatabaseEntity> entities = databaseRepository.findByIdIn(databaseIds);
List<Database> databases = entitiesToModels(new PageImpl<>(entities), false).getContent();
Map<Long, Database> idDatabaseEntityMap = databases.stream().collect(
Collectors.toMap(Database::getId, t -> t));
content.forEach(c -> {
Database database = idDatabaseEntityMap.get(databaseIdProvider.apply(c));
databaseSetter.accept(c, database);
});
}
}
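The new assignDatabaseById helper batch-loads Database models for any collection of objects that carry a database id, replacing per-row lookups. A minimal usage sketch; ScheduleRowView and its accessors are hypothetical and only illustrate the two functional parameters:

```java
// Hypothetical row type: carries only a databaseId and receives the resolved Database model.
public class ScheduleRowView {
    private Long databaseId;
    private Database database; // the service-layer Database model used by assignDatabaseById above

    public Long getDatabaseId() { return databaseId; }
    public void setDatabase(Database database) { this.database = database; }
}

// In a caller with DatabaseService injected, one batched call resolves every row:
// databaseService.assignDatabaseById(rows, ScheduleRowView::getDatabaseId, ScheduleRowView::setDatabase);
```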
@@ -154,6 +154,7 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException
ps.setString(9, taskGenerator.getPartitionSavePoint());
ps.setString(10, JsonUtils.toJson(taskGenerator.getPartName2MinKey()));
ps.setString(11, JsonUtils.toJson(taskGenerator.getPartName2MaxKey()));
ps.execute();
}
}
}
@@ -226,6 +227,7 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {
ps.setString(7,
taskMeta.getCursorPrimaryKey() == null ? "" : taskMeta.getCursorPrimaryKey().toSqlString());
ps.setString(8, taskMeta.getPartitionName());
ps.execute();
}
}
}
@@ -130,6 +130,11 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
@SkipAuthorize("odc internal usage")
public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
List<DlmTableUnit> dlmTableUnits = findByScheduleTaskId(scheduleTaskId);
return getFinalTaskStatus(dlmTableUnits);
}

@SkipAuthorize("odc internal usage")
public TaskStatus getFinalTaskStatus(List<DlmTableUnit> dlmTableUnits) {
Set<TaskStatus> collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect(
Collectors.toSet());
// If the tables do not exist or any table fails, the task is considered a failure.
@@ -148,4 +153,5 @@ public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
return TaskStatus.DONE;
}


}
@@ -143,7 +143,9 @@ public ExportRowDataReader<JsonNode> getRowDataReader() throws IOException {
ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
File configJson = getConfigJson(this.tempFilePath);
Verify.notNull(configJson, "Invalid archived file");
try (InputStream inputStream = Files.newInputStream(configJson.toPath())) {
InputStream inputStream = null;
try {
inputStream = Files.newInputStream(configJson.toPath());
JsonParser jsonParser = jsonFactory.createParser(inputStream);

JsonToken jsonToken = jsonParser.nextToken();
@@ -177,6 +179,12 @@ public ExportRowDataReader<JsonNode> getRowDataReader() throws IOException {
throw new IllegalStateException("Expected data to be an Array");
}
return new JsonRowDataReader(properties, jsonParser, objectMapper, exportedFile.getSecret(), tempFilePath);
} catch (Exception e) {
if (inputStream != null) {
inputStream.close();
}
log.error("Get row data reader failed", e);
throw e;
}
}

@@ -231,12 +239,19 @@ public ExportProperties getProperties() {

@Override
public <R extends Encryptable> R readRow(Class<R> rowDataClass) throws IOException {
JsonNode jsonNode = readRow();
R rowData = objectMapper.convertValue(jsonNode, rowDataClass);
if (rowData != null && encryptKey != null) {
rowData.decrypt(encryptKey);
try {
JsonNode jsonNode = readRow();
R rowData = objectMapper.convertValue(jsonNode, rowDataClass);
log.info("read rowData={}", rowData);
if (rowData != null && encryptKey != null) {
rowData.decrypt(encryptKey);
}
return rowData;
} catch (Exception e) {
log.error("read row failed", e);
throw e;
}
return rowData;

}

@Override