Skip to content

Commit ca26dd3

Browse files
authored
Pipe: Optimized the key reducer logic & OPC UA sink reuse logic & value + quality in OPC UA tree client-server model (apache#16921)
1 parent a572c1c commit ca26dd3

File tree

7 files changed

+312
-78
lines changed

7 files changed

+312
-78
lines changed

iotdb-api/pipe-api/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
<version>${tsfile.version}</version>
3636
<scope>provided</scope>
3737
</dependency>
38+
<dependency>
39+
<groupId>junit</groupId>
40+
<artifactId>junit</artifactId>
41+
<scope>test</scope>
42+
</dependency>
3843
</dependencies>
3944
<profiles>
4045
<profile>

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -367,24 +367,35 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame
367367

368368
private static class KeyReducer {
369369

370-
private static final Set<String> PREFIXES = new HashSet<>();
370+
private static final Set<String> FIRST_PREFIXES = new HashSet<>();
371+
private static final Set<String> SECOND_PREFIXES = new HashSet<>();
371372

372373
static {
373-
PREFIXES.add("extractor.");
374-
PREFIXES.add("source.");
375-
PREFIXES.add("processor.");
376-
PREFIXES.add("connector.");
377-
PREFIXES.add("sink.");
374+
FIRST_PREFIXES.add("extractor.");
375+
FIRST_PREFIXES.add("source.");
376+
FIRST_PREFIXES.add("processor.");
377+
FIRST_PREFIXES.add("connector.");
378+
FIRST_PREFIXES.add("sink.");
379+
380+
SECOND_PREFIXES.add("opcua.");
378381
}
379382

380-
static String reduce(final String key) {
383+
static String reduce(String key) {
381384
if (key == null) {
382385
return null;
383386
}
384-
final String lowerCaseKey = key.toLowerCase();
385-
for (final String prefix : PREFIXES) {
387+
String lowerCaseKey = key.toLowerCase();
388+
for (final String prefix : FIRST_PREFIXES) {
389+
if (lowerCaseKey.startsWith(prefix)) {
390+
key = key.substring(prefix.length());
391+
lowerCaseKey = lowerCaseKey.substring(prefix.length());
392+
break;
393+
}
394+
}
395+
for (final String prefix : SECOND_PREFIXES) {
386396
if (lowerCaseKey.startsWith(prefix)) {
387-
return key.substring(prefix.length());
397+
key = key.substring(prefix.length());
398+
break;
388399
}
389400
}
390401
return key;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.customizer.parameter;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
import java.util.HashMap;
26+
27+
public class PipeParametersTest {
28+
29+
@Test
30+
public void keyReducerTest() {
31+
final PipeParameters parameters = new PipeParameters(new HashMap<>());
32+
parameters.addAttribute("sink.opcua.with-quality", "false");
33+
34+
Assert.assertEquals(false, parameters.getBoolean("with-quality"));
35+
Assert.assertEquals(false, parameters.getBoolean("opcua.with-quality"));
36+
37+
// Invalid
38+
parameters.addAttribute("sink.source.opcua.value-name", "false");
39+
parameters.addAttribute("opcua.sink.value-name", "false");
40+
Assert.assertNull(parameters.getString("value-name"));
41+
}
42+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java

Lines changed: 81 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2323
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
24-
import org.apache.iotdb.commons.utils.PathUtils;
2524
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
2625
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
2726
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -70,23 +69,12 @@
7069

7170
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
7271
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
73-
private final boolean isClientServerModel;
7472
private final SubscriptionModel subscriptionModel;
7573
private final OpcUaServerBuilder builder;
76-
private final String databaseName;
77-
private final String placeHolder;
78-
79-
OpcUaNameSpace(
80-
final OpcUaServer server,
81-
final boolean isClientServerModel,
82-
final OpcUaServerBuilder builder,
83-
final String qualifiedDatabaseName,
84-
final String placeHolder) {
74+
75+
OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
8576
super(server, NAMESPACE_URI);
86-
this.isClientServerModel = isClientServerModel;
8777
this.builder = builder;
88-
this.databaseName = PathUtils.unQualifyDatabaseName(qualifiedDatabaseName);
89-
this.placeHolder = placeHolder;
9078

9179
subscriptionModel = new SubscriptionModel(server, this);
9280
getLifecycleManager().addLifecycle(subscriptionModel);
@@ -106,15 +94,17 @@ public void shutdown() {
10694
});
10795
}
10896

109-
void transfer(final Tablet tablet, final boolean isTableModel) throws UaException {
110-
if (isClientServerModel) {
111-
transferTabletForClientServerModel(tablet, isTableModel);
97+
void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
98+
throws UaException {
99+
if (sink.isClientServerModel) {
100+
transferTabletForClientServerModel(tablet, isTableModel, sink);
112101
} else {
113-
transferTabletForPubSubModel(tablet, isTableModel);
102+
transferTabletForPubSubModel(tablet, isTableModel, sink);
114103
}
115104
}
116105

117-
private void transferTabletForClientServerModel(final Tablet tablet, final boolean isTableModel) {
106+
private void transferTabletForClientServerModel(
107+
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) {
118108
final List<IMeasurementSchema> schemas = tablet.getSchemas();
119109
final List<IMeasurementSchema> newSchemas = new ArrayList<>();
120110
if (!isTableModel) {
@@ -136,7 +126,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
136126
}
137127

138128
transferTabletRowForClientServerModel(
139-
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values);
129+
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink);
140130
} else {
141131
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
142132

@@ -151,10 +141,11 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
151141
for (int i = 0; i < tablet.getRowSize(); ++i) {
152142
final Object[] segments = tablet.getDeviceID(i).getSegments();
153143
final String[] folderSegments = new String[segments.length + 1];
154-
folderSegments[0] = databaseName;
144+
folderSegments[0] = sink.unQualifiedDatabaseName;
155145

156146
for (int j = 0; j < segments.length; ++j) {
157-
folderSegments[j + 1] = Objects.isNull(segments[j]) ? placeHolder : (String) segments[j];
147+
folderSegments[j + 1] =
148+
Objects.isNull(segments[j]) ? sink.placeHolder : (String) segments[j];
158149
}
159150

160151
final int finalI = i;
@@ -169,7 +160,8 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
169160
? null
170161
: getTabletObjectValue4Opc(
171162
tablet.getValues()[index], finalI, schemas.get(index).getType()))
172-
.collect(Collectors.toList()));
163+
.collect(Collectors.toList()),
164+
sink);
173165
}
174166
}
175167
}
@@ -178,14 +170,18 @@ private void transferTabletRowForClientServerModel(
178170
final String[] segments,
179171
final List<IMeasurementSchema> measurementSchemas,
180172
final List<Long> timestamps,
181-
final List<Object> values) {
173+
final List<Object> values,
174+
final OpcUaSink sink) {
182175
if (segments.length == 0) {
183176
throw new PipeRuntimeCriticalException("The segments of tablets must exist");
184177
}
185178
final StringBuilder currentStr = new StringBuilder();
186179
UaNode folderNode = null;
187180
NodeId folderNodeId;
188-
for (final String segment : segments) {
181+
for (int i = 0;
182+
i < (Objects.isNull(sink.valueName) ? segments.length : segments.length - 1);
183+
++i) {
184+
final String segment = segments[i];
189185
final UaNode nextFolderNode;
190186

191187
currentStr.append(segment);
@@ -230,32 +226,55 @@ private void transferTabletRowForClientServerModel(
230226
}
231227

232228
final String currentFolder = currentStr.toString();
229+
230+
StatusCode currentQuality =
231+
Objects.isNull(sink.valueName) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
232+
UaVariableNode valueNode = null;
233+
Object value = null;
234+
long timestamp = 0;
235+
233236
for (int i = 0; i < measurementSchemas.size(); ++i) {
234237
if (Objects.isNull(values.get(i))) {
235238
continue;
236239
}
237240
final String name = measurementSchemas.get(i).getMeasurementName();
238241
final TSDataType type = measurementSchemas.get(i).getType();
239-
final NodeId nodeId = newNodeId(currentFolder + name);
242+
if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
243+
if (!type.equals(TSDataType.BOOLEAN)) {
244+
throw new UnsupportedOperationException(
245+
"The quality value only supports boolean type, while true == GOOD and false == BAD.");
246+
}
247+
currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD;
248+
continue;
249+
}
250+
if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
251+
throw new UnsupportedOperationException(
252+
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
253+
}
254+
final String nodeName = Objects.isNull(sink.valueName) ? name : segments[segments.length - 1];
255+
final NodeId nodeId = newNodeId(currentFolder + nodeName);
240256
final UaVariableNode measurementNode;
241257
if (!getNodeManager().containsNode(nodeId)) {
242258
measurementNode =
243259
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
244-
.setNodeId(newNodeId(currentFolder + name))
260+
.setNodeId(nodeId)
245261
.setAccessLevel(AccessLevel.READ_WRITE)
246262
.setUserAccessLevel(AccessLevel.READ_ONLY)
247-
.setBrowseName(newQualifiedName(name))
248-
.setDisplayName(LocalizedText.english(name))
263+
.setBrowseName(newQualifiedName(nodeName))
264+
.setDisplayName(LocalizedText.english(nodeName))
249265
.setDataType(convertToOpcDataType(type))
250266
.setTypeDefinition(Identifiers.BaseDataVariableType)
251267
.build();
252268
getNodeManager().addNode(measurementNode);
253-
folderNode.addReference(
254-
new Reference(
255-
folderNode.getNodeId(),
256-
Identifiers.Organizes,
257-
measurementNode.getNodeId().expanded(),
258-
true));
269+
if (Objects.nonNull(folderNode)) {
270+
folderNode.addReference(
271+
new Reference(
272+
folderNode.getNodeId(), Identifiers.Organizes, nodeId.expanded(), true));
273+
} else {
274+
measurementNode.addReference(
275+
new Reference(
276+
nodeId, Identifiers.Organizes, Identifiers.ObjectsFolder.expanded(), false));
277+
}
259278
} else {
260279
// This must exist
261280
measurementNode =
@@ -269,15 +288,30 @@ private void transferTabletRowForClientServerModel(
269288
}
270289

271290
final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
272-
if (Objects.isNull(measurementNode.getValue())
273-
|| Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
274-
< utcTimestamp) {
275-
measurementNode.setValue(
291+
if (Objects.isNull(sink.valueName)) {
292+
if (Objects.isNull(measurementNode.getValue())
293+
|| Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
294+
< utcTimestamp) {
295+
measurementNode.setValue(
296+
new DataValue(
297+
new Variant(values.get(i)),
298+
currentQuality,
299+
new DateTime(utcTimestamp),
300+
new DateTime()));
301+
}
302+
} else {
303+
valueNode = measurementNode;
304+
value = values.get(i);
305+
timestamp = utcTimestamp;
306+
}
307+
}
308+
if (Objects.nonNull(valueNode)) {
309+
if (Objects.isNull(valueNode.getValue())
310+
|| Objects.requireNonNull(valueNode.getValue().getSourceTime()).getUtcTime()
311+
< timestamp) {
312+
valueNode.setValue(
276313
new DataValue(
277-
new Variant(values.get(i)),
278-
StatusCode.GOOD,
279-
new DateTime(utcTimestamp),
280-
new DateTime()));
314+
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()));
281315
}
282316
}
283317
}
@@ -319,8 +353,8 @@ private static long timestampToUtc(final long timeStamp) {
319353
* @param tablet the tablet to send
320354
* @throws UaException if failed to create {@link Event}
321355
*/
322-
private void transferTabletForPubSubModel(final Tablet tablet, final boolean isTableModel)
323-
throws UaException {
356+
private void transferTabletForPubSubModel(
357+
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) throws UaException {
324358
final BaseEventTypeNode eventNode =
325359
getServer()
326360
.getEventFactory()
@@ -331,11 +365,11 @@ private void transferTabletForPubSubModel(final Tablet tablet, final boolean isT
331365
if (isTableModel) {
332366
sourceNameList = new ArrayList<>(tablet.getRowSize());
333367
for (int i = 0; i < tablet.getRowSize(); ++i) {
334-
final StringBuilder idBuilder = new StringBuilder(databaseName);
368+
final StringBuilder idBuilder = new StringBuilder(sink.unQualifiedDatabaseName);
335369
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
336370
idBuilder
337371
.append(TsFileConstant.PATH_SEPARATOR)
338-
.append(Objects.isNull(segment) ? placeHolder : segment);
372+
.append(Objects.isNull(segment) ? sink.placeHolder : segment);
339373
}
340374
sourceNameList.add(idBuilder.toString());
341375
}

0 commit comments

Comments
 (0)