Skip to content

Commit c65b826

Browse files
authored
Pipe: strict check for synonym pipe parameters to avoid ambiguity (apache#14694)
fixup apache#13799 & apache#14487
1 parent b51f63c commit c65b826

File tree

5 files changed

+121
-95
lines changed

5 files changed

+121
-95
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeProtocolIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,6 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
435435
extractorAttributes.put("database-name", "test.*");
436436
extractorAttributes.put("table-name", "test.*");
437437
extractorAttributes.put("inclusion", "data.insert");
438-
extractorAttributes.put("mode.streaming", "true");
439438
extractorAttributes.put("mode.snapshot", "false");
440439
extractorAttributes.put("mode.strict", "true");
441440

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameterValidator.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
2424

2525
import java.util.Arrays;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
2630

2731
public class PipeParameterValidator {
2832

@@ -36,6 +40,40 @@ public PipeParameters getParameters() {
3640
return parameters;
3741
}
3842

43+
/**
44+
* Validates whether the attributes entered by the user contain at least one attribute from
45+
* lhsAttributes or rhsAttributes (if required), but not both.
46+
*
47+
* @param lhsAttributes list of left-hand side synonym attributes
48+
* @param rhsAttributes list of right-hand side synonym attributes
49+
* @param isRequired specifies whether at least one attribute from lhsAttributes or rhsAttributes
50+
* must be provided
51+
* @throws PipeParameterNotValidException if both lhsAttributes and rhsAttributes are provided
52+
* @throws PipeAttributeNotProvidedException if isRequired is true and neither lhsAttributes nor
53+
* rhsAttributes are provided
54+
* @return the instance of PipeParameterValidator for method chaining
55+
*/
56+
public PipeParameterValidator validateSynonymAttributes(
57+
final List<String> lhsAttributes,
58+
final List<String> rhsAttributes,
59+
final boolean isRequired) {
60+
final boolean lhsExistence = lhsAttributes.stream().anyMatch(parameters::hasAttribute);
61+
final boolean rhsExistence = rhsAttributes.stream().anyMatch(parameters::hasAttribute);
62+
if (lhsExistence && rhsExistence) {
63+
throw new PipeParameterNotValidException(
64+
String.format(
65+
"Cannot specify both %s and %s at the same time", lhsAttributes, rhsAttributes));
66+
}
67+
if (isRequired && !lhsExistence && !rhsExistence) {
68+
throw new PipeAttributeNotProvidedException(
69+
Stream.concat(lhsAttributes.stream(), rhsAttributes.stream())
70+
.collect(
71+
Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList))
72+
.toString());
73+
}
74+
return this;
75+
}
76+
3977
/**
4078
* Validates whether the attributes entered by the user contain an attribute whose key is
4179
* attributeKey.
@@ -83,7 +121,7 @@ public PipeParameterValidator validateAttributeValueRange(
83121
* @throws PipeParameterNotValidException if the given argument is not valid
84122
*/
85123
public PipeParameterValidator validate(
86-
final PipeParameterValidator.SingleObjectValidationRule validationRule,
124+
final SingleObjectValidationRule validationRule,
87125
final String messageToThrow,
88126
final Object argument)
89127
throws PipeParameterNotValidException {
@@ -107,7 +145,7 @@ public interface SingleObjectValidationRule {
107145
* @throws PipeParameterNotValidException if the given arguments are not valid
108146
*/
109147
public PipeParameterValidator validate(
110-
final PipeParameterValidator.MultipleObjectsValidationRule validationRule,
148+
final MultipleObjectsValidationRule validationRule,
111149
final String messageToThrow,
112150
final Object... arguments)
113151
throws PipeParameterNotValidException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java

Lines changed: 61 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import java.util.Objects;
6161
import java.util.concurrent.atomic.AtomicReference;
6262

63+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_KEY;
64+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY;
6365
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
6466
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
6567
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
@@ -77,9 +79,11 @@
7779
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY;
7880
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
7981
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY;
82+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATH_KEY;
8083
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
8184
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
8285
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
86+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
8387
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
8488
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
8589
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY;
@@ -91,8 +95,12 @@
9195
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
9296
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
9397
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
98+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_KEY;
99+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY;
94100
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE;
95101
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_WATERMARK_INTERVAL_KEY;
102+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_KEY;
103+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY;
96104
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
97105
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
98106
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
@@ -104,11 +112,15 @@
104112
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY;
105113
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
106114
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY;
115+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATH_KEY;
107116
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
117+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
108118
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
109119
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY;
110120
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
111121
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
122+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_KEY;
123+
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_TABLE_NAME_KEY;
112124
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY;
113125
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._EXTRACTOR_WATERMARK_INTERVAL_KEY;
114126
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._SOURCE_WATERMARK_INTERVAL_KEY;
@@ -174,25 +186,22 @@ public void validate(final PipeParameterValidator validator) throws Exception {
174186
&& validator
175187
.getParameters()
176188
.hasAnyAttributes(
177-
PipeExtractorConstant.EXTRACTOR_PATH_KEY,
178-
PipeExtractorConstant.SOURCE_PATH_KEY,
179-
PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
180-
PipeExtractorConstant.SOURCE_PATTERN_KEY)) {
189+
EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY)) {
181190
throw new PipeException(
182191
"The pipe cannot extract tree model data when sql dialect is set to table.");
183192
}
184193
if (!isTableModelDataAllowedToBeCaptured
185194
&& validator
186195
.getParameters()
187196
.hasAnyAttributes(
188-
PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY,
189-
PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY,
190-
PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY,
191-
PipeExtractorConstant.SOURCE_TABLE_NAME_KEY,
192-
PipeExtractorConstant.EXTRACTOR_DATABASE_KEY,
193-
PipeExtractorConstant.SOURCE_DATABASE_KEY,
194-
PipeExtractorConstant.EXTRACTOR_TABLE_KEY,
195-
PipeExtractorConstant.SOURCE_TABLE_KEY)) {
197+
EXTRACTOR_DATABASE_NAME_KEY,
198+
SOURCE_DATABASE_NAME_KEY,
199+
EXTRACTOR_TABLE_NAME_KEY,
200+
SOURCE_TABLE_NAME_KEY,
201+
EXTRACTOR_DATABASE_KEY,
202+
SOURCE_DATABASE_KEY,
203+
EXTRACTOR_TABLE_KEY,
204+
SOURCE_TABLE_KEY)) {
196205
throw new PipeException(
197206
"The pipe cannot extract table model data when sql dialect is set to tree.");
198207
}
@@ -287,7 +296,7 @@ public void validate(final PipeParameterValidator validator) throws Exception {
287296
EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
288297
}
289298

290-
checkInvalidParameters(validator.getParameters());
299+
checkInvalidParameters(validator);
291300

292301
constructHistoricalExtractor();
293302
constructRealtimeExtractor(validator.getParameters());
@@ -319,7 +328,9 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t
319328
}
320329
}
321330

