Skip to content

Commit efd189b

Browse files
committed
adjust
1 parent 4861ca3 commit efd189b

File tree

4 files changed

+60
-11
lines changed

4 files changed

+60
-11
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
4949
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
5050
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
51+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
5152
import org.slf4j.Logger;
5253
import org.slf4j.LoggerFactory;
5354

@@ -68,6 +69,10 @@
6869
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
6970
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
7071
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
72+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
73+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
74+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
75+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE;
7176
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
7277
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
7378
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE;
@@ -102,6 +107,7 @@
102107
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
103108
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
104109
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
110+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
105111
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
106112
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
107113
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
@@ -133,11 +139,12 @@ public class OpcUaSink implements PipeConnector {
133139
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>();
134140

135141
private String serverKey;
136-
boolean isClientServerModel;
137-
String databaseName;
138-
String placeHolder;
139-
@Nullable String valueName;
140-
@Nullable String qualityName;
142+
private boolean isClientServerModel;
143+
private String databaseName;
144+
private String placeHolder;
145+
private @Nullable String valueName;
146+
private @Nullable String qualityName;
147+
private StatusCode defaultQuality;
141148

142149
// Inner server
143150
private @Nullable OpcUaNameSpace nameSpace;
@@ -203,6 +210,14 @@ public void customize(
203210
Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
204211
CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
205212
: null;
213+
defaultQuality =
214+
getQuality(
215+
withQuality
216+
? parameters.getStringOrDefault(
217+
Arrays.asList(
218+
CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, SINK_OPC_UA_DEFAULT_QUALITY_KEY),
219+
CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE)
220+
: CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE);
206221
isClientServerModel =
207222
parameters
208223
.getStringOrDefault(
@@ -396,6 +411,19 @@ private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
396411
}
397412
}
398413

414+
private StatusCode getQuality(final String quality) {
415+
switch (quality.toUpperCase()) {
416+
case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE:
417+
return StatusCode.GOOD;
418+
case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE:
419+
return StatusCode.BAD;
420+
case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE:
421+
return StatusCode.UNCERTAIN;
422+
default:
423+
throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'.");
424+
}
425+
}
426+
399427
@Override
400428
public void handshake() throws Exception {
401429
// Server side, do nothing
@@ -538,4 +566,8 @@ public String getValueName() {
538566
public String getQualityName() {
539567
return qualityName;
540568
}
569+
570+
public StatusCode getDefaultQuality() {
571+
return defaultQuality;
572+
}
541573
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
2121

22+
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
2223
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
2324
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
2425
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -50,6 +51,8 @@
5051
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
5152
import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
5253
import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
5356

5457
import java.util.ArrayList;
5558
import java.util.Arrays;
@@ -61,6 +64,7 @@
6164
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
6265

6366
public class IoTDBOpcUaClient {
67+
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
6468
private static final int NAME_SPACE_INDEX = 2;
6569
private final String nodeUrl;
6670

@@ -99,8 +103,7 @@ private void transferTabletRowForClientServerModel(
99103
final List<Object> values,
100104
final OpcUaSink sink)
101105
throws Exception {
102-
StatusCode currentQuality =
103-
Objects.isNull(sink.getValueName()) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
106+
StatusCode currentQuality = sink.getDefaultQuality();
104107
Object value = null;
105108
long timestamp = 0;
106109
NodeId nodeId = null;
@@ -121,8 +124,10 @@ private void transferTabletRowForClientServerModel(
121124
continue;
122125
}
123126
if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
124-
throw new UnsupportedOperationException(
127+
PipeLogger.log(
128+
LOGGER::warn,
125129
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
130+
continue;
126131
}
127132
nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
128133

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2323
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
24+
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
2425
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
2526
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
2627
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
@@ -56,6 +57,8 @@
5657
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
5758
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
5859
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
60+
import org.slf4j.Logger;
61+
import org.slf4j.LoggerFactory;
5962

6063
import java.nio.file.Paths;
6164
import java.sql.Date;
@@ -71,6 +74,7 @@
7174
import java.util.stream.Collectors;
7275

7376
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
77+
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
7478
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
7579
private final SubscriptionModel subscriptionModel;
7680
private final OpcUaServerBuilder builder;
@@ -245,8 +249,7 @@ private void transferTabletRowForClientServerModel(
245249

246250
final String currentFolder = currentStr.toString();
247251

248-
StatusCode currentQuality =
249-
Objects.isNull(sink.getValueName()) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
252+
StatusCode currentQuality = sink.getDefaultQuality();
250253
UaVariableNode valueNode = null;
251254
Object value = null;
252255
long timestamp = 0;
@@ -266,8 +269,10 @@ private void transferTabletRowForClientServerModel(
266269
continue;
267270
}
268271
if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
269-
throw new UnsupportedOperationException(
272+
PipeLogger.log(
273+
LOGGER::warn,
270274
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
275+
continue;
271276
}
272277
final String nodeName =
273278
Objects.isNull(sink.getValueName()) ? name : segments[segments.length - 1];

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@ public class PipeSinkConstant {
195195
public static final String SINK_OPC_UA_QUALITY_NAME_KEY = "sink.opcua.quality-name";
196196
public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = "quality";
197197

198+
public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY =
199+
"connector.opcua.default-quality";
200+
public static final String SINK_OPC_UA_DEFAULT_QUALITY_KEY = "sink.opcua.default-quality";
201+
public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE = "GOOD";
202+
public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE = "BAD";
203+
public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE = "UNCERTAIN";
204+
198205
public static final String CONNECTOR_OPC_UA_NODE_URL_KEY = "connector.opcua.node-url";
199206
public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url";
200207

0 commit comments

Comments
 (0)