Skip to content

Commit 728cb78

Browse files
[Improvement-17010][datax] Add datax channel count to a custom parameter. (#17898)
1 parent 05dc4b5 commit 728cb78

File tree

8 files changed

+70
-19
lines changed

8 files changed

+70
-19
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public class DataxParameters extends AbstractParameters {
9595
*/
9696
private int jobSpeedRecord;
9797

98+
/**
99+
* datax channel
100+
*/
101+
private int jobChannel;
102+
98103
/**
99104
* Xms memory
100105
*/
@@ -206,6 +211,14 @@ public void setJobSpeedRecord(int jobSpeedRecord) {
206211
this.jobSpeedRecord = jobSpeedRecord;
207212
}
208213

214+
public int getJobChannel() {
215+
return jobChannel;
216+
}
217+
218+
public void setJobChannel(int jobChannel) {
219+
this.jobChannel = jobChannel;
220+
}
221+
209222
public int getXms() {
210223
return xms;
211224
}
@@ -249,23 +262,24 @@ public List<ResourceInfo> getResourceFilesList() {
249262

250263
@Override
251264
public String toString() {
252-
return "DataxParameters{"
253-
+ "customConfig=" + customConfig
254-
+ ", json='" + json + '\''
255-
+ ", dsType='" + dsType + '\''
256-
+ ", dataSource=" + dataSource
257-
+ ", dtType='" + dtType + '\''
258-
+ ", dataTarget=" + dataTarget
259-
+ ", sql='" + sql + '\''
260-
+ ", targetTable='" + targetTable + '\''
261-
+ ", preStatements=" + preStatements
262-
+ ", postStatements=" + postStatements
263-
+ ", jobSpeedByte=" + jobSpeedByte
264-
+ ", jobSpeedRecord=" + jobSpeedRecord
265-
+ ", xms=" + xms
266-
+ ", xmx=" + xmx
267-
+ ", resourceList=" + JSONUtils.toJsonString(resourceList)
268-
+ '}';
265+
return "DataxParameters{" +
266+
"customConfig=" + customConfig +
267+
", json='" + json + '\'' +
268+
", dsType='" + dsType + '\'' +
269+
", dataSource=" + dataSource +
270+
", dtType='" + dtType + '\'' +
271+
", dataTarget=" + dataTarget +
272+
", sql='" + sql + '\'' +
273+
", targetTable='" + targetTable + '\'' +
274+
", preStatements=" + preStatements +
275+
", postStatements=" + postStatements +
276+
", jobSpeedByte=" + jobSpeedByte +
277+
", jobSpeedRecord=" + jobSpeedRecord +
278+
", jobChannel=" + jobChannel +
279+
", xms=" + xms +
280+
", xmx=" + xmx +
281+
", resourceList=" + JSONUtils.toJsonString(resourceList) +
282+
'}';
269283
}
270284

271285
@Override

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.ArrayList;
5656
import java.util.List;
5757
import java.util.Map;
58+
import java.util.Optional;
5859
import java.util.concurrent.ExecutionException;
5960

6061
import lombok.extern.slf4j.Slf4j;
@@ -309,7 +310,7 @@ private ObjectNode buildDataxJobSettingJson() {
309310

310311
ObjectNode speed = JSONUtils.createObjectNode();
311312

312-
speed.put("channel", DATAX_CHANNEL_COUNT);
313+
speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT));
313314

314315
if (dataXParameters.getJobSpeedByte() > 0) {
315316
speed.put("byte", dataXParameters.getJobSpeedByte());
@@ -333,7 +334,7 @@ private ObjectNode buildDataxJobSettingJson() {
333334
private ObjectNode buildDataxCoreJson() {
334335

335336
ObjectNode speed = JSONUtils.createObjectNode();
336-
speed.put("channel", DATAX_CHANNEL_COUNT);
337+
speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT));
337338

338339
if (dataXParameters.getJobSpeedByte() > 0) {
339340
speed.put("byte", dataXParameters.getJobSpeedByte());

dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public void testToString() {
7070
dataxParameters.setDtType("MYSQL");
7171
dataxParameters.setJobSpeedByte(1);
7272
dataxParameters.setJobSpeedRecord(1);
73+
dataxParameters.setJobChannel(1);
7374
dataxParameters.setJson("json");
7475
dataxParameters.setResourceList(resourceInfoList);
7576

@@ -87,6 +88,7 @@ public void testToString() {
8788
+ "postStatements=null, "
8889
+ "jobSpeedByte=1, "
8990
+ "jobSpeedRecord=1, "
91+
+ "jobChannel=1, "
9092
+ "xms=0, "
9193
+ "xmx=-100, "
9294
+ "resourceList=[{\"id\":null,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]"

dolphinscheduler-ui/src/locales/en_US/project.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@ export default {
648648
datax_job_speed_byte_info: '(0 means unlimited)',
649649
datax_job_speed_record: 'Speed(Record count)',
650650
datax_job_speed_record_info: '(0 means unlimited)',
651+
datax_job_channel: 'datax channel',
651652
datax_job_runtime_memory: 'Runtime Memory Limits',
652653
datax_job_runtime_memory_xms: 'Low Limit Value',
653654
datax_job_runtime_memory_xmx: 'High Limit Value',

dolphinscheduler-ui/src/locales/zh_CN/project.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,7 @@ export default {
629629
datax_job_speed_byte_info: '(KB,0代表不限制)',
630630
datax_job_speed_record: '限流(记录数)',
631631
datax_job_speed_record_info: '(0代表不限制)',
632+
datax_job_channel: '数据管道数',
632633
datax_job_runtime_memory: '运行内存',
633634
datax_job_runtime_memory_xms: '最小内存',
634635
datax_job_runtime_memory_xmx: '最大内存',

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,28 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
7878
value: 3000
7979
}
8080
]
81+
const jobChannelOptions: any[] = [
82+
{
83+
label: '1',
84+
value: 1
85+
},
86+
{
87+
label: '3',
88+
value: 3
89+
},
90+
{
91+
label: '5',
92+
value: 5
93+
},
94+
{
95+
label: '10',
96+
value: 10
97+
},
98+
{
99+
label: '15',
100+
value: 15
101+
}
102+
]
81103
const memoryLimitOptions = [
82104
{
83105
label: '1G',
@@ -264,6 +286,14 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
264286
options: memoryLimitOptions,
265287
value: 1
266288
},
289+
{
290+
type: 'input',
291+
field: 'jobChannel',
292+
name: t('project.node.datax_job_channel'),
293+
span: jobSpeedSpan,
294+
options: jobChannelOptions,
295+
value: 1
296+
},
267297
...useCustomParams({ model, field: 'localParams', isSimple: true })
268298
]
269299
}

dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ export function formatParams(data: INodeData): {
274274
taskParams.targetTable = data.targetTable
275275
taskParams.jobSpeedByte = data.jobSpeedByte
276276
taskParams.jobSpeedRecord = data.jobSpeedRecord
277+
taskParams.jobChannel = data.jobChannel
277278
taskParams.preStatements = data.preStatements
278279
taskParams.postStatements = data.postStatements
279280
} else {

dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ interface ITaskParams {
340340
targetTable?: string
341341
jobSpeedByte?: number
342342
jobSpeedRecord?: number
343+
jobChannel?: number
343344
xms?: number
344345
xmx?: number
345346
sparkParameters?: ISparkParameters

0 commit comments

Comments
 (0)