Skip to content

Commit f78e6d7

Browse files
committed
fix
1 parent 6be7b4d commit f78e6d7

File tree

2 files changed

+162
-1
lines changed

2 files changed

+162
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,22 @@ public void validate(final PipeParameterValidator validator) throws Exception {
154154
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
155155
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
156156
false);
157+
158+
final PipeParameters parameters = validator.getParameters();
159+
if (validator
160+
.getParameters()
161+
.hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY)) {
162+
validator.validate(
163+
CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
164+
String.format(
165+
"When the OPC UA sink points to an outer server or specifies 'with-quality', the %s or %s must be %s.",
166+
CONNECTOR_OPC_UA_MODEL_KEY,
167+
SINK_OPC_UA_MODEL_KEY,
168+
CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
169+
parameters.getStringOrDefault(
170+
Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
171+
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
172+
}
157173
}
158174

159175
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,31 @@
2424
import org.apache.tsfile.write.record.Tablet;
2525
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
2626
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
27+
import org.eclipse.milo.opcua.sdk.core.AccessLevel;
28+
import org.eclipse.milo.opcua.sdk.core.ValueRanks;
29+
import org.eclipse.milo.opcua.stack.core.Identifiers;
2730
import org.eclipse.milo.opcua.stack.core.UaException;
2831
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
32+
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
33+
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
34+
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
35+
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
36+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
37+
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
38+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
39+
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
40+
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
41+
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
42+
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
43+
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
44+
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
45+
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
2946
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
47+
import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
48+
import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
3049

50+
import java.util.Arrays;
51+
import java.util.Collections;
3152
import java.util.concurrent.atomic.AtomicLong;
3253
import java.util.function.Predicate;
3354

@@ -54,7 +75,10 @@ public void run(OpcUaClient client) throws Exception {
5475
client.connect().get();
5576
}
5677

57-
public void transfer(final Tablet tablet, final OpcUaSink sink) throws UaException {}
78+
// Only support tree model & client-server
79+
public void transfer(final Tablet tablet, final OpcUaSink sink) throws UaException {
80+
81+
}
5882

5983
/////////////////////////////// Getter ///////////////////////////////
6084

@@ -73,4 +97,125 @@ SecurityPolicy getSecurityPolicy() {
7397
IdentityProvider getIdentityProvider() {
7498
return identityProvider;
7599
}
100+
101+
public void runA(OpcUaClient client) throws Exception {
102+
// synchronous connect
103+
client.connect().get();
104+
System.out.println("✅ 连接成功");
105+
106+
// 读取标签值c
107+
NodeId nodeId = new NodeId(2, "root/sg/d1/s2");
108+
109+
// 1. 先读取当前值确认节点可访问
110+
DataValue readValue = client.readValue(0, TimestampsToReturn.Both, nodeId).get();
111+
System.out.println("读取当前值: " + readValue.getValue().getValue());
112+
System.out.println("读取状态: " + readValue.getStatusCode());
113+
114+
// 2. 尝试写入新值
115+
Variant newValue = new Variant(42.0f);
116+
DataValue writeValue = new DataValue(newValue, StatusCode.GOOD, new DateTime(), new DateTime());
117+
118+
System.out.println("尝试写入值: " + newValue.getValue());
119+
120+
StatusCode writeStatus = client.writeValue(nodeId, writeValue).get();
121+
System.out.println("写入状态: " + writeStatus);
122+
123+
client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId, true)));
124+
125+
AddNodesResponse addStatus =
126+
client
127+
.addNodes(
128+
Arrays.asList(
129+
new AddNodesItem(
130+
Identifiers.ObjectsFolder.expanded(),
131+
Identifiers.Organizes,
132+
new NodeId(2, "root").expanded(),
133+
new QualifiedName(2, "root"),
134+
NodeClass.Object,
135+
ExtensionObject.encode(
136+
client.getStaticSerializationContext(), createFolder0Attributes()),
137+
Identifiers.FolderType.expanded()),
138+
new AddNodesItem(
139+
new NodeId(2, "root").expanded(),
140+
Identifiers.Organizes,
141+
new NodeId(2, "root/sg").expanded(),
142+
new QualifiedName(2, "sg"),
143+
NodeClass.Object,
144+
ExtensionObject.encode(
145+
client.getStaticSerializationContext(), createFolder1Attributes()),
146+
Identifiers.FolderType.expanded()),
147+
new AddNodesItem(
148+
new NodeId(2, "root/sg").expanded(),
149+
Identifiers.Organizes,
150+
new NodeId(2, "root/sg/d1").expanded(),
151+
new QualifiedName(2, "d2"),
152+
NodeClass.Object,
153+
ExtensionObject.encode(
154+
client.getStaticSerializationContext(), createFolder2Attributes()),
155+
Identifiers.FolderType.expanded()),
156+
new AddNodesItem(
157+
new NodeId(2, "root/sg/d1").expanded(),
158+
Identifiers.Organizes,
159+
new NodeId(2, "root/sg/d1/s2").expanded(),
160+
new QualifiedName(2, "s2"),
161+
NodeClass.Variable,
162+
ExtensionObject.encode(
163+
client.getStaticSerializationContext(),
164+
createPressureSensorAttributes()),
165+
Identifiers.BaseDataVariableType.expanded())))
166+
.get();
167+
System.out.println("新增节点状态: " + addStatus);
168+
client.disconnect().get();
169+
}
170+
171+
public static VariableAttributes createPressureSensorAttributes() {
172+
return new VariableAttributes(
173+
Unsigned.uint(0xFFFF), // specifiedAttributes
174+
LocalizedText.english("s2"),
175+
LocalizedText.english("反应釜压力传感器"),
176+
Unsigned.uint(0), // writeMask
177+
Unsigned.uint(0), // userWriteMask
178+
new Variant(101.3f), // 初始压力值 101.3 kPa
179+
Identifiers.Float, // 浮点数类型
180+
ValueRanks.Scalar, // 标量
181+
null, // arrayDimensions
182+
AccessLevel.toValue(AccessLevel.READ_WRITE),
183+
AccessLevel.toValue(AccessLevel.READ_WRITE),
184+
500.0, // 500ms 采样间隔
185+
false // 启用历史记录
186+
);
187+
}
188+
189+
public static ObjectAttributes createFolder0Attributes() {
190+
return new ObjectAttributes(
191+
Unsigned.uint(0xFFFF), // specifiedAttributes
192+
LocalizedText.english("root"),
193+
LocalizedText.english("反应釜压力传感器"),
194+
Unsigned.uint(0), // writeMask
195+
Unsigned.uint(0), // userWriteMask
196+
null // 启用历史记录
197+
);
198+
}
199+
200+
public static ObjectAttributes createFolder1Attributes() {
201+
return new ObjectAttributes(
202+
Unsigned.uint(0xFFFF), // specifiedAttributes
203+
LocalizedText.english("sg"),
204+
LocalizedText.english("反应釜压力传感器"),
205+
Unsigned.uint(0), // writeMask
206+
Unsigned.uint(0), // userWriteMask
207+
null // 启用历史记录
208+
);
209+
}
210+
211+
public static ObjectAttributes createFolder2Attributes() {
212+
return new ObjectAttributes(
213+
Unsigned.uint(0xFFFF), // specifiedAttributes
214+
LocalizedText.english("d1"),
215+
LocalizedText.english("反应釜压力传感器"),
216+
Unsigned.uint(0), // writeMask
217+
Unsigned.uint(0), // userWriteMask
218+
null // 启用历史记录
219+
);
220+
}
76221
}

0 commit comments

Comments
 (0)