Skip to content

Commit 9a0b216

Browse files
committed
qualification
1 parent 02e3517 commit 9a0b216

File tree

4 files changed

+127
-64
lines changed

4 files changed

+127
-64
lines changed

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -367,24 +367,32 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame
367367

368368
private static class KeyReducer {
369369

370-
private static final Set<String> PREFIXES = new HashSet<>();
370+
private static final Set<String> FIRST_PREFIXES = new HashSet<>();
371+
private static final Set<String> SECOND_PREFIXES = new HashSet<>();
371372

372373
static {
373-
PREFIXES.add("extractor.");
374-
PREFIXES.add("source.");
375-
PREFIXES.add("processor.");
376-
PREFIXES.add("connector.");
377-
PREFIXES.add("sink.");
374+
FIRST_PREFIXES.add("extractor.");
375+
FIRST_PREFIXES.add("source.");
376+
FIRST_PREFIXES.add("processor.");
377+
FIRST_PREFIXES.add("connector.");
378+
FIRST_PREFIXES.add("sink.");
379+
380+
SECOND_PREFIXES.add("opcua.");
378381
}
379382

380-
static String reduce(final String key) {
383+
static String reduce(String key) {
381384
if (key == null) {
382385
return null;
383386
}
384387
final String lowerCaseKey = key.toLowerCase();
385-
for (final String prefix : PREFIXES) {
388+
for (final String prefix : FIRST_PREFIXES) {
389+
if (lowerCaseKey.startsWith(prefix)) {
390+
key = key.substring(prefix.length());
391+
}
392+
}
393+
for (final String prefix : SECOND_PREFIXES) {
386394
if (lowerCaseKey.startsWith(prefix)) {
387-
return key.substring(prefix.length());
395+
key = key.substring(prefix.length());
388396
}
389397
}
390398
return key;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2323
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
24-
import org.apache.iotdb.commons.utils.PathUtils;
2524
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
2625
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
2726
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -70,22 +69,14 @@
7069

7170
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
7271
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
73-
private final boolean isClientServerModel;
7472
private final SubscriptionModel subscriptionModel;
7573
private final OpcUaServerBuilder builder;
76-
private final String databaseName;
7774
private final String placeHolder;
7875

7976
OpcUaNameSpace(
80-
final OpcUaServer server,
81-
final boolean isClientServerModel,
82-
final OpcUaServerBuilder builder,
83-
final String qualifiedDatabaseName,
84-
final String placeHolder) {
77+
final OpcUaServer server, final OpcUaServerBuilder builder, final String placeHolder) {
8578
super(server, NAMESPACE_URI);
86-
this.isClientServerModel = isClientServerModel;
8779
this.builder = builder;
88-
this.databaseName = PathUtils.unQualifyDatabaseName(qualifiedDatabaseName);
8980
this.placeHolder = placeHolder;
9081

9182
subscriptionModel = new SubscriptionModel(server, this);
@@ -106,15 +97,17 @@ public void shutdown() {
10697
});
10798
}
10899

109-
void transfer(final Tablet tablet, final boolean isTableModel) throws UaException {
110-
if (isClientServerModel) {
111-
transferTabletForClientServerModel(tablet, isTableModel);
100+
void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
101+
throws UaException {
102+
if (sink.isClientServerModel) {
103+
transferTabletForClientServerModel(tablet, isTableModel, sink);
112104
} else {
113-
transferTabletForPubSubModel(tablet, isTableModel);
105+
transferTabletForPubSubModel(tablet, isTableModel, sink);
114106
}
115107
}
116108

117-
private void transferTabletForClientServerModel(final Tablet tablet, final boolean isTableModel) {
109+
private void transferTabletForClientServerModel(
110+
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) {
118111
final List<IMeasurementSchema> schemas = tablet.getSchemas();
119112
final List<IMeasurementSchema> newSchemas = new ArrayList<>();
120113
if (!isTableModel) {
@@ -136,7 +129,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
136129
}
137130

138131
transferTabletRowForClientServerModel(
139-
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values);
132+
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink);
140133
} else {
141134
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
142135

@@ -151,7 +144,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
151144
for (int i = 0; i < tablet.getRowSize(); ++i) {
152145
final Object[] segments = tablet.getDeviceID(i).getSegments();
153146
final String[] folderSegments = new String[segments.length + 1];
154-
folderSegments[0] = databaseName;
147+
folderSegments[0] = sink.unQualifiedDatabaseName;
155148

156149
for (int j = 0; j < segments.length; ++j) {
157150
folderSegments[j + 1] = Objects.isNull(segments[j]) ? placeHolder : (String) segments[j];
@@ -169,7 +162,8 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
169162
? null
170163
: getTabletObjectValue4Opc(
171164
tablet.getValues()[index], finalI, schemas.get(index).getType()))
172-
.collect(Collectors.toList()));
165+
.collect(Collectors.toList()),
166+
sink);
173167
}
174168
}
175169
}
@@ -178,14 +172,18 @@ private void transferTabletRowForClientServerModel(
178172
final String[] segments,
179173
final List<IMeasurementSchema> measurementSchemas,
180174
final List<Long> timestamps,
181-
final List<Object> values) {
175+
final List<Object> values,
176+
final OpcUaSink sink) {
182177
if (segments.length == 0) {
183178
throw new PipeRuntimeCriticalException("The segments of tablets must exist");
184179
}
185180
final StringBuilder currentStr = new StringBuilder();
186181
UaNode folderNode = null;
187182
NodeId folderNodeId;
188-
for (final String segment : segments) {
183+
for (int i = 0;
184+
i < (Objects.isNull(sink.valueName) ? segments.length : segments.length - 1);
185+
++i) {
186+
final String segment = segments[i];
189187
final UaNode nextFolderNode;
190188

191189
currentStr.append(segment);
@@ -230,32 +228,50 @@ private void transferTabletRowForClientServerModel(
230228
}
231229

232230
final String currentFolder = currentStr.toString();
231+
StatusCode currentQuality =
232+
Objects.isNull(sink.valueName) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
233233
for (int i = 0; i < measurementSchemas.size(); ++i) {
234234
if (Objects.isNull(values.get(i))) {
235235
continue;
236236
}
237237
final String name = measurementSchemas.get(i).getMeasurementName();
238238
final TSDataType type = measurementSchemas.get(i).getType();
239-
final NodeId nodeId = newNodeId(currentFolder + name);
239+
if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
240+
if (!type.equals(TSDataType.BOOLEAN)) {
241+
throw new UnsupportedOperationException(
242+
"The quality value only supports boolean type, while true == GOOD and false == BAD.");
243+
}
244+
currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD;
245+
continue;
246+
}
247+
if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
248+
throw new UnsupportedOperationException(
249+
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
250+
}
251+
final String nodeName = Objects.isNull(sink.valueName) ? name : segments[segments.length - 1];
252+
final NodeId nodeId = newNodeId(currentFolder + nodeName);
240253
final UaVariableNode measurementNode;
241254
if (!getNodeManager().containsNode(nodeId)) {
242255
measurementNode =
243256
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
244-
.setNodeId(newNodeId(currentFolder + name))
257+
.setNodeId(nodeId)
245258
.setAccessLevel(AccessLevel.READ_WRITE)
246259
.setUserAccessLevel(AccessLevel.READ_ONLY)
247-
.setBrowseName(newQualifiedName(name))
248-
.setDisplayName(LocalizedText.english(name))
260+
.setBrowseName(newQualifiedName(nodeName))
261+
.setDisplayName(LocalizedText.english(nodeName))
249262
.setDataType(convertToOpcDataType(type))
250263
.setTypeDefinition(Identifiers.BaseDataVariableType)
251264
.build();
252265
getNodeManager().addNode(measurementNode);
253-
folderNode.addReference(
254-
new Reference(
255-
folderNode.getNodeId(),
256-
Identifiers.Organizes,
257-
measurementNode.getNodeId().expanded(),
258-
true));
266+
if (Objects.nonNull(folderNode)) {
267+
folderNode.addReference(
268+
new Reference(
269+
folderNode.getNodeId(), Identifiers.Organizes, nodeId.expanded(), true));
270+
} else {
271+
measurementNode.addReference(
272+
new Reference(
273+
nodeId, Identifiers.Organizes, Identifiers.ObjectsFolder.expanded(), false));
274+
}
259275
} else {
260276
// This must exist
261277
measurementNode =
@@ -275,7 +291,7 @@ private void transferTabletRowForClientServerModel(
275291
measurementNode.setValue(
276292
new DataValue(
277293
new Variant(values.get(i)),
278-
StatusCode.GOOD,
294+
currentQuality,
279295
new DateTime(utcTimestamp),
280296
new DateTime()));
281297
}
@@ -319,8 +335,8 @@ private static long timestampToUtc(final long timeStamp) {
319335
* @param tablet the tablet to send
320336
* @throws UaException if failed to create {@link Event}
321337
*/
322-
private void transferTabletForPubSubModel(final Tablet tablet, final boolean isTableModel)
323-
throws UaException {
338+
private void transferTabletForPubSubModel(
339+
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) throws UaException {
324340
final BaseEventTypeNode eventNode =
325341
getServer()
326342
.getEventFactory()
@@ -331,7 +347,7 @@ private void transferTabletForPubSubModel(final Tablet tablet, final boolean isT
331347
if (isTableModel) {
332348
sourceNameList = new ArrayList<>(tablet.getRowSize());
333349
for (int i = 0; i < tablet.getRowSize(); ++i) {
334-
final StringBuilder idBuilder = new StringBuilder(databaseName);
350+
final StringBuilder idBuilder = new StringBuilder(sink.unQualifiedDatabaseName);
335351
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
336352
idBuilder
337353
.append(TsFileConstant.PATH_SEPARATOR)

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
2121

2222
import org.apache.iotdb.commons.consensus.DataRegionId;
23+
import org.apache.iotdb.commons.utils.PathUtils;
2324
import org.apache.iotdb.db.conf.IoTDBConfig;
2425
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2526
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -41,6 +42,8 @@
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
4344

45+
import javax.annotation.Nullable;
46+
4447
import java.io.File;
4548
import java.util.Arrays;
4649
import java.util.Map;
@@ -63,19 +66,26 @@
6366
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
6467
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE;
6568
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
69+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
6670
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
6771
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
6872
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
6973
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
74+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
75+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
76+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY;
7077
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
7178
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
7279
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
7380
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
7481
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
7582
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
7683
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
84+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
7785
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
7886
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
87+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
88+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
7989

8090
/**
8191
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into
@@ -95,6 +105,11 @@ public class OpcUaSink implements PipeConnector {
95105
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>();
96106

97107
private String serverKey;
108+
boolean isClientServerModel;
109+
String unQualifiedDatabaseName;
110+
String placeHolder;
111+
@Nullable String valueName;
112+
@Nullable String qualityName;
98113
private OpcUaNameSpace nameSpace;
99114

100115
@Override
@@ -156,10 +171,40 @@ public void customize(
156171
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
157172
SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
158173
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
159-
final String placeHolder =
174+
placeHolder =
160175
parameters.getStringOrDefault(
161176
Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY),
162177
CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE);
178+
final boolean withQuality =
179+
parameters.getBooleanOrDefault(
180+
Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
181+
CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
182+
valueName =
183+
withQuality
184+
? parameters.getStringOrDefault(
185+
Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY),
186+
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
187+
: null;
188+
qualityName =
189+
withQuality
190+
? parameters.getStringOrDefault(
191+
Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
192+
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
193+
: null;
194+
isClientServerModel =
195+
parameters
196+
.getStringOrDefault(
197+
Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
198+
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
199+
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
200+
201+
final DataRegion region =
202+
StorageEngine.getInstance()
203+
.getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
204+
unQualifiedDatabaseName =
205+
Objects.nonNull(region)
206+
? PathUtils.unQualifyDatabaseName(region.getDatabaseName())
207+
: "__temp_db";
163208

164209
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
165210
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -180,25 +225,7 @@ public void customize(
180225
.setSecurityDir(securityDir)
181226
.setEnableAnonymousAccess(enableAnonymousAccess);
182227
final OpcUaServer newServer = builder.build();
183-
final DataRegion region =
184-
StorageEngine.getInstance()
185-
.getDataRegion(
186-
new DataRegionId(
187-
configuration.getRuntimeEnvironment().getRegionId()));
188-
nameSpace =
189-
new OpcUaNameSpace(
190-
newServer,
191-
parameters
192-
.getStringOrDefault(
193-
Arrays.asList(
194-
CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
195-
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
196-
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
197-
builder,
198-
Objects.nonNull(region)
199-
? region.getDatabaseName()
200-
: "root.__temp_db",
201-
placeHolder);
228+
nameSpace = new OpcUaNameSpace(newServer, builder, placeHolder);
202229
nameSpace.startup();
203230
newServer.startup().get();
204231
return new Pair<>(new AtomicInteger(0), nameSpace);
@@ -239,7 +266,7 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
239266
transferByTablet(
240267
tabletInsertionEvent,
241268
LOGGER,
242-
(tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel));
269+
(tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel, this));
243270
}
244271

245272
public static void transferByTablet(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,18 @@ public class PipeSinkConstant {
182182
public static final String SINK_OPC_UA_PLACEHOLDER_KEY = "sink.opcua.placeholder";
183183
public static final String CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE = "null";
184184

185+
public static final String CONNECTOR_OPC_UA_WITH_QUALITY_KEY = "connector.opcua.with-quality";
186+
public static final String SINK_OPC_UA_WITH_QUALITY_KEY = "sink.opcua.with-quality";
187+
public static final boolean CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE = false;
188+
189+
public static final String CONNECTOR_OPC_UA_VALUE_NAME_KEY = "connector.opcua.value-name";
190+
public static final String SINK_OPC_UA_VALUE_NAME_KEY = "sink.opcua.value-name";
191+
public static final String CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE = "value";
192+
193+
public static final String CONNECTOR_OPC_UA_QUALITY_NAME_KEY = "connector.opcua.quality-name";
194+
public static final String SINK_OPC_UA_QUALITY_NAME_KEY = "sink.opcua.quality-name";
195+
public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = "quality";
196+
185197
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable";
186198
public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable";
187199
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;

0 commit comments

Comments
 (0)