Skip to content

Commit 3aef6ad

Browse files
authored
Pipe: Fixed the bug that historical aligned timeseries' attributes/tags/alias is not transferred (apache#14071)
1 parent 32e701e commit 3aef6ad

File tree

2 files changed

+85
-3
lines changed

2 files changed

+85
-3
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeMetaHistoricalIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,4 +261,60 @@ public void testAuthInclusion() throws Exception {
261261
new HashSet<>(Arrays.asList("admin,", "test,")));
262262
}
263263
}
264+
265+
@Test
266+
public void testTimeSeriesInclusion() throws Exception {
267+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
268+
269+
final String receiverIp = receiverDataNode.getIp();
270+
final int receiverPort = receiverDataNode.getPort();
271+
272+
try (final SyncConfigNodeIServiceClient client =
273+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
274+
275+
// Do not fail if the failure has nothing to do with pipe
276+
// Because the failures will randomly generate due to resource limitation
277+
if (!TestUtils.tryExecuteNonQueriesWithRetry(
278+
senderEnv,
279+
Arrays.asList(
280+
"create database root.sg",
281+
"create timeseries root.sg.a.b int32",
282+
"create aligned timeseries root.sg.`apache|timecho-tag-attr`.d1(s1 INT32 tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2), s2 DOUBLE tags(tag3=v3, tag4=v4) attributes(attr3=v3, attr4=v4))"))) {
283+
return;
284+
}
285+
286+
final Map<String, String> extractorAttributes = new HashMap<>();
287+
final Map<String, String> processorAttributes = new HashMap<>();
288+
final Map<String, String> connectorAttributes = new HashMap<>();
289+
290+
extractorAttributes.put("extractor.inclusion", "schema");
291+
292+
connectorAttributes.put("connector", "iotdb-thrift-connector");
293+
connectorAttributes.put("connector.ip", receiverIp);
294+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
295+
connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry");
296+
connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1");
297+
298+
final TSStatus status =
299+
client.createPipe(
300+
new TCreatePipeReq("testPipe", connectorAttributes)
301+
.setExtractorAttributes(extractorAttributes)
302+
.setProcessorAttributes(processorAttributes));
303+
304+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
305+
306+
Assert.assertEquals(
307+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
308+
309+
TestUtils.assertDataEventuallyOnEnv(
310+
receiverEnv,
311+
"show timeseries",
312+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
313+
new HashSet<>(
314+
Arrays.asList(
315+
"root.sg.a.b,null,root.sg,INT32,TS_2DIFF,LZ4,null,null,null,null,BASE,",
316+
"root.sg.`apache|timecho-tag-attr`.d1.s1,null,root.sg,INT32,TS_2DIFF,LZ4,{\"tag1\":\"v1\",\"tag2\":\"v2\"},{\"attr2\":\"v2\",\"attr1\":\"v1\"},null,null,BASE,",
317+
"root.sg.`apache|timecho-tag-attr`.d1.s2,null,root.sg,DOUBLE,GORILLA,LZ4,{\"tag4\":\"v4\",\"tag3\":\"v3\"},{\"attr4\":\"v4\",\"attr3\":\"v3\"},null,null,BASE,")));
318+
}
319+
}
264320
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import java.util.ArrayList;
8989
import java.util.List;
9090
import java.util.Map;
91+
import java.util.Objects;
9192

9293
/** Schema write {@link PlanNode} visitor */
9394
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
@@ -286,16 +287,21 @@ private void executeInternalCreateAlignedTimeSeries(
286287
final List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
287288
final List<TSEncoding> encodingList = measurementGroup.getEncodings();
288289
final List<CompressionType> compressionTypeList = measurementGroup.getCompressors();
290+
final List<String> aliasList = measurementGroup.getAliasList();
291+
final List<Map<String, String>> tagsList = measurementGroup.getTagsList();
292+
final List<Map<String, String>> attributesList = measurementGroup.getAttributesList();
293+
289294
final ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
290295
SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
291296
devicePath,
292297
measurementList,
293298
dataTypeList,
294299
encodingList,
295300
compressionTypeList,
296-
null,
297-
null,
298-
null);
301+
aliasList,
302+
tagsList,
303+
attributesList);
304+
299305
// With merge is only true for pipe to upsert the receiver alias/tags/attributes in historical
300306
// transfer.
301307
// For normal internal creation, the alias/tags/attributes are not set
@@ -322,6 +328,16 @@ private void executeInternalCreateAlignedTimeSeries(
322328
encodingList.remove(index);
323329
compressionTypeList.remove(index);
324330

331+
if (Objects.nonNull(aliasList)) {
332+
aliasList.remove(index);
333+
}
334+
if (Objects.nonNull(tagsList)) {
335+
tagsList.remove(index);
336+
}
337+
if (Objects.nonNull(attributesList)) {
338+
attributesList.remove(index);
339+
}
340+
325341
// If with merge is set, the lists are deep copied and need to be altered here.
326342
// We still remove the element from the original list to help cascading pipe transfer
327343
// schema.
@@ -332,6 +348,16 @@ private void executeInternalCreateAlignedTimeSeries(
332348
createAlignedTimeSeriesPlan.getDataTypes().remove(index);
333349
createAlignedTimeSeriesPlan.getEncodings().remove(index);
334350
createAlignedTimeSeriesPlan.getCompressors().remove(index);
351+
352+
if (Objects.nonNull(aliasList)) {
353+
createAlignedTimeSeriesPlan.getAliasList().remove(index);
354+
}
355+
if (Objects.nonNull(tagsList)) {
356+
createAlignedTimeSeriesPlan.getTagsList().remove(index);
357+
}
358+
if (Objects.nonNull(attributesList)) {
359+
createAlignedTimeSeriesPlan.getAttributesList().remove(index);
360+
}
335361
}
336362

337363
if (measurementList.isEmpty()) {

0 commit comments

Comments
 (0)