Skip to content

Commit be0c8ad

Browse files
authored
Pipe: Fixed the bug that historical aligned timeseries' attributes/tags/alias is not transferred (#14071) (#14078)
1 parent 4d290b5 commit be0c8ad

File tree

2 files changed

+85
-3
lines changed

2 files changed

+85
-3
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,4 +261,60 @@ public void testAuthInclusion() throws Exception {
261261
new HashSet<>(Arrays.asList("admin,", "test,")));
262262
}
263263
}
264+
265+
@Test
266+
public void testTimeSeriesInclusion() throws Exception {
267+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
268+
269+
final String receiverIp = receiverDataNode.getIp();
270+
final int receiverPort = receiverDataNode.getPort();
271+
272+
try (final SyncConfigNodeIServiceClient client =
273+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
274+
275+
// Do not fail if the failure has nothing to do with pipe
276+
// Because the failures will randomly generate due to resource limitation
277+
if (!TestUtils.tryExecuteNonQueriesWithRetry(
278+
senderEnv,
279+
Arrays.asList(
280+
"create database root.sg",
281+
"create timeseries root.sg.a.b int32",
282+
"create aligned timeseries root.sg.`apache|timecho-tag-attr`.d1(s1 INT32 tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2), s2 DOUBLE tags(tag3=v3, tag4=v4) attributes(attr3=v3, attr4=v4))"))) {
283+
return;
284+
}
285+
286+
final Map<String, String> extractorAttributes = new HashMap<>();
287+
final Map<String, String> processorAttributes = new HashMap<>();
288+
final Map<String, String> connectorAttributes = new HashMap<>();
289+
290+
extractorAttributes.put("extractor.inclusion", "schema");
291+
292+
connectorAttributes.put("connector", "iotdb-thrift-connector");
293+
connectorAttributes.put("connector.ip", receiverIp);
294+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
295+
connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry");
296+
connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1");
297+
298+
final TSStatus status =
299+
client.createPipe(
300+
new TCreatePipeReq("testPipe", connectorAttributes)
301+
.setExtractorAttributes(extractorAttributes)
302+
.setProcessorAttributes(processorAttributes));
303+
304+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
305+
306+
Assert.assertEquals(
307+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
308+
309+
TestUtils.assertDataEventuallyOnEnv(
310+
receiverEnv,
311+
"show timeseries",
312+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
313+
new HashSet<>(
314+
Arrays.asList(
315+
"root.sg.a.b,null,root.sg,INT32,TS_2DIFF,LZ4,null,null,null,null,BASE,",
316+
"root.sg.`apache|timecho-tag-attr`.d1.s1,null,root.sg,INT32,TS_2DIFF,LZ4,{\"tag1\":\"v1\",\"tag2\":\"v2\"},{\"attr2\":\"v2\",\"attr1\":\"v1\"},null,null,BASE,",
317+
"root.sg.`apache|timecho-tag-attr`.d1.s2,null,root.sg,DOUBLE,GORILLA,LZ4,{\"tag4\":\"v4\",\"tag3\":\"v3\"},{\"attr4\":\"v4\",\"attr3\":\"v3\"},null,null,BASE,")));
318+
}
319+
}
264320
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.ArrayList;
8080
import java.util.List;
8181
import java.util.Map;
82+
import java.util.Objects;
8283

8384
/** Schema write {@link PlanNode} visitor */
8485
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@@ -277,16 +278,21 @@ private void executeInternalCreateAlignedTimeSeries(
277278
final List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
278279
final List<TSEncoding> encodingList = measurementGroup.getEncodings();
279280
final List<CompressionType> compressionTypeList = measurementGroup.getCompressors();
281+
final List<String> aliasList = measurementGroup.getAliasList();
282+
final List<Map<String, String>> tagsList = measurementGroup.getTagsList();
283+
final List<Map<String, String>> attributesList = measurementGroup.getAttributesList();
284+
280285
final ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
281286
SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
282287
devicePath,
283288
measurementList,
284289
dataTypeList,
285290
encodingList,
286291
compressionTypeList,
287-
null,
288-
null,
289-
null);
292+
aliasList,
293+
tagsList,
294+
attributesList);
295+
290296
// With merge is only true for pipe to upsert the receiver alias/tags/attributes in historical
291297
// transfer.
292298
// For normal internal creation, the alias/tags/attributes are not set
@@ -313,6 +319,16 @@ private void executeInternalCreateAlignedTimeSeries(
313319
encodingList.remove(index);
314320
compressionTypeList.remove(index);
315321

322+
if (Objects.nonNull(aliasList)) {
323+
aliasList.remove(index);
324+
}
325+
if (Objects.nonNull(tagsList)) {
326+
tagsList.remove(index);
327+
}
328+
if (Objects.nonNull(attributesList)) {
329+
attributesList.remove(index);
330+
}
331+
316332
// If with merge is set, the lists are deep copied and need to be altered here.
317333
// We still remove the element from the original list to help cascading pipe transfer
318334
// schema.
@@ -323,6 +339,16 @@ private void executeInternalCreateAlignedTimeSeries(
323339
createAlignedTimeSeriesPlan.getDataTypes().remove(index);
324340
createAlignedTimeSeriesPlan.getEncodings().remove(index);
325341
createAlignedTimeSeriesPlan.getCompressors().remove(index);
342+
343+
if (Objects.nonNull(aliasList)) {
344+
createAlignedTimeSeriesPlan.getAliasList().remove(index);
345+
}
346+
if (Objects.nonNull(tagsList)) {
347+
createAlignedTimeSeriesPlan.getTagsList().remove(index);
348+
}
349+
if (Objects.nonNull(attributesList)) {
350+
createAlignedTimeSeriesPlan.getAttributesList().remove(index);
351+
}
326352
}
327353

328354
if (measurementList.isEmpty()) {

0 commit comments

Comments
 (0)