Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iotdb-api/pipe-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<version>${tsfile.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,24 +367,35 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame

private static class KeyReducer {

private static final Set<String> PREFIXES = new HashSet<>();
private static final Set<String> FIRST_PREFIXES = new HashSet<>();
private static final Set<String> SECOND_PREFIXES = new HashSet<>();

static {
PREFIXES.add("extractor.");
PREFIXES.add("source.");
PREFIXES.add("processor.");
PREFIXES.add("connector.");
PREFIXES.add("sink.");
FIRST_PREFIXES.add("extractor.");
FIRST_PREFIXES.add("source.");
FIRST_PREFIXES.add("processor.");
FIRST_PREFIXES.add("connector.");
FIRST_PREFIXES.add("sink.");

SECOND_PREFIXES.add("opcua.");
}

static String reduce(final String key) {
static String reduce(String key) {
if (key == null) {
return null;
}
final String lowerCaseKey = key.toLowerCase();
for (final String prefix : PREFIXES) {
String lowerCaseKey = key.toLowerCase();
for (final String prefix : FIRST_PREFIXES) {
if (lowerCaseKey.startsWith(prefix)) {
key = key.substring(prefix.length());
lowerCaseKey = lowerCaseKey.substring(prefix.length());
break;
}
}
for (final String prefix : SECOND_PREFIXES) {
if (lowerCaseKey.startsWith(prefix)) {
return key.substring(prefix.length());
key = key.substring(prefix.length());
break;
}
}
return key;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.api.customizer.parameter;

import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;

public class PipeParametersTest {

@Test
public void keyReducerTest() {
final PipeParameters parameters = new PipeParameters(new HashMap<>());
parameters.addAttribute("sink.opcua.with-quality", "false");

Assert.assertEquals(false, parameters.getBoolean("with-quality"));
Assert.assertEquals(false, parameters.getBoolean("opcua.with-quality"));

// Invalid
parameters.addAttribute("sink.source.opcua.value-name", "false");
parameters.addAttribute("opcua.sink.value-name", "false");
Assert.assertNull(parameters.getString("value-name"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import org.apache.iotdb.db.utils.DateTimeUtils;
Expand Down Expand Up @@ -70,23 +69,12 @@

public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
private final boolean isClientServerModel;
private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;
private final String databaseName;
private final String placeHolder;

OpcUaNameSpace(
final OpcUaServer server,
final boolean isClientServerModel,
final OpcUaServerBuilder builder,
final String qualifiedDatabaseName,
final String placeHolder) {

OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
super(server, NAMESPACE_URI);
this.isClientServerModel = isClientServerModel;
this.builder = builder;
this.databaseName = PathUtils.unQualifyDatabaseName(qualifiedDatabaseName);
this.placeHolder = placeHolder;

subscriptionModel = new SubscriptionModel(server, this);
getLifecycleManager().addLifecycle(subscriptionModel);
Expand All @@ -106,15 +94,17 @@ public void shutdown() {
});
}

void transfer(final Tablet tablet, final boolean isTableModel) throws UaException {
if (isClientServerModel) {
transferTabletForClientServerModel(tablet, isTableModel);
void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
throws UaException {
if (sink.isClientServerModel) {
transferTabletForClientServerModel(tablet, isTableModel, sink);
} else {
transferTabletForPubSubModel(tablet, isTableModel);
transferTabletForPubSubModel(tablet, isTableModel, sink);
}
}

private void transferTabletForClientServerModel(final Tablet tablet, final boolean isTableModel) {
private void transferTabletForClientServerModel(
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) {
final List<IMeasurementSchema> schemas = tablet.getSchemas();
final List<IMeasurementSchema> newSchemas = new ArrayList<>();
if (!isTableModel) {
Expand All @@ -136,7 +126,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
}

transferTabletRowForClientServerModel(
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values);
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink);
} else {
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();

Expand All @@ -151,10 +141,11 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
for (int i = 0; i < tablet.getRowSize(); ++i) {
final Object[] segments = tablet.getDeviceID(i).getSegments();
final String[] folderSegments = new String[segments.length + 1];
folderSegments[0] = databaseName;
folderSegments[0] = sink.unQualifiedDatabaseName;

for (int j = 0; j < segments.length; ++j) {
folderSegments[j + 1] = Objects.isNull(segments[j]) ? placeHolder : (String) segments[j];
folderSegments[j + 1] =
Objects.isNull(segments[j]) ? sink.placeHolder : (String) segments[j];
}

final int finalI = i;
Expand All @@ -169,7 +160,8 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
? null
: getTabletObjectValue4Opc(
tablet.getValues()[index], finalI, schemas.get(index).getType()))
.collect(Collectors.toList()));
.collect(Collectors.toList()),
sink);
}
}
}
Expand All @@ -178,14 +170,18 @@ private void transferTabletRowForClientServerModel(
final String[] segments,
final List<IMeasurementSchema> measurementSchemas,
final List<Long> timestamps,
final List<Object> values) {
final List<Object> values,
final OpcUaSink sink) {
if (segments.length == 0) {
throw new PipeRuntimeCriticalException("The segments of tablets must exist");
}
final StringBuilder currentStr = new StringBuilder();
UaNode folderNode = null;
NodeId folderNodeId;
for (final String segment : segments) {
for (int i = 0;
i < (Objects.isNull(sink.valueName) ? segments.length : segments.length - 1);
++i) {
final String segment = segments[i];
final UaNode nextFolderNode;

currentStr.append(segment);
Expand Down Expand Up @@ -230,32 +226,55 @@ private void transferTabletRowForClientServerModel(
}

final String currentFolder = currentStr.toString();

StatusCode currentQuality =
Objects.isNull(sink.valueName) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
UaVariableNode valueNode = null;
Object value = null;
long timestamp = 0;

for (int i = 0; i < measurementSchemas.size(); ++i) {
if (Objects.isNull(values.get(i))) {
continue;
}
final String name = measurementSchemas.get(i).getMeasurementName();
final TSDataType type = measurementSchemas.get(i).getType();
final NodeId nodeId = newNodeId(currentFolder + name);
if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
if (!type.equals(TSDataType.BOOLEAN)) {
throw new UnsupportedOperationException(
"The quality value only supports boolean type, while true == GOOD and false == BAD.");
}
currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD;
continue;
}
if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
throw new UnsupportedOperationException(
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
}
final String nodeName = Objects.isNull(sink.valueName) ? name : segments[segments.length - 1];
final NodeId nodeId = newNodeId(currentFolder + nodeName);
final UaVariableNode measurementNode;
if (!getNodeManager().containsNode(nodeId)) {
measurementNode =
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
.setNodeId(newNodeId(currentFolder + name))
.setNodeId(nodeId)
.setAccessLevel(AccessLevel.READ_WRITE)
.setUserAccessLevel(AccessLevel.READ_ONLY)
.setBrowseName(newQualifiedName(name))
.setDisplayName(LocalizedText.english(name))
.setBrowseName(newQualifiedName(nodeName))
.setDisplayName(LocalizedText.english(nodeName))
.setDataType(convertToOpcDataType(type))
.setTypeDefinition(Identifiers.BaseDataVariableType)
.build();
getNodeManager().addNode(measurementNode);
folderNode.addReference(
new Reference(
folderNode.getNodeId(),
Identifiers.Organizes,
measurementNode.getNodeId().expanded(),
true));
if (Objects.nonNull(folderNode)) {
folderNode.addReference(
new Reference(
folderNode.getNodeId(), Identifiers.Organizes, nodeId.expanded(), true));
} else {
measurementNode.addReference(
new Reference(
nodeId, Identifiers.Organizes, Identifiers.ObjectsFolder.expanded(), false));
}
} else {
// This must exist
measurementNode =
Expand All @@ -269,15 +288,30 @@ private void transferTabletRowForClientServerModel(
}

final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
if (Objects.isNull(measurementNode.getValue())
|| Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
< utcTimestamp) {
measurementNode.setValue(
if (Objects.isNull(sink.valueName)) {
if (Objects.isNull(measurementNode.getValue())
|| Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
< utcTimestamp) {
measurementNode.setValue(
new DataValue(
new Variant(values.get(i)),
currentQuality,
new DateTime(utcTimestamp),
new DateTime()));
}
} else {
valueNode = measurementNode;
value = values.get(i);
timestamp = utcTimestamp;
}
}
if (Objects.nonNull(valueNode)) {
if (Objects.isNull(valueNode.getValue())
|| Objects.requireNonNull(valueNode.getValue().getSourceTime()).getUtcTime()
< timestamp) {
valueNode.setValue(
new DataValue(
new Variant(values.get(i)),
StatusCode.GOOD,
new DateTime(utcTimestamp),
new DateTime()));
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()));
}
}
}
Expand Down Expand Up @@ -319,8 +353,8 @@ private static long timestampToUtc(final long timeStamp) {
* @param tablet the tablet to send
* @throws UaException if failed to create {@link Event}
*/
private void transferTabletForPubSubModel(final Tablet tablet, final boolean isTableModel)
throws UaException {
private void transferTabletForPubSubModel(
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) throws UaException {
final BaseEventTypeNode eventNode =
getServer()
.getEventFactory()
Expand All @@ -331,11 +365,11 @@ private void transferTabletForPubSubModel(final Tablet tablet, final boolean isT
if (isTableModel) {
sourceNameList = new ArrayList<>(tablet.getRowSize());
for (int i = 0; i < tablet.getRowSize(); ++i) {
final StringBuilder idBuilder = new StringBuilder(databaseName);
final StringBuilder idBuilder = new StringBuilder(sink.unQualifiedDatabaseName);
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
idBuilder
.append(TsFileConstant.PATH_SEPARATOR)
.append(Objects.isNull(segment) ? placeHolder : segment);
.append(Objects.isNull(segment) ? sink.placeHolder : segment);
}
sourceNameList.add(idBuilder.toString());
}
Expand Down
Loading