Skip to content

Commit 341cb2d

Browse files
committed
pj
1 parent 302864a commit 341cb2d

File tree

4 files changed

+52
-24
lines changed

4 files changed

+52
-24
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.iotdb.db.conf.IoTDBConfig;
2525
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2626
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
27+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
28+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
2729
import org.apache.iotdb.db.storageengine.StorageEngine;
2830
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
2931
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -356,4 +358,28 @@ public void close() throws Exception {
356358
}
357359
}
358360
}
361+
362+
// Getter
363+
364+
public boolean isClientServerModel() {
365+
return isClientServerModel;
366+
}
367+
368+
public String getUnQualifiedDatabaseName() {
369+
return unQualifiedDatabaseName;
370+
}
371+
372+
public String getPlaceHolder() {
373+
return placeHolder;
374+
}
375+
376+
@Nullable
377+
public String getValueName() {
378+
return valueName;
379+
}
380+
381+
@Nullable
382+
public String getQualityName() {
383+
return qualityName;
384+
}
359385
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java renamed to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
20+
package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
2121

2222
import org.apache.iotdb.commons.utils.FileUtils;
2323

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java renamed to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
20+
package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
2121

2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2323
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
24+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
2425
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
2526
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
2627
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -72,7 +73,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
7273
private final SubscriptionModel subscriptionModel;
7374
private final OpcUaServerBuilder builder;
7475

75-
OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
76+
public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
7677
super(server, NAMESPACE_URI);
7778
this.builder = builder;
7879

@@ -94,9 +95,9 @@ public void shutdown() {
9495
});
9596
}
9697

97-
void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
98+
public void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
9899
throws UaException {
99-
if (sink.isClientServerModel) {
100+
if (sink.isClientServerModel()) {
100101
transferTabletForClientServerModel(tablet, isTableModel, sink);
101102
} else {
102103
transferTabletForPubSubModel(tablet, isTableModel, sink);
@@ -141,11 +142,11 @@ private void transferTabletForClientServerModel(
141142
for (int i = 0; i < tablet.getRowSize(); ++i) {
142143
final Object[] segments = tablet.getDeviceID(i).getSegments();
143144
final String[] folderSegments = new String[segments.length + 1];
144-
folderSegments[0] = sink.unQualifiedDatabaseName;
145+
folderSegments[0] = sink.getUnQualifiedDatabaseName();
145146

146147
for (int j = 0; j < segments.length; ++j) {
147148
folderSegments[j + 1] =
148-
Objects.isNull(segments[j]) ? sink.placeHolder : (String) segments[j];
149+
Objects.isNull(segments[j]) ? sink.getPlaceHolder() : (String) segments[j];
149150
}
150151

151152
final int finalI = i;
@@ -228,7 +229,7 @@ private void transferTabletRowForClientServerModel(
228229
final String currentFolder = currentStr.toString();
229230

230231
StatusCode currentQuality =
231-
Objects.isNull(sink.valueName) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
232+
Objects.isNull(sink.getValueName()) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
232233
UaVariableNode valueNode = null;
233234
Object value = null;
234235
long timestamp = 0;
@@ -239,19 +240,20 @@ private void transferTabletRowForClientServerModel(
239240
}
240241
final String name = measurementSchemas.get(i).getMeasurementName();
241242
final TSDataType type = measurementSchemas.get(i).getType();
242-
if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
243+
if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) {
243244
if (!type.equals(TSDataType.BOOLEAN)) {
244245
throw new UnsupportedOperationException(
245246
"The quality value only supports boolean type, while true == GOOD and false == BAD.");
246247
}
247248
currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD;
248249
continue;
249250
}
250-
if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
251+
if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
251252
throw new UnsupportedOperationException(
252253
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
253254
}
254-
final String nodeName = Objects.isNull(sink.valueName) ? name : segments[segments.length - 1];
255+
final String nodeName =
256+
Objects.isNull(sink.getValueName()) ? name : segments[segments.length - 1];
255257
final NodeId nodeId = newNodeId(currentFolder + nodeName);
256258
final UaVariableNode measurementNode;
257259
if (!getNodeManager().containsNode(nodeId)) {
@@ -288,7 +290,7 @@ private void transferTabletRowForClientServerModel(
288290
}
289291

290292
final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
291-
if (Objects.isNull(sink.valueName)) {
293+
if (Objects.isNull(sink.getValueName())) {
292294
if (Objects.isNull(measurementNode.getValue())
293295
|| Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
294296
< utcTimestamp) {
@@ -365,11 +367,11 @@ private void transferTabletForPubSubModel(
365367
if (isTableModel) {
366368
sourceNameList = new ArrayList<>(tablet.getRowSize());
367369
for (int i = 0; i < tablet.getRowSize(); ++i) {
368-
final StringBuilder idBuilder = new StringBuilder(sink.unQualifiedDatabaseName);
370+
final StringBuilder idBuilder = new StringBuilder(sink.getUnQualifiedDatabaseName());
369371
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
370372
idBuilder
371373
.append(TsFileConstant.PATH_SEPARATOR)
372-
.append(Objects.isNull(segment) ? sink.placeHolder : segment);
374+
.append(Objects.isNull(segment) ? sink.getPlaceHolder() : segment);
373375
}
374376
sourceNameList.add(idBuilder.toString());
375377
}
@@ -521,7 +523,7 @@ public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) {
521523

522524
/////////////////////////////// Conflict detection ///////////////////////////////
523525

524-
void checkEquals(
526+
public void checkEquals(
525527
final String user,
526528
final String password,
527529
final String securityDir,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java renamed to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
20+
package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
2121

2222
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
2323
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -86,7 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
8686
private boolean enableAnonymousAccess;
8787
private DefaultTrustListManager trustListManager;
8888

89-
OpcUaServerBuilder() {
89+
public OpcUaServerBuilder() {
9090
tcpBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
9191
httpsBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
9292
user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
@@ -95,37 +95,37 @@ public class OpcUaServerBuilder implements Closeable {
9595
enableAnonymousAccess = PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
9696
}
9797

98-
OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
98+
public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
9999
this.tcpBindPort = tcpBindPort;
100100
return this;
101101
}
102102

103-
OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
103+
public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
104104
this.httpsBindPort = httpsBindPort;
105105
return this;
106106
}
107107

108-
OpcUaServerBuilder setUser(final String user) {
108+
public OpcUaServerBuilder setUser(final String user) {
109109
this.user = user;
110110
return this;
111111
}
112112

113-
OpcUaServerBuilder setPassword(final String password) {
113+
public OpcUaServerBuilder setPassword(final String password) {
114114
this.password = password;
115115
return this;
116116
}
117117

118-
OpcUaServerBuilder setSecurityDir(final String securityDir) {
118+
public OpcUaServerBuilder setSecurityDir(final String securityDir) {
119119
this.securityDir = Paths.get(securityDir);
120120
return this;
121121
}
122122

123-
OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
123+
public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
124124
this.enableAnonymousAccess = enableAnonymousAccess;
125125
return this;
126126
}
127127

128-
OpcUaServer build() throws Exception {
128+
public OpcUaServer build() throws Exception {
129129
Files.createDirectories(securityDir);
130130
if (!Files.exists(securityDir)) {
131131
throw new PipeException("Unable to create security dir: " + securityDir);

0 commit comments

Comments
 (0)