Skip to content

Commit 3d79e36

Browse files
authored
[To dev/1.3] Pipe: support path exclusion under tree model (#16632) (#16763)
* setup * FIN
1 parent d1270de commit 3d79e36

File tree

25 files changed

+1625
-584
lines changed

25 files changed

+1625
-584
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipePatternFormatIT.java

Lines changed: 347 additions & 269 deletions
Large diffs are not rendered by default.

integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,80 @@ public void testPureSchemaInclusionWithMultiplePattern() throws Exception {
177177
}
178178
}
179179

180+
@Test
181+
public void testPureSchemaInclusionWithExclusionPattern() throws Exception {
182+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
183+
184+
final String receiverIp = receiverDataNode.getIp();
185+
final int receiverPort = receiverDataNode.getPort();
186+
187+
try (final SyncConfigNodeIServiceClient client =
188+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
189+
final Map<String, String> extractorAttributes = new HashMap<>();
190+
final Map<String, String> processorAttributes = new HashMap<>();
191+
final Map<String, String> connectorAttributes = new HashMap<>();
192+
193+
extractorAttributes.put("extractor.inclusion", "schema");
194+
// Include root.ln.**
195+
extractorAttributes.put("path", "root.ln.**");
196+
// Exclude root.ln.wf02.* and root.ln.wf03.wt01.status
197+
extractorAttributes.put("path.exclusion", "root.ln.wf02.**, root.ln.wf03.wt01.status");
198+
199+
connectorAttributes.put("connector", "iotdb-thrift-connector");
200+
connectorAttributes.put("connector.ip", receiverIp);
201+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
202+
203+
final TSStatus status =
204+
client.createPipe(
205+
new TCreatePipeReq("testPipe", connectorAttributes)
206+
.setExtractorAttributes(extractorAttributes)
207+
.setProcessorAttributes(processorAttributes));
208+
209+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
210+
211+
Assert.assertEquals(
212+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
213+
214+
// Do not fail if the failure has nothing to do with pipe
215+
// Because the failures will randomly generate due to resource limitation
216+
if (TestUtils.tryExecuteNonQueriesWithRetry(
217+
senderEnv,
218+
Arrays.asList(
219+
// Should be included
220+
"create timeseries root.ln.wf01.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
221+
"ALTER timeseries root.ln.wf01.wt01.status ADD TAGS tag3=v3",
222+
"ALTER timeseries root.ln.wf01.wt01.status ADD ATTRIBUTES attr4=v4",
223+
// Should be excluded by root.ln.wf02.*
224+
"create timeseries root.ln.wf02.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
225+
"ALTER timeseries root.ln.wf02.wt01.status ADD TAGS tag3=v3",
226+
"ALTER timeseries root.ln.wf02.wt01.status ADD ATTRIBUTES attr4=v4",
227+
// Should be excluded by root.ln.wf03.wt01.status
228+
"create timeseries root.ln.wf03.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
229+
"ALTER timeseries root.ln.wf03.wt01.status ADD TAGS tag3=v3",
230+
"ALTER timeseries root.ln.wf03.wt01.status ADD ATTRIBUTES attr4=v4"))) {
231+
return;
232+
}
233+
234+
TestUtils.assertDataEventuallyOnEnv(
235+
receiverEnv,
236+
"show timeseries root.ln.**",
237+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
238+
// Only wf01 should be synced
239+
Collections.singleton(
240+
"root.ln.wf01.wt01.status,null,root.ln,BOOLEAN,PLAIN,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,"));
241+
242+
if (!TestUtils.tryExecuteNonQueriesWithRetry(
243+
senderEnv,
244+
Arrays.asList(
245+
"insert into root.ln.wf01.wt01(time, status) values(now(), false)", "flush"))) {
246+
return;
247+
}
248+
249+
TestUtils.assertDataAlwaysOnEnv(
250+
receiverEnv, "select * from root.ln.**", "Time,", Collections.emptySet());
251+
}
252+
}
253+
180254
@Test
181255
public void testAuthExclusion() throws Exception {
182256
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.apache.iotdb.commons.path.PartialPath;
2727
import org.apache.iotdb.commons.path.PathPatternTree;
2828
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
29+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations;
2930
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
30-
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
3131
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
3232
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
3333
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -530,14 +530,13 @@ protected TSStatus loadFileV2(
530530
final Set<ConfigPhysicalPlanType> executionTypes =
531531
PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet(
532532
parameters.get(ColumnHeaderConstant.TYPE));
533-
final List<PipePattern> pipePatterns =
534-
PipePattern.parseMultiplePatterns(
533+
final PipePattern pipePattern =
534+
PipePattern.parsePatternFromString(
535535
parameters.get(ColumnHeaderConstant.PATH_PATTERN), IoTDBPipePattern::new);
536-
final PipePattern pipePattern = PipePattern.buildUnionPattern(pipePatterns);
537536
final List<TSStatus> results = new ArrayList<>();
538537
while (generator.hasNext()) {
539538
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
540-
.process(generator.next(), (UnionIoTDBPipePattern) pipePattern)
539+
.process(generator.next(), (IoTDBPipePatternOperations) pipePattern)
541540
.filter(configPhysicalPlan -> executionTypes.contains(configPhysicalPlan.getType()))
542541
.ifPresent(
543542
configPhysicalPlan ->

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitor.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
2323
import org.apache.iotdb.commons.path.PartialPath;
2424
import org.apache.iotdb.commons.path.PathPatternTree;
25+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations;
2526
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
26-
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
2727
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
2828
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanVisitor;
2929
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
@@ -59,12 +59,12 @@
5959

6060
/**
6161
* The {@link PipeConfigPhysicalPlanPatternParseVisitor} will transform the schema {@link
62-
* ConfigPhysicalPlan}s using {@link UnionIoTDBPipePattern}. Rule:
62+
* ConfigPhysicalPlan}s using {@link IoTDBPipePatternOperations}. Rule:
6363
*
6464
* <p>1. All patterns in the output {@link ConfigPhysicalPlan} will be the intersection of the
65-
* original {@link ConfigPhysicalPlan}'s patterns and the given {@link UnionIoTDBPipePattern}.
65+
* original {@link ConfigPhysicalPlan}'s patterns and the given {@link IoTDBPipePatternOperations}.
6666
*
67-
* <p>2. If a pattern does not intersect with the {@link UnionIoTDBPipePattern}, it's dropped.
67+
* <p>2. If a pattern does not intersect with the {@link IoTDBPipePatternOperations}, it's dropped.
6868
*
6969
* <p>3. If all the patterns in the {@link ConfigPhysicalPlan} is dropped, the {@link
7070
* ConfigPhysicalPlan} is dropped.
@@ -73,13 +73,13 @@
7373
* one is used in the {@link PipeConfigRegionWritePlanEvent} in {@link ConfigRegionListeningQueue}.
7474
*/
7575
public class PipeConfigPhysicalPlanPatternParseVisitor
76-
extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>, UnionIoTDBPipePattern> {
76+
extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>, IoTDBPipePatternOperations> {
7777
private static final Logger LOGGER =
7878
LoggerFactory.getLogger(PipeConfigPhysicalPlanPatternParseVisitor.class);
7979

8080
@Override
8181
public Optional<ConfigPhysicalPlan> visitPlan(
82-
final ConfigPhysicalPlan plan, final UnionIoTDBPipePattern pattern) {
82+
final ConfigPhysicalPlan plan, final IoTDBPipePatternOperations pattern) {
8383
return Optional.of(plan);
8484
}
8585

@@ -91,23 +91,23 @@ public Optional<ConfigPhysicalPlan> visitPlan(
9191
// Other matches using "matchPrefixPath" are with the same principle.
9292
@Override
9393
public Optional<ConfigPhysicalPlan> visitCreateDatabase(
94-
final DatabaseSchemaPlan createDatabasePlan, final UnionIoTDBPipePattern pattern) {
94+
final DatabaseSchemaPlan createDatabasePlan, final IoTDBPipePatternOperations pattern) {
9595
return pattern.matchPrefixPath(createDatabasePlan.getSchema().getName())
9696
? Optional.of(createDatabasePlan)
9797
: Optional.empty();
9898
}
9999

100100
@Override
101101
public Optional<ConfigPhysicalPlan> visitAlterDatabase(
102-
final DatabaseSchemaPlan alterDatabasePlan, final UnionIoTDBPipePattern pattern) {
102+
final DatabaseSchemaPlan alterDatabasePlan, final IoTDBPipePatternOperations pattern) {
103103
return pattern.matchPrefixPath(alterDatabasePlan.getSchema().getName())
104104
? Optional.of(alterDatabasePlan)
105105
: Optional.empty();
106106
}
107107

108108
@Override
109109
public Optional<ConfigPhysicalPlan> visitDeleteDatabase(
110-
final DeleteDatabasePlan deleteDatabasePlan, final UnionIoTDBPipePattern pattern) {
110+
final DeleteDatabasePlan deleteDatabasePlan, final IoTDBPipePatternOperations pattern) {
111111
return pattern.matchPrefixPath(deleteDatabasePlan.getName())
112112
? Optional.of(deleteDatabasePlan)
113113
: Optional.empty();
@@ -116,7 +116,7 @@ public Optional<ConfigPhysicalPlan> visitDeleteDatabase(
116116
@Override
117117
public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
118118
final CreateSchemaTemplatePlan createSchemaTemplatePlan,
119-
final UnionIoTDBPipePattern pattern) {
119+
final IoTDBPipePatternOperations pattern) {
120120
// This is a deserialized template and can be arbitrarily altered
121121
final Template template = createSchemaTemplatePlan.getTemplate();
122122
template.getSchemaMap().keySet().removeIf(measurement -> !pattern.matchTailNode(measurement));
@@ -128,7 +128,7 @@ public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
128128
@Override
129129
public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
130130
final CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan,
131-
final UnionIoTDBPipePattern pattern) {
131+
final IoTDBPipePatternOperations pattern) {
132132
return pattern.matchPrefixPath(commitSetSchemaTemplatePlan.getPath())
133133
? Optional.of(commitSetSchemaTemplatePlan)
134134
: Optional.empty();
@@ -137,7 +137,7 @@ public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
137137
@Override
138138
public Optional<ConfigPhysicalPlan> visitPipeUnsetSchemaTemplate(
139139
final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan,
140-
final UnionIoTDBPipePattern pattern) {
140+
final IoTDBPipePatternOperations pattern) {
141141
return pattern.matchPrefixPath(pipeUnsetSchemaTemplatePlan.getPath())
142142
? Optional.of(pipeUnsetSchemaTemplatePlan)
143143
: Optional.empty();
@@ -146,7 +146,7 @@ public Optional<ConfigPhysicalPlan> visitPipeUnsetSchemaTemplate(
146146
@Override
147147
public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
148148
final ExtendSchemaTemplatePlan extendSchemaTemplatePlan,
149-
final UnionIoTDBPipePattern pattern) {
149+
final IoTDBPipePatternOperations pattern) {
150150
final TemplateExtendInfo extendInfo = extendSchemaTemplatePlan.getTemplateExtendInfo();
151151
final int[] filteredIndexes =
152152
IntStream.range(0, extendInfo.getMeasurements().size())
@@ -166,30 +166,30 @@ public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
166166

167167
@Override
168168
public Optional<ConfigPhysicalPlan> visitGrantUser(
169-
final AuthorPlan grantUserPlan, final UnionIoTDBPipePattern pattern) {
169+
final AuthorPlan grantUserPlan, final IoTDBPipePatternOperations pattern) {
170170
return visitPathRelatedAuthorPlan(grantUserPlan, pattern);
171171
}
172172

173173
@Override
174174
public Optional<ConfigPhysicalPlan> visitRevokeUser(
175-
final AuthorPlan revokeUserPlan, final UnionIoTDBPipePattern pattern) {
175+
final AuthorPlan revokeUserPlan, final IoTDBPipePatternOperations pattern) {
176176
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
177177
}
178178

179179
@Override
180180
public Optional<ConfigPhysicalPlan> visitGrantRole(
181-
final AuthorPlan revokeUserPlan, final UnionIoTDBPipePattern pattern) {
181+
final AuthorPlan revokeUserPlan, final IoTDBPipePatternOperations pattern) {
182182
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
183183
}
184184

185185
@Override
186186
public Optional<ConfigPhysicalPlan> visitRevokeRole(
187-
final AuthorPlan revokeUserPlan, final UnionIoTDBPipePattern pattern) {
187+
final AuthorPlan revokeUserPlan, final IoTDBPipePatternOperations pattern) {
188188
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
189189
}
190190

191191
private Optional<ConfigPhysicalPlan> visitPathRelatedAuthorPlan(
192-
final AuthorPlan pathRelatedAuthorPlan, final UnionIoTDBPipePattern pattern) {
192+
final AuthorPlan pathRelatedAuthorPlan, final IoTDBPipePatternOperations pattern) {
193193
final List<PartialPath> intersectedPaths =
194194
pathRelatedAuthorPlan.getNodeNameList().stream()
195195
.map(pattern::getIntersection)
@@ -218,7 +218,7 @@ private Optional<ConfigPhysicalPlan> visitPathRelatedAuthorPlan(
218218
@Override
219219
public Optional<ConfigPhysicalPlan> visitPipeDeleteTimeSeries(
220220
final PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan,
221-
final UnionIoTDBPipePattern pattern) {
221+
final IoTDBPipePatternOperations pattern) {
222222
try {
223223
final PathPatternTree intersectedTree =
224224
pattern.getIntersection(
@@ -237,7 +237,7 @@ public Optional<ConfigPhysicalPlan> visitPipeDeleteTimeSeries(
237237
@Override
238238
public Optional<ConfigPhysicalPlan> visitPipeDeleteLogicalView(
239239
final PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan,
240-
final UnionIoTDBPipePattern pattern) {
240+
final IoTDBPipePatternOperations pattern) {
241241
try {
242242
final PathPatternTree intersectedTree =
243243
pattern.getIntersection(
@@ -256,7 +256,7 @@ public Optional<ConfigPhysicalPlan> visitPipeDeleteLogicalView(
256256
@Override
257257
public Optional<ConfigPhysicalPlan> visitPipeDeactivateTemplate(
258258
final PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan,
259-
final UnionIoTDBPipePattern pattern) {
259+
final IoTDBPipePatternOperations pattern) {
260260
final Map<PartialPath, List<Template>> newTemplateSetInfo =
261261
pipeDeactivateTemplatePlan.getTemplateSetInfo().entrySet().stream()
262262
.flatMap(
@@ -279,7 +279,7 @@ public Optional<ConfigPhysicalPlan> visitPipeDeactivateTemplate(
279279

280280
@Override
281281
public Optional<ConfigPhysicalPlan> visitTTL(
282-
final SetTTLPlan setTTLPlan, final UnionIoTDBPipePattern pattern) {
282+
final SetTTLPlan setTTLPlan, final IoTDBPipePatternOperations pattern) {
283283
final PartialPath partialPath = new PartialPath(setTTLPlan.getPathPattern());
284284
final List<PartialPath> intersectionList =
285285
pattern.matchPrefixPath(partialPath.getFullPath())

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.path.PartialPath;
2424
import org.apache.iotdb.commons.path.PathPatternTree;
2525
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
26+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations;
2627
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
2728
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
2829
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
@@ -55,11 +56,11 @@
5556

5657
public class PipeConfigPhysicalPlanPatternParseVisitorTest {
5758

58-
private final UnionIoTDBPipePattern prefixPathPattern =
59+
private final IoTDBPipePatternOperations prefixPathPattern =
5960
new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.**"));
60-
private final UnionIoTDBPipePattern fullPathPattern =
61+
private final IoTDBPipePatternOperations fullPathPattern =
6162
new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.s1"));
62-
private final UnionIoTDBPipePattern multiplePathPattern =
63+
private final IoTDBPipePatternOperations multiplePathPattern =
6364
new UnionIoTDBPipePattern(
6465
Arrays.asList(
6566
new IoTDBPipePattern("root.db.device.s1"),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2323
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
24-
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
24+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations;
2525
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2626
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
2727
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -158,7 +158,8 @@ private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDat
158158
// delete data event does not have progress index currently
159159
IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
160160
.process(
161-
deleteDataEvent.getPlanNode(), (UnionIoTDBPipePattern) deleteDataEvent.getPipePattern())
161+
deleteDataEvent.getPlanNode(),
162+
(IoTDBPipePatternOperations) deleteDataEvent.getPipePattern())
162163
.map(
163164
planNode ->
164165
new PipeSchemaRegionWritePlanEvent(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2323
import org.apache.iotdb.commons.pipe.config.PipeConfig;
24+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations;
2425
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
25-
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
2626
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
2727
import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
2828
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
@@ -94,8 +94,8 @@ public TsFileInsertionDataContainer provide(final boolean isWithMod) throws IOEx
9494
isWithMod);
9595
}
9696

97-
if (pattern instanceof UnionIoTDBPipePattern
98-
&& !((UnionIoTDBPipePattern) pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
97+
if (pattern instanceof IoTDBPipePatternOperations
98+
&& !((IoTDBPipePatternOperations) pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
9999
// If the pattern matches only one time series in one device, use query container here
100100
// because there is no timestamps merge overhead.
101101
//

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.apache.iotdb.commons.path.PartialPath;
2727
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2828
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
29+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePatternOperations;
2930
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
30-
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
3131
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
3232
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
3333
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -512,10 +512,9 @@ private TSStatus loadSchemaSnapShot(
512512
final Set<StatementType> executionTypes =
513513
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
514514
parameters.get(ColumnHeaderConstant.TYPE));
515-
final List<PipePattern> pipePatterns =
516-
PipePattern.parseMultiplePatterns(
515+
final PipePattern pipePattern =
516+
PipePattern.parsePatternFromString(
517517
parameters.get(ColumnHeaderConstant.PATH_PATTERN), IoTDBPipePattern::new);
518-
final PipePattern pipePattern = PipePattern.buildUnionPattern(pipePatterns);
519518

520519
// Clear to avoid previous exceptions
521520
batchVisitor.clear();
@@ -530,7 +529,7 @@ private TSStatus loadSchemaSnapShot(
530529
// Here we apply the statements as many as possible
531530
// Even if there are failed statements
532531
STATEMENT_PATTERN_PARSE_VISITOR
533-
.process(originalStatement, (UnionIoTDBPipePattern) pipePattern)
532+
.process(originalStatement, (IoTDBPipePatternOperations) pipePattern)
534533
.flatMap(parsedStatement -> batchVisitor.process(parsedStatement, null))
535534
.ifPresent(statement -> results.add(executeStatementAndClassifyExceptions(statement)));
536535
}

0 commit comments

Comments
 (0)