Skip to content

Commit 9de195e

Browse files
authored
[feature][core]: Add YAML job config support (#1408)
- Parse .yaml/.yml job files via SnakeYAML while keeping the existing JSON flow
- Add SnakeYAML dependency and version property
- Add YAML equivalents for job examples under core/src/main/job
1 parent 35922d5 commit 9de195e

32 files changed

+1842
-6
lines changed

core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@
4949
<artifactId>fastjson2</artifactId>
5050
</dependency>
5151

52+
<dependency>
53+
<groupId>org.yaml</groupId>
54+
<artifactId>snakeyaml</artifactId>
55+
<version>${snakeyaml.version}</version>
56+
</dependency>
57+
5258
<dependency>
5359
<groupId>commons-cli</groupId>
5460
<artifactId>commons-cli</artifactId>

core/src/main/java/com/wgzhao/addax/core/util/ConfigParser.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@
2323

2424
import java.io.File;
2525
import java.io.IOException;
26-
import java.net.http.HttpClient;
27-
import java.net.http.HttpRequest;
28-
import java.net.http.HttpResponse;
2926
import java.nio.charset.StandardCharsets;
30-
import java.time.Duration;
3127
import java.util.ArrayList;
3228
import java.util.HashSet;
3329
import java.util.List;
@@ -36,16 +32,19 @@
3632

3733
import org.apache.commons.io.FileUtils;
3834
import org.apache.commons.lang3.StringUtils;
35+
import org.apache.commons.lang3.Strings;
3936
import org.slf4j.Logger;
4037
import org.slf4j.LoggerFactory;
38+
import org.yaml.snakeyaml.LoaderOptions;
39+
import org.yaml.snakeyaml.Yaml;
40+
import org.yaml.snakeyaml.constructor.SafeConstructor;
41+
import org.yaml.snakeyaml.error.YAMLException;
4142

4243
import static com.wgzhao.addax.core.base.Key.CONNECTION;
4344
import static com.wgzhao.addax.core.spi.ErrorCode.CONFIG_ERROR;
44-
import static com.wgzhao.addax.core.spi.ErrorCode.IO_ERROR;
4545
import static com.wgzhao.addax.core.spi.ErrorCode.PLUGIN_INIT_ERROR;
4646
import static com.wgzhao.addax.core.spi.ErrorCode.REQUIRED_VALUE;
4747
import static com.wgzhao.addax.core.util.container.CoreConstant.CONF_PATH;
48-
import static com.wgzhao.addax.core.util.container.CoreConstant.CORE_SERVER_TIMEOUT_SEC;
4948
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT;
5049
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER;
5150
import static com.wgzhao.addax.core.util.container.CoreConstant.JOB_CONTENT_READER_NAME;
@@ -161,9 +160,43 @@ private static Configuration parseCoreConfig()
161160
public static Configuration parseJobConfig(String path)
162161
{
163162
String jobContent = getJobContent(path);
163+
if (isYamlPath(path)) {
164+
return parseYamlJobConfig(jobContent, path);
165+
}
164166
return Configuration.from(jobContent);
165167
}
166168

169+
private static boolean isYamlPath(String path)
170+
{
171+
return Strings.CI.endsWith(path, ".yaml") || Strings.CI.endsWith(path, ".yml");
172+
}
173+
174+
private static Configuration parseYamlJobConfig(String jobContent, String path)
175+
{
176+
if (StringUtils.isBlank(jobContent)) {
177+
throw AddaxException.asAddaxException(CONFIG_ERROR, "The configure file is empty.");
178+
}
179+
try {
180+
Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions()));
181+
Object yamlObject = yaml.load(jobContent);
182+
if (yamlObject == null) {
183+
throw AddaxException.asAddaxException(CONFIG_ERROR, "The configure file is empty.");
184+
}
185+
if (yamlObject instanceof Map) {
186+
return Configuration.from((Map<String, Object>) yamlObject);
187+
}
188+
if (yamlObject instanceof List) {
189+
return Configuration.from((List<Object>) yamlObject);
190+
}
191+
throw AddaxException.asAddaxException(CONFIG_ERROR,
192+
"The configuration is incorrect. The configuration you provided is not in valid YAML format: top-level node must be a map or list.");
193+
}
194+
catch (YAMLException e) {
195+
throw AddaxException.asAddaxException(CONFIG_ERROR,
196+
String.format("The configuration is incorrect. The configuration you provided is not in valid YAML format: %s.", e.getMessage()));
197+
}
198+
}
199+
167200
private static String getJobContent(String jobResource)
168201
{
169202
String jobContent;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
job:
2+
setting:
3+
speed:
4+
channel: 3
5+
errorLimit:
6+
record: 0
7+
percentage: 0.02
8+
content:
9+
reader:
10+
name: clickhousereader
11+
parameter:
12+
username: default
13+
password: ""
14+
column:
15+
- "*"
16+
connection:
17+
jdbcUrl: jdbc:clickhouse://127.0.0.1:8123/tpch
18+
querySql:
19+
- select * from orders limit 10
20+
writer:
21+
name: streamwriter
22+
parameter:
23+
print: true

core/src/main/job/dbf2hdfs.yaml

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
job:
2+
content:
3+
reader:
4+
name: dbfreader
5+
parameter:
6+
column:
7+
-
8+
index: 0
9+
type: string
10+
-
11+
index: 1
12+
type: string
13+
-
14+
index: 2
15+
type: double
16+
-
17+
index: 3
18+
type: double
19+
-
20+
index: 4
21+
type: double
22+
-
23+
index: 5
24+
type: double
25+
-
26+
index: 6
27+
type: double
28+
-
29+
index: 7
30+
type: double
31+
-
32+
index: 8
33+
type: double
34+
-
35+
index: 9
36+
type: double
37+
-
38+
index: 10
39+
type: double
40+
-
41+
index: 11
42+
type: double
43+
-
44+
index: 12
45+
type: double
46+
-
47+
index: 13
48+
type: double
49+
-
50+
index: 14
51+
type: double
52+
-
53+
index: 15
54+
type: double
55+
-
56+
index: 16
57+
type: double
58+
-
59+
index: 17
60+
type: double
61+
-
62+
index: 18
63+
type: double
64+
-
65+
index: 19
66+
type: double
67+
-
68+
index: 20
69+
type: double
70+
-
71+
index: 21
72+
type: double
73+
-
74+
index: 22
75+
type: double
76+
-
77+
index: 23
78+
type: double
79+
-
80+
index: 24
81+
type: double
82+
-
83+
index: 25
84+
type: double
85+
-
86+
index: 26
87+
type: double
88+
-
89+
index: 27
90+
type: double
91+
-
92+
index: 28
93+
type: double
94+
-
95+
index: 29
96+
type: double
97+
path: /tmp/show2003.dbf
98+
encoding: GBK
99+
writer:
100+
name: hdfswriter
101+
parameter:
102+
column:
103+
-
104+
name: s1
105+
type: string
106+
-
107+
name: s2
108+
type: string
109+
-
110+
name: s3
111+
type: decimal
112+
-
113+
name: s4
114+
type: decimal
115+
-
116+
name: s5
117+
type: decimal
118+
-
119+
name: s6
120+
type: decimal
121+
-
122+
name: s7
123+
type: decimal
124+
-
125+
name: s8
126+
type: decimal
127+
-
128+
name: s9
129+
type: decimal
130+
-
131+
name: s10
132+
type: decimal
133+
-
134+
name: s11
135+
type: decimal
136+
-
137+
name: s13
138+
type: decimal
139+
-
140+
name: s15
141+
type: decimal
142+
-
143+
name: s16
144+
type: decimal
145+
-
146+
name: s17
147+
type: decimal
148+
-
149+
name: s18
150+
type: decimal
151+
-
152+
name: s19
153+
type: decimal
154+
-
155+
name: s21
156+
type: decimal
157+
-
158+
name: s22
159+
type: decimal
160+
-
161+
name: s23
162+
type: decimal
163+
-
164+
name: s24
165+
type: decimal
166+
-
167+
name: s25
168+
type: decimal
169+
-
170+
name: s26
171+
type: decimal
172+
-
173+
name: s27
174+
type: decimal
175+
-
176+
name: s28
177+
type: decimal
178+
-
179+
name: s29
180+
type: decimal
181+
-
182+
name: s30
183+
type: decimal
184+
-
185+
name: s31
186+
type: decimal
187+
-
188+
name: s32
189+
type: decimal
190+
-
191+
name: s33
192+
type: decimal
193+
compress: SNAPPY
194+
defaultFS: hdfs://sandbox
195+
fieldDelimiter: "\t"
196+
fileName: dbf_orc
197+
fileType: orc
198+
path: /tmp/dbftest
199+
writeMode: overwrite
200+
haveKerberos: "true"
201+
kerberosKeytabFilePath: /etc/security/keytabs/hdfs.headless.keytab
202+
kerberosPrincipal: hdfs-sandbox@SANDBOX.COM
203+
hadoopConfig:
204+
dfs.nameservices: sandbox
205+
dfs.ha.namenodes.sandbox: nn1,nn2
206+
dfs.namenode.rpc-address.sandbox.nn1: hdp1.sandbox.com:8020
207+
dfs.namenode.rpc-address.sandbox.nn2: hdp2.sandbox.com:8020
208+
dfs.client.failover.proxy.provider.sandbox: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
209+
setting:
210+
speed:
211+
channel: 2
212+
byte: -1
213+
record: -1
214+
batchSize: 4096

0 commit comments

Comments
 (0)