Skip to content

Commit 97897c3

Browse files
committed
fix
1 parent b871925 commit 97897c3

File tree

3 files changed

+81
-130
lines changed

3 files changed

+81
-130
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,13 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
361361
transferByTablet(
362362
tabletInsertionEvent,
363363
LOGGER,
364-
(tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel, this));
364+
(tablet, isTableModel) -> {
365+
if (Objects.nonNull(nameSpace)) {
366+
nameSpace.transfer(tablet, isTableModel, this);
367+
} else if (Objects.nonNull(client)) {
368+
client.transfer(tablet, this);
369+
}
370+
});
365371
}
366372

367373
public static void transferByTablet(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 73 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
3636
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
3737
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
38+
import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
3839
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
3940
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
4041
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -43,25 +44,23 @@
4344
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
4445
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
4546
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
46-
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
4747
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
4848
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
4949
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
50-
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
5150
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
5251
import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
5352
import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
5453

55-
import java.util.Arrays;
56-
import java.util.Collections;
54+
import java.util.ArrayList;
5755
import java.util.List;
5856
import java.util.Objects;
5957
import java.util.function.Predicate;
6058

59+
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
6160
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
6261

6362
public class IoTDBOpcUaClient {
64-
63+
private static final int NAME_SPACE_INDEX = 2;
6564
private final String nodeUrl;
6665

6766
private final SecurityPolicy securityPolicy;
@@ -101,6 +100,7 @@ private void transferTabletRowForClientServerModel(
101100
Object value = null;
102101
long timestamp = 0;
103102
NodeId nodeId = null;
103+
NodeId opcDataType = null;
104104

105105
for (int i = 0; i < measurementSchemas.size(); ++i) {
106106
if (Objects.isNull(values.get(i))) {
@@ -120,59 +120,21 @@ private void transferTabletRowForClientServerModel(
120120
throw new UnsupportedOperationException(
121121
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
122122
}
123-
nodeId = new NodeId(2, String.join("/", segments));
123+
nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
124124

125125
final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
126126
value = values.get(i);
127127
timestamp = utcTimestamp;
128+
opcDataType = convertToOpcDataType(type);
128129
}
130+
final Variant variant = new Variant(value);
129131
final DataValue dataValue =
130-
new DataValue(new Variant(value), currentQuality, new DateTime(timestamp), new DateTime());
132+
new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime());
131133
StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
132134

133135
if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
134136
final AddNodesResponse addStatus =
135-
client
136-
.addNodes(
137-
Arrays.asList(
138-
new AddNodesItem(
139-
Identifiers.ObjectsFolder.expanded(),
140-
Identifiers.Organizes,
141-
new NodeId(2, "root").expanded(),
142-
new QualifiedName(2, "root"),
143-
NodeClass.Object,
144-
ExtensionObject.encode(
145-
client.getStaticSerializationContext(), createFolder0Attributes()),
146-
Identifiers.FolderType.expanded()),
147-
new AddNodesItem(
148-
new NodeId(2, "root").expanded(),
149-
Identifiers.Organizes,
150-
new NodeId(2, "root/sg").expanded(),
151-
new QualifiedName(2, "sg"),
152-
NodeClass.Object,
153-
ExtensionObject.encode(
154-
client.getStaticSerializationContext(), createFolder1Attributes()),
155-
Identifiers.FolderType.expanded()),
156-
new AddNodesItem(
157-
new NodeId(2, "root/sg").expanded(),
158-
Identifiers.Organizes,
159-
new NodeId(2, "root/sg/d1").expanded(),
160-
new QualifiedName(2, "d2"),
161-
NodeClass.Object,
162-
ExtensionObject.encode(
163-
client.getStaticSerializationContext(), createFolder2Attributes()),
164-
Identifiers.FolderType.expanded()),
165-
new AddNodesItem(
166-
new NodeId(2, "root/sg/d1").expanded(),
167-
Identifiers.Organizes,
168-
new NodeId(2, "root/sg/d1/s2").expanded(),
169-
new QualifiedName(2, "s2"),
170-
NodeClass.Variable,
171-
ExtensionObject.encode(
172-
client.getStaticSerializationContext(),
173-
createPressureSensorAttributes()),
174-
Identifiers.BaseDataVariableType.expanded())))
175-
.get();
137+
client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
176138
for (final AddNodesResult result : addStatus.getResults()) {
177139
if (!result.getStatusCode().equals(StatusCode.GOOD)
178140
&& !(result.getStatusCode().getValue() == StatusCodes.Bad_NodeIdExists)) {
@@ -193,6 +155,56 @@ private void transferTabletRowForClientServerModel(
193155
}
194156
}
195157

158+
public List<AddNodesItem> getNodesToAdd(
159+
final String[] segments, final NodeId opcDataType, final Variant initialValue) {
160+
final List<AddNodesItem> addNodesItems = new ArrayList<>();
161+
final StringBuilder sb = new StringBuilder(segments[0]);
162+
ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, segments[0]).expanded();
163+
addNodesItems.add(
164+
new AddNodesItem(
165+
Identifiers.ObjectsFolder.expanded(),
166+
Identifiers.Organizes,
167+
curNodeId,
168+
new QualifiedName(NAME_SPACE_INDEX, segments[0]),
169+
NodeClass.Object,
170+
ExtensionObject.encode(
171+
client.getStaticSerializationContext(), createFolderAttributes(segments[0])),
172+
Identifiers.FolderType.expanded()));
173+
174+
// segments.length >= 3
175+
for (int i = 1; i < segments.length - 1; ++i) {
176+
sb.append("/").append(segments[i]);
177+
final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded();
178+
addNodesItems.add(
179+
new AddNodesItem(
180+
curNodeId,
181+
Identifiers.Organizes,
182+
nextId,
183+
new QualifiedName(NAME_SPACE_INDEX, segments[i]),
184+
NodeClass.Object,
185+
ExtensionObject.encode(
186+
client.getStaticSerializationContext(), createFolderAttributes(segments[i])),
187+
Identifiers.FolderType.expanded()));
188+
curNodeId = nextId;
189+
}
190+
191+
final String measurementName = segments[segments.length - 1];
192+
sb.append("/").append(measurementName);
193+
addNodesItems.add(
194+
new AddNodesItem(
195+
curNodeId,
196+
Identifiers.Organizes,
197+
new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(),
198+
new QualifiedName(NAME_SPACE_INDEX, measurementName),
199+
NodeClass.Variable,
200+
ExtensionObject.encode(
201+
client.getStaticSerializationContext(),
202+
createMeasurementAttributes(measurementName, opcDataType, initialValue)),
203+
Identifiers.BaseDataVariableType.expanded()));
204+
205+
return addNodesItems;
206+
}
207+
196208
/////////////////////////////// Getter ///////////////////////////////
197209

198210
String getNodeUrl() {
@@ -211,99 +223,32 @@ IdentityProvider getIdentityProvider() {
211223
return identityProvider;
212224
}
213225

214-
public void runA(OpcUaClient client) throws Exception {
215-
// synchronous connect
216-
client.connect().get();
217-
System.out.println("✅ 连接成功");
218-
219-
// 读取标签值c
220-
NodeId nodeId = new NodeId(2, "root/sg/d1/s2");
221-
222-
// 1. 先读取当前值确认节点可访问
223-
DataValue readValue = client.readValue(0, TimestampsToReturn.Both, nodeId).get();
224-
System.out.println("读取当前值: " + readValue.getValue().getValue());
225-
System.out.println("读取状态: " + readValue.getStatusCode());
226+
/////////////////////////////// Attribute creator ///////////////////////////////
226227

227-
// 2. 尝试写入新值
228-
Variant newValue = new Variant(42.0f);
229-
DataValue writeValue = new DataValue(newValue, StatusCode.GOOD, new DateTime(), new DateTime());
230-
231-
System.out.println("尝试写入值: " + newValue.getValue());
232-
233-
StatusCode writeStatus = client.writeValue(nodeId, writeValue).get();
234-
System.out.println("写入状态: " + writeStatus);
235-
236-
client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId, true)));
237-
238-
AddNodesResponse addStatus =
239-
client
240-
.addNodes(
241-
Arrays.asList(
242-
new AddNodesItem(
243-
Identifiers.ObjectsFolder.expanded(),
244-
Identifiers.Organizes,
245-
new NodeId(2, "root").expanded(),
246-
new QualifiedName(2, "root"),
247-
NodeClass.Object,
248-
ExtensionObject.encode(
249-
client.getStaticSerializationContext(), createFolder0Attributes()),
250-
Identifiers.FolderType.expanded()),
251-
new AddNodesItem(
252-
new NodeId(2, "root").expanded(),
253-
Identifiers.Organizes,
254-
new NodeId(2, "root/sg").expanded(),
255-
new QualifiedName(2, "sg"),
256-
NodeClass.Object,
257-
ExtensionObject.encode(
258-
client.getStaticSerializationContext(), createFolder1Attributes()),
259-
Identifiers.FolderType.expanded()),
260-
new AddNodesItem(
261-
new NodeId(2, "root/sg").expanded(),
262-
Identifiers.Organizes,
263-
new NodeId(2, "root/sg/d1").expanded(),
264-
new QualifiedName(2, "d2"),
265-
NodeClass.Object,
266-
ExtensionObject.encode(
267-
client.getStaticSerializationContext(), createFolder2Attributes()),
268-
Identifiers.FolderType.expanded()),
269-
new AddNodesItem(
270-
new NodeId(2, "root/sg/d1").expanded(),
271-
Identifiers.Organizes,
272-
new NodeId(2, "root/sg/d1/s2").expanded(),
273-
new QualifiedName(2, "s2"),
274-
NodeClass.Variable,
275-
ExtensionObject.encode(
276-
client.getStaticSerializationContext(),
277-
createPressureSensorAttributes()),
278-
Identifiers.BaseDataVariableType.expanded())))
279-
.get();
280-
System.out.println("新增节点状态: " + addStatus);
281-
client.disconnect().get();
282-
}
283-
284-
public static VariableAttributes createPressureSensorAttributes() {
228+
public static VariableAttributes createMeasurementAttributes(
229+
final String name, final NodeId objectType, final Variant initialValue) {
285230
return new VariableAttributes(
286231
Unsigned.uint(0xFFFF), // specifiedAttributes
287-
LocalizedText.english("s2"),
288-
LocalizedText.english("反应釜压力传感器"),
232+
LocalizedText.english(name),
233+
LocalizedText.english(name),
289234
Unsigned.uint(0), // writeMask
290235
Unsigned.uint(0), // userWriteMask
291-
new Variant(101.3f), // 初始压力值 101.3 kPa
292-
Identifiers.Float, // 浮点数类型
293-
ValueRanks.Scalar, // 标量
236+
new Variant(initialValue),
237+
objectType,
238+
ValueRanks.Scalar,
294239
null, // arrayDimensions
295240
AccessLevel.toValue(AccessLevel.READ_WRITE),
296241
AccessLevel.toValue(AccessLevel.READ_WRITE),
297-
500.0, // 500ms 采样间隔
298-
false // 启用历史记录
242+
500.0, // samplingInterval
243+
false // historizing
299244
);
300245
}
301246

302-
public static ObjectAttributes createFolder0Attributes() {
247+
public static ObjectAttributes createFolderAttributes(final String name) {
303248
return new ObjectAttributes(
304249
Unsigned.uint(0xFFFF), // specifiedAttributes
305-
LocalizedText.english("root"),
306-
LocalizedText.english("反应釜压力传感器"),
250+
LocalizedText.english(name),
251+
LocalizedText.english(name),
307252
Unsigned.uint(0), // writeMask
308253
Unsigned.uint(0), // userWriteMask
309254
null // 启用历史记录

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ private void transferTabletForPubSubModel(
490490
eventNode.delete();
491491
}
492492

493-
private NodeId convertToOpcDataType(final TSDataType type) {
493+
public static NodeId convertToOpcDataType(final TSDataType type) {
494494
switch (type) {
495495
case BOOLEAN:
496496
return Identifiers.Boolean;

0 commit comments

Comments
 (0)