Skip to content

Commit 5da992d

Browse files
authored
feature:增加obs归集方式 (#90)
* feature:实现通过datax进行obs归集的插件 * feature:前端增加obs归集时的前缀参数
1 parent 04a233b commit 5da992d

File tree

13 files changed

+692
-1
lines changed

13 files changed

+692
-1
lines changed

frontend/src/pages/DataCollection/Create/CreateTask.tsx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,13 @@ export default function CollectionTaskCreate() {
314314
placeholder="Secret Key"
315315
/>
316316
</Form.Item>
317+
<Form.Item
318+
name={["config", "prefix"]}
319+
rules={[{ required: true }]}
320+
label="Prefix"
321+
>
322+
<Input className="h-8 text-xs" placeholder="Prefix" />
323+
</Form.Item>
317324
</div>
318325
)}
319326

runtime/datax/obsreader/pom.xml

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>com.alibaba.datax</groupId>
8+
<artifactId>datax-all</artifactId>
9+
<version>0.0.1-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>obsreader</artifactId>
13+
14+
<properties>
15+
<maven.compiler.source>21</maven.compiler.source>
16+
<maven.compiler.target>21</maven.compiler.target>
17+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>com.alibaba.datax</groupId>
23+
<artifactId>datax-core</artifactId>
24+
<version>${datax-project-version}</version>
25+
</dependency>
26+
<dependency>
27+
<groupId>com.alibaba.datax</groupId>
28+
<artifactId>datax-common</artifactId>
29+
<version>${datax-project-version}</version>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.slf4j</groupId>
33+
<artifactId>slf4j-api</artifactId>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>ch.qos.logback</groupId>
38+
<artifactId>logback-classic</artifactId>
39+
</dependency>
40+
41+
<dependency>
42+
<groupId>software.amazon.awssdk</groupId>
43+
<artifactId>s3</artifactId>
44+
</dependency>
45+
</dependencies>
46+
47+
<build>
48+
<resources>
49+
<resource>
50+
<directory>src/main/java</directory>
51+
<includes>
52+
<include>**/*.properties</include>
53+
</includes>
54+
</resource>
55+
</resources>
56+
<plugins>
57+
<!-- compiler plugin -->
58+
<plugin>
59+
<artifactId>maven-compiler-plugin</artifactId>
60+
<configuration>
61+
<source>${jdk-version}</source>
62+
<target>${jdk-version}</target>
63+
<encoding>${project-sourceEncoding}</encoding>
64+
</configuration>
65+
</plugin>
66+
<!-- assembly plugin -->
67+
<plugin>
68+
<artifactId>maven-assembly-plugin</artifactId>
69+
<configuration>
70+
<descriptors>
71+
<descriptor>src/main/assembly/package.xml</descriptor>
72+
</descriptors>
73+
<finalName>datax</finalName>
74+
</configuration>
75+
<executions>
76+
<execution>
77+
<id>dwzip</id>
78+
<phase>package</phase>
79+
<goals>
80+
<goal>single</goal>
81+
</goals>
82+
</execution>
83+
</executions>
84+
</plugin>
85+
</plugins>
86+
</build>
87+
88+
</project>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<assembly
2+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
5+
<id></id>
6+
<formats>
7+
<format>dir</format>
8+
</formats>
9+
<includeBaseDirectory>false</includeBaseDirectory>
10+
<fileSets>
11+
<fileSet>
12+
<directory>src/main/resources</directory>
13+
<includes>
14+
<include>plugin.json</include>
15+
<include>plugin_job_template.json</include>
16+
</includes>
17+
<outputDirectory>plugin/reader/obsreader</outputDirectory>
18+
</fileSet>
19+
<fileSet>
20+
<directory>target/</directory>
21+
<includes>
22+
<include>obsreader-0.0.1-SNAPSHOT.jar</include>
23+
</includes>
24+
<outputDirectory>plugin/reader/obsreader</outputDirectory>
25+
</fileSet>
26+
</fileSets>
27+
28+
<dependencySets>
29+
<dependencySet>
30+
<useProjectArtifact>false</useProjectArtifact>
31+
<outputDirectory>plugin/reader/obsreader/libs</outputDirectory>
32+
<scope>runtime</scope>
33+
</dependencySet>
34+
</dependencySets>
35+
</assembly>
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package com.datamate.plugin.reader.obsreader;
2+
3+
import java.io.IOException;
4+
import java.net.URI;
5+
import java.nio.file.Files;
6+
import java.nio.file.Path;
7+
import java.nio.file.Paths;
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Set;
13+
import java.util.stream.Collectors;
14+
15+
import com.alibaba.datax.common.element.Record;
16+
import com.alibaba.datax.common.element.StringColumn;
17+
import com.alibaba.datax.common.exception.CommonErrorCode;
18+
import com.alibaba.datax.common.exception.DataXException;
19+
import com.alibaba.datax.common.plugin.RecordSender;
20+
import com.alibaba.datax.common.spi.Reader;
21+
import com.alibaba.datax.common.util.Configuration;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
28+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
29+
import software.amazon.awssdk.core.sync.ResponseTransformer;
30+
import software.amazon.awssdk.regions.Region;
31+
import software.amazon.awssdk.services.s3.S3Client;
32+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
33+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
34+
import software.amazon.awssdk.services.s3.model.S3Object;
35+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
36+
import software.amazon.awssdk.services.s3.S3Configuration;
37+
38+
public class ObsReader extends Reader {
39+
40+
private static final Logger LOG = LoggerFactory.getLogger(ObsReader.class);
41+
42+
public static class Job extends Reader.Job {
43+
private Configuration jobConfig = null;
44+
45+
@Override
46+
public void init() {
47+
this.jobConfig = super.getPluginJobConf();
48+
}
49+
50+
@Override
51+
public void prepare() {
52+
}
53+
54+
@Override
55+
public List<Configuration> split(int adviceNumber) {
56+
return Collections.singletonList(this.jobConfig);
57+
}
58+
59+
@Override
60+
public void post() {
61+
}
62+
63+
@Override
64+
public void destroy() {
65+
}
66+
}
67+
68+
public static class Task extends Reader.Task {
69+
70+
private Configuration jobConfig;
71+
private Set<String> fileType;
72+
private String endpoint;
73+
private String accessKey;
74+
private String secretKey;
75+
private String bucket;
76+
private String prefix;
77+
private S3Client s3;
78+
private String effectivePrefix;
79+
80+
@Override
81+
public void init() {
82+
this.jobConfig = super.getPluginJobConf();
83+
this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
84+
this.endpoint = this.jobConfig.getString("endpoint");
85+
this.accessKey = this.jobConfig.getString("accessKey");
86+
this.secretKey = this.jobConfig.getString("secretKey");
87+
this.bucket = this.jobConfig.getString("bucket");
88+
this.prefix = this.jobConfig.getString("prefix");
89+
this.s3 = getS3Client();
90+
this.effectivePrefix = getEffectivePrefix();
91+
}
92+
93+
@Override
94+
public void startRead(RecordSender recordSender) {
95+
try {
96+
List<String> files = listFiles().stream()
97+
.filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file)))
98+
.collect(Collectors.toList());
99+
files.forEach(filePath -> {
100+
Record record = recordSender.createRecord();
101+
record.addColumn(new StringColumn(filePath));
102+
recordSender.sendToWriter(record);
103+
});
104+
this.jobConfig.set("columnNumber", 1);
105+
} catch (Exception e) {
106+
LOG.error("Error reading files from obs file system: {}", this.endpoint, e);
107+
throw new RuntimeException(e);
108+
}
109+
}
110+
111+
/**
112+
* 使用 AWS SDK v2 列举 S3/OBS 对象并将对象下载到 /dataset/local/。
113+
* 非递归:只列举 prefix 当前目录下的对象(通过 delimiter="/" 实现)。
114+
* 返回对象 key 列表(下载后文件名为 key 的最后一段)。
115+
*/
116+
private List<String> listFiles() throws Exception {
117+
if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) {
118+
throw new IllegalArgumentException("endpoint and bucket must be provided");
119+
}
120+
List<String> keys = new ArrayList<>();
121+
String continuationToken = null;
122+
try {
123+
do {
124+
ListObjectsV2Request.Builder reqBuilder = ListObjectsV2Request.builder()
125+
.bucket(bucket)
126+
.prefix(effectivePrefix)
127+
.delimiter("/"); // 非递归:只列出当前目录下的对象(不下钻子目录)
128+
if (continuationToken != null) reqBuilder.continuationToken(continuationToken);
129+
ListObjectsV2Response res = s3.listObjectsV2(reqBuilder.build());
130+
for (S3Object obj : res.contents()) {
131+
String key = obj.key();
132+
if (isInValid(key)) continue;
133+
// 到此认为是“文件”key(且位于 prefix 当前目录)
134+
keys.add(key);
135+
}
136+
continuationToken = res.isTruncated() ? res.nextContinuationToken() : null;
137+
} while (continuationToken != null);
138+
} catch (Exception e) {
139+
LOG.warn("Failed to build S3 client or read object: {}", e.getMessage(), e);
140+
// 保持原行为,对下载失败记录 warn,但不抛出新的运行时错误(外层会捕获)
141+
}
142+
return keys;
143+
}
144+
145+
private boolean isInValid(String key) {
146+
// 仅接受以 effectivePrefix 开头的 key(请求通常已保证),并排除目录占位符
147+
if (!effectivePrefix.isEmpty() && !key.startsWith(effectivePrefix)) {
148+
return true;
149+
}
150+
if (key.equals(effectivePrefix) || key.endsWith("/")) {
151+
// 这是一个目录占位符或与 prefix 相同,跳过
152+
return true;
153+
}
154+
return false;
155+
}
156+
157+
private String getEffectivePrefix() {
158+
// 规范化 prefix:去掉前导 '/',并确保以 '/' 结尾以表示目录前缀(如果非空)
159+
String effectivePrefix = "";
160+
if (prefix != null) {
161+
effectivePrefix = prefix.startsWith("/") ? prefix.substring(1) : prefix;
162+
if (!effectivePrefix.isEmpty() && !effectivePrefix.endsWith("/")) {
163+
effectivePrefix = effectivePrefix + "/";
164+
}
165+
}
166+
return effectivePrefix;
167+
}
168+
169+
private S3Client getS3Client() {
170+
try {
171+
AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
172+
S3Configuration serviceConfig = S3Configuration.builder()
173+
.pathStyleAccessEnabled(true)
174+
.build();
175+
return S3Client.builder()
176+
.endpointOverride(new URI(endpoint))
177+
.region(Region.of("us-east-1"))
178+
.serviceConfiguration(serviceConfig)
179+
.credentialsProvider(StaticCredentialsProvider.create(creds))
180+
.build();
181+
} catch (Exception e) {
182+
LOG.error("Error init s3 client: {}", this.endpoint, e);
183+
throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
184+
}
185+
}
186+
187+
private String getFileSuffix(String key) {
188+
String fileName = Paths.get(key).getFileName().toString();
189+
int lastDotIndex = fileName.lastIndexOf('.');
190+
if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) {
191+
return "";
192+
}
193+
return fileName.substring(lastDotIndex + 1);
194+
}
195+
196+
@Override
197+
public void destroy() {
198+
if (s3 != null) {
199+
try {
200+
s3.close();
201+
} catch (Exception ignore) {
202+
}
203+
}
204+
}
205+
}
206+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"name": "obsreader",
3+
"class": "com.datamate.plugin.reader.obsreader.ObsReader",
4+
"description": "read from obs file system",
5+
"developer": "datamate"
6+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"name": "obsreader",
3+
"parameter": {
4+
"endpoint": "127.0.0.1",
5+
"bucket": "test",
6+
"accessKey": "ak-xxx",
7+
"secretKey": "sk-xxx",
8+
"prefix": "/test"
9+
}
10+
}

0 commit comments

Comments
 (0)