Skip to content

Commit 4a1ad70

Browse files
committed
sit-sit
1 parent 1f07ae3 commit 4a1ad70

File tree

2 files changed

+48
-37
lines changed

2 files changed

+48
-37
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -178,17 +178,50 @@ public void validate(final PipeParameterValidator validator) throws Exception {
178178
public void customize(
179179
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
180180
throws Exception {
181+
final boolean withQuality =
182+
parameters.getBooleanOrDefault(
183+
Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
184+
CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
185+
valueName =
186+
withQuality
187+
? parameters.getStringOrDefault(
188+
Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY),
189+
CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
190+
: null;
191+
qualityName =
192+
withQuality
193+
? parameters.getStringOrDefault(
194+
Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
195+
CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
196+
: null;
197+
isClientServerModel =
198+
parameters
199+
.getStringOrDefault(
200+
Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
201+
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
202+
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
203+
placeHolder =
204+
parameters.getStringOrDefault(
205+
Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY),
206+
CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE);
207+
final DataRegion region =
208+
StorageEngine.getInstance()
209+
.getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
210+
unQualifiedDatabaseName =
211+
Objects.nonNull(region)
212+
? PathUtils.unQualifyDatabaseName(region.getDatabaseName())
213+
: "__temp_db";
214+
181215
final String nodeUrl =
182216
parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY);
183217
if (Objects.isNull(nodeUrl)) {
184-
customizeServer(parameters, configuration);
218+
customizeServer(parameters);
185219
} else {
186220
customizeClient(nodeUrl, parameters);
187221
}
188222
}
189223

190-
private void customizeServer(
191-
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) {
224+
private void customizeServer(final PipeParameters parameters) {
192225
final int tcpBindPort =
193226
parameters.getIntOrDefault(
194227
Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -225,40 +258,6 @@ private void customizeServer(
225258
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
226259
SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
227260
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
228-
placeHolder =
229-
parameters.getStringOrDefault(
230-
Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY),
231-
CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE);
232-
final boolean withQuality =
233-
parameters.getBooleanOrDefault(
234-
Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
235-
CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
236-
valueName =
237-
withQuality
238-
? parameters.getStringOrDefault(
239-
Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY),
240-
CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
241-
: null;
242-
qualityName =
243-
withQuality
244-
? parameters.getStringOrDefault(
245-
Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
246-
CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
247-
: null;
248-
isClientServerModel =
249-
parameters
250-
.getStringOrDefault(
251-
Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
252-
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
253-
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
254-
255-
final DataRegion region =
256-
StorageEngine.getInstance()
257-
.getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
258-
unQualifiedDatabaseName =
259-
Objects.nonNull(region)
260-
? PathUtils.unQualifyDatabaseName(region.getDatabaseName())
261-
: "__temp_db";
262261

263262
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
264263
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -448,6 +447,10 @@ public interface ThrowingBiConsumer<T, U, E extends Exception> {
448447

449448
@Override
450449
public void close() throws Exception {
450+
if (Objects.nonNull(client)) {
451+
client.disconnect();
452+
}
453+
451454
if (serverKey == null) {
452455
return;
453456
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ private void transferTabletRowForClientServerModel(
128128
timestamp = utcTimestamp;
129129
opcDataType = convertToOpcDataType(type);
130130
}
131+
if (Objects.isNull(value)) {
132+
return;
133+
}
134+
131135
final Variant variant = new Variant(value);
132136
final DataValue dataValue =
133137
new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime());
@@ -223,6 +227,10 @@ public List<AddNodesItem> getNodesToAdd(
223227
return addNodesItems;
224228
}
225229

230+
public void disconnect() throws Exception {
231+
client.disconnect().get();
232+
}
233+
226234
/////////////////////////////// Getter ///////////////////////////////
227235

228236
String getNodeUrl() {

0 commit comments

Comments
 (0)