322-
private void checkInvalidParameters(final PipeParameters parameters) {
331+
private void checkInvalidParameters(final PipeParameterValidator validator) {
332+
final PipeParameters parameters = validator.getParameters();
333+
323334
// Enable history and realtime if specifying start-time or end-time
324335
if (parameters.hasAnyAttributes(
325336
SOURCE_START_TIME_KEY,
@@ -343,87 +354,66 @@ private void checkInvalidParameters(final PipeParameters parameters) {
343354
EXTRACTOR_HISTORY_END_TIME_KEY);
344355
}
345356

357+
// Check coexistence of database-name and database
358+
validator.validateSynonymAttributes(
359+
Arrays.asList(EXTRACTOR_DATABASE_NAME_KEY, SOURCE_DATABASE_NAME_KEY),
360+
Arrays.asList(EXTRACTOR_DATABASE_KEY, SOURCE_DATABASE_KEY),
361+
false);
362+
363+
// Check coexistence of table-name and table
364+
validator.validateSynonymAttributes(
365+
Arrays.asList(EXTRACTOR_TABLE_NAME_KEY, SOURCE_TABLE_NAME_KEY),
366+
Arrays.asList(EXTRACTOR_TABLE_KEY, SOURCE_TABLE_KEY),
367+
false);
368+
346369
// Check coexistence of mode.snapshot and mode
347-
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)
348-
&& parameters.hasAnyAttributes(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY)) {
349-
LOGGER.warn(
350-
"When {} or {} is specified, specifying {} and {} is invalid.",
351-
EXTRACTOR_MODE_SNAPSHOT_KEY,
352-
SOURCE_MODE_SNAPSHOT_KEY,
353-
EXTRACTOR_MODE_KEY,
354-
SOURCE_MODE_KEY);
355-
}
370+
validator.validateSynonymAttributes(
371+
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
372+
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
373+
false);
356374

357375
// Check coexistence of mode.streaming and realtime.mode
358-
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)
359-
&& parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
360-
LOGGER.warn(
361-
"When {} or {} is specified, specifying {} and {} is invalid.",
362-
EXTRACTOR_MODE_STREAMING_KEY,
363-
SOURCE_MODE_STREAMING_KEY,
364-
EXTRACTOR_REALTIME_MODE_KEY,
365-
SOURCE_REALTIME_MODE_KEY);
366-
}
376+
validator.validateSynonymAttributes(
377+
Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY),
378+
Arrays.asList(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY),
379+
false);
367380

