Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions backend/services/data-collection-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,6 @@

<build>
<plugins>
<!-- OpenAPI Generator Plugin -->
<plugin>
<groupId>org.openapitools</groupId>
<artifactId>openapi-generator-maven-plugin</artifactId>
<version>6.6.0</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<inputSpec>${project.basedir}/../../openapi/specs/data-collection.yaml</inputSpec>
<generatorName>spring</generatorName>
<output>${project.build.directory}/generated-sources/openapi</output>
<apiPackage>com.datamate.collection.interfaces.api</apiPackage>
<modelPackage>com.datamate.collection.interfaces.dto</modelPackage>
<configOptions>
<interfaceOnly>true</interfaceOnly>
<useTags>true</useTags>
<useSpringBoot3>true</useSpringBoot3>
<documentationProvider>springdoc</documentationProvider>
<dateLibrary>java8-localdatetime</dateLibrary>
<java8>true</java8>
</configOptions>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.datamate.collection.application;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.common.domain.utils.ChunksSaver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;

@Slf4j
@Service
@RequiredArgsConstructor
public class CollectionTaskService {
private final TaskExecutionService taskExecutionService;
private final CollectionTaskRepository collectionTaskRepository;

@Transactional
public CollectionTask create(CollectionTask task) {
task.setStatus(TaskStatus.READY);
task.setCreatedAt(LocalDateTime.now());
task.setUpdatedAt(LocalDateTime.now());
collectionTaskRepository.save(task);
executeTaskNow(task);
return task;
}

private void executeTaskNow(CollectionTask task) {
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
TaskExecution exec = taskExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
taskExecutionService.runAsync(task, exec.getId(), timeout);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
}
}

@Transactional
public CollectionTask update(CollectionTask task) {
task.setUpdatedAt(LocalDateTime.now());
collectionTaskRepository.updateById(task);
return task;
}

@Transactional
public void delete(String id) {
CollectionTask task = collectionTaskRepository.getById(id);
if (task != null) {
ChunksSaver.deleteFolder("/dataset/local/" + task.getId());
}
collectionTaskRepository.removeById(id);
}

public CollectionTask get(String id) {
return collectionTaskRepository.getById(id);
}

public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
.eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
.like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
}

public List<CollectionTask> selectActiveTasks() {
return collectionTaskRepository.selectActiveTasks();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.datamate.collection.application;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

@Slf4j
@Service
@RequiredArgsConstructor
public class TaskExecutionService {
private final ProcessRunner processRunner;
private final TaskExecutionRepository executionRepository;
private final CollectionTaskRepository collectionTaskRepository;


@Transactional
public TaskExecution createExecution(CollectionTask task) {
TaskExecution exec = TaskExecution.initTaskExecution();
exec.setTaskId(task.getId());
exec.setTaskName(task.getName());
executionRepository.save(exec);
collectionTaskRepository.updateLastExecution(task.getId(), exec.getId());
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.RUNNING.name());
return exec;
}

public TaskExecution selectLatestByTaskId(String taskId) {
return executionRepository.selectLatestByTaskId(taskId);
}

@Async
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
try {
int code = processRunner.runJob(task, executionId, timeoutSeconds);
log.info("DataX finished with code {} for execution {}", code, executionId);
// 简化:成功即完成
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null);
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
} catch (Exception e) {
log.error("DataX execution failed", e);
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, e.getMessage());
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.FAILED.name());
}
}
}

This file was deleted.

This file was deleted.

Loading