368381
// Check coexistence of mode.strict, history.loose-range and realtime.loose-range
369-
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) {
370-
if (parameters.hasAnyAttributes(
371-
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY)) {
372-
LOGGER.warn(
373-
"When {} or {} is specified, specifying {} and {} is invalid.",
374-
EXTRACTOR_MODE_STRICT_KEY,
375-
SOURCE_MODE_STRICT_KEY,
382+
validator.validateSynonymAttributes(
383+
Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY),
384+
Arrays.asList(
376385
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY,
377-
SOURCE_HISTORY_LOOSE_RANGE_KEY);
378-
}
379-
if (parameters.hasAnyAttributes(
380-
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY)) {
381-
LOGGER.warn(
382-
"When {} or {} is specified, specifying {} and {} is invalid.",
383-
EXTRACTOR_MODE_STRICT_KEY,
384-
SOURCE_MODE_STRICT_KEY,
386+
SOURCE_HISTORY_LOOSE_RANGE_KEY,
385387
EXTRACTOR_REALTIME_LOOSE_RANGE_KEY,
386-
SOURCE_REALTIME_LOOSE_RANGE_KEY);
387-
}
388-
}
388+
SOURCE_REALTIME_LOOSE_RANGE_KEY),
389+
false);
389390

390391
// Check coexistence of mods and mods.enable
391-
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY)
392-
&& parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
393-
LOGGER.warn(
394-
"When {} or {} is specified, specifying {} and {} is invalid.",
395-
EXTRACTOR_MODS_KEY,
396-
SOURCE_MODS_KEY,
397-
EXTRACTOR_MODS_ENABLE_KEY,
398-
SOURCE_MODS_ENABLE_KEY);
399-
}
392+
validator.validateSynonymAttributes(
393+
Arrays.asList(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY),
394+
Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY),
395+
false);
400396

401397
// Check coexistence of watermark.interval-ms and watermark-interval-ms
402-
if (parameters.hasAnyAttributes(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)
403-
&& parameters.hasAnyAttributes(
404-
_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) {
405-
LOGGER.warn(
406-
"When {} or {} is specified, specifying {} and {} is invalid.",
407-
EXTRACTOR_WATERMARK_INTERVAL_KEY,
408-
SOURCE_WATERMARK_INTERVAL_KEY,
409-
_EXTRACTOR_WATERMARK_INTERVAL_KEY,
410-
_SOURCE_WATERMARK_INTERVAL_KEY);
411-
}
398+
validator.validateSynonymAttributes(
399+
Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
400+
Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY),
401+
false);
412402

413403
// Check if specifying mode.snapshot or mode.streaming when disable realtime extractor
414404
if (!parameters.getBooleanOrDefault(
415405
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
416406
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
417407
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
418-
LOGGER.info(
408+
LOGGER.warn(
419409
"When '{}' ('{}') is set to false, specifying {} and {} is invalid.",
420410
EXTRACTOR_REALTIME_ENABLE_KEY,
421411
SOURCE_REALTIME_ENABLE_KEY,
422412
EXTRACTOR_MODE_SNAPSHOT_KEY,
423413
SOURCE_MODE_SNAPSHOT_KEY);
424414
}
425415
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) {
426-
LOGGER.info(
416+
LOGGER.warn(
427417
"When '{}' ('{}') is set to false, specifying {} and {} is invalid.",
428418
EXTRACTOR_REALTIME_ENABLE_KEY,
429419
SOURCE_REALTIME_ENABLE_KEY,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2323
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2424
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
25-
import org.apache.iotdb.commons.exception.IllegalPathException;
2625
import org.apache.iotdb.commons.path.PartialPath;
2726
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2827
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -50,6 +49,7 @@
5049
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
5150
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
5251
import org.apache.iotdb.pipe.api.exception.PipeException;
52+
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
5353
import org.apache.iotdb.rpc.TSStatusCode;
5454
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
5555

@@ -102,32 +102,25 @@ public class TwoStageCountProcessor implements PipeProcessor {
102102

103103
@Override
104104
public void validate(PipeParameterValidator validator) throws Exception {
105-
checkInvalidParameters(validator.getParameters());
106-
107-
final String rawOutputSeries;
108-
if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) {
109-
validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY);
110-
rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY);
111-
} else {
112-
rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY);
113-
}
105+
checkInvalidParameters(validator);
114106

107+
final String rawOutputSeries =
108+
validator
109+
.getParameters()
110+
.getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY, _PROCESSOR_OUTPUT_SERIES_KEY);
115111
try {
116-
PathUtils.isLegalPath(rawOutputSeries);
117-
} catch (IllegalPathException e) {
118-
throw new IllegalArgumentException("Illegal output series path: " + rawOutputSeries);
112+
PathUtils.isLegalPath(Objects.requireNonNull(rawOutputSeries));
113+
} catch (Exception e) {
114+
throw new PipeParameterNotValidException("Illegal output series path: " + rawOutputSeries);
119115
}
120116
}
121117

122-
private void checkInvalidParameters(final PipeParameters parameters) {
118+
private void checkInvalidParameters(final PipeParameterValidator validator) {
123119
// Check coexistence of output.series and output-series
124-
if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)
125-
&& parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) {
126-
LOGGER.warn(
127-
"When {} is specified, specifying {} is invalid.",
128-
PROCESSOR_OUTPUT_SERIES_KEY,
129-
_PROCESSOR_OUTPUT_SERIES_KEY);
130-
}
120+
validator.validateSynonymAttributes(
121+
Collections.singletonList(PROCESSOR_OUTPUT_SERIES_KEY),
122+
Collections.singletonList(_PROCESSOR_OUTPUT_SERIES_KEY),
123+
true);
131124
}
132125

133126
@Override

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,12 @@ public void validate(final PipeParameterValidator validator) throws Exception {
190190
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
191191
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
192192

193+
// Check coexistence of user and username
194+
validator.validateSynonymAttributes(
195+
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
196+
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
197+
false);
198+
193199
username =
194200
parameters.getStringOrDefault(
195201
Arrays.asList(

0 commit comments

Comments
 (0)