2020package org .apache .iotdb .db .pipe .sink .protocol .opcua .client ;
2121
2222import org .apache .iotdb .db .pipe .sink .protocol .opcua .OpcUaSink ;
23+ import org .apache .iotdb .db .pipe .sink .protocol .opcua .server .OpcUaNameSpace ;
24+ import org .apache .iotdb .pipe .api .exception .PipeException ;
2325
26+ import org .apache .tsfile .enums .TSDataType ;
2427import org .apache .tsfile .write .record .Tablet ;
28+ import org .apache .tsfile .write .schema .IMeasurementSchema ;
2529import org .eclipse .milo .opcua .sdk .client .OpcUaClient ;
2630import org .eclipse .milo .opcua .sdk .client .api .identity .IdentityProvider ;
2731import org .eclipse .milo .opcua .sdk .core .AccessLevel ;
2832import org .eclipse .milo .opcua .sdk .core .ValueRanks ;
2933import org .eclipse .milo .opcua .stack .core .Identifiers ;
30- import org .eclipse .milo .opcua .stack .core .UaException ;
34+ import org .eclipse .milo .opcua .stack .core .StatusCodes ;
3135import org .eclipse .milo .opcua .stack .core .security .SecurityPolicy ;
3236import org .eclipse .milo .opcua .stack .core .types .builtin .DataValue ;
3337import org .eclipse .milo .opcua .stack .core .types .builtin .DateTime ;
4246import org .eclipse .milo .opcua .stack .core .types .enumerated .TimestampsToReturn ;
4347import org .eclipse .milo .opcua .stack .core .types .structured .AddNodesItem ;
4448import org .eclipse .milo .opcua .stack .core .types .structured .AddNodesResponse ;
49+ import org .eclipse .milo .opcua .stack .core .types .structured .AddNodesResult ;
4550import org .eclipse .milo .opcua .stack .core .types .structured .DeleteNodesItem ;
4651import org .eclipse .milo .opcua .stack .core .types .structured .EndpointDescription ;
4752import org .eclipse .milo .opcua .stack .core .types .structured .ObjectAttributes ;
4853import org .eclipse .milo .opcua .stack .core .types .structured .VariableAttributes ;
4954
5055import java .util .Arrays ;
5156import java .util .Collections ;
52- import java .util .concurrent .atomic .AtomicLong ;
57+ import java .util .List ;
58+ import java .util .Objects ;
5359import java .util .function .Predicate ;
5460
61+ import static org .apache .iotdb .db .pipe .sink .protocol .opcua .server .OpcUaNameSpace .timestampToUtc ;
62+
5563public class IoTDBOpcUaClient {
5664
5765 private final String nodeUrl ;
5866
5967 private final SecurityPolicy securityPolicy ;
6068 private final IdentityProvider identityProvider ;
61-
62- private final AtomicLong clientHandles = new AtomicLong (1L );
69+ private OpcUaClient client ;
6370
6471 public IoTDBOpcUaClient (
6572 final String nodeUrl ,
@@ -70,13 +77,121 @@ public IoTDBOpcUaClient(
7077 this .identityProvider = identityProvider ;
7178 }
7279
73- public void run (OpcUaClient client ) throws Exception {
80+ public void run (final OpcUaClient client ) throws Exception {
7481 // synchronous connect
82+ this .client = client ;
7583 client .connect ().get ();
7684 }
7785
7886 // Only support tree model & client-server
79- public void transfer (final Tablet tablet , final OpcUaSink sink ) throws UaException {}
87+ public void transfer (final Tablet tablet , final OpcUaSink sink ) throws Exception {
88+ OpcUaNameSpace .transferTabletForClientServerModel (
89+ tablet , false , sink , this ::transferTabletRowForClientServerModel );
90+ }
91+
92+ private void transferTabletRowForClientServerModel (
93+ final String [] segments ,
94+ final List <IMeasurementSchema > measurementSchemas ,
95+ final List <Long > timestamps ,
96+ final List <Object > values ,
97+ final OpcUaSink sink )
98+ throws Exception {
99+ StatusCode currentQuality =
100+ Objects .isNull (sink .getValueName ()) ? StatusCode .GOOD : StatusCode .UNCERTAIN ;
101+ Object value = null ;
102+ long timestamp = 0 ;
103+ NodeId nodeId = null ;
104+
105+ for (int i = 0 ; i < measurementSchemas .size (); ++i ) {
106+ if (Objects .isNull (values .get (i ))) {
107+ continue ;
108+ }
109+ final String name = measurementSchemas .get (i ).getMeasurementName ();
110+ final TSDataType type = measurementSchemas .get (i ).getType ();
111+ if (Objects .nonNull (sink .getQualityName ()) && sink .getQualityName ().equals (name )) {
112+ if (!type .equals (TSDataType .BOOLEAN )) {
113+ throw new UnsupportedOperationException (
114+ "The quality value only supports boolean type, while true == GOOD and false == BAD." );
115+ }
116+ currentQuality = values .get (i ) == Boolean .TRUE ? StatusCode .GOOD : StatusCode .BAD ;
117+ continue ;
118+ }
119+ if (Objects .nonNull (sink .getValueName ()) && !sink .getValueName ().equals (name )) {
120+ throw new UnsupportedOperationException (
121+ "When the 'with-quality' mode is enabled, the measurement must be either \" value-name\" or \" quality-name\" " );
122+ }
123+ nodeId = new NodeId (2 , String .join ("/" , segments ));
124+
125+ final long utcTimestamp = timestampToUtc (timestamps .get (timestamps .size () > 1 ? i : 0 ));
126+ value = values .get (i );
127+ timestamp = utcTimestamp ;
128+ }
129+ final DataValue dataValue =
130+ new DataValue (new Variant (value ), currentQuality , new DateTime (timestamp ), new DateTime ());
131+ StatusCode writeStatus = client .writeValue (nodeId , dataValue ).get ();
132+
133+ if (writeStatus .getValue () == StatusCodes .Bad_NodeIdUnknown ) {
134+ final AddNodesResponse addStatus =
135+ client
136+ .addNodes (
137+ Arrays .asList (
138+ new AddNodesItem (
139+ Identifiers .ObjectsFolder .expanded (),
140+ Identifiers .Organizes ,
141+ new NodeId (2 , "root" ).expanded (),
142+ new QualifiedName (2 , "root" ),
143+ NodeClass .Object ,
144+ ExtensionObject .encode (
145+ client .getStaticSerializationContext (), createFolder0Attributes ()),
146+ Identifiers .FolderType .expanded ()),
147+ new AddNodesItem (
148+ new NodeId (2 , "root" ).expanded (),
149+ Identifiers .Organizes ,
150+ new NodeId (2 , "root/sg" ).expanded (),
151+ new QualifiedName (2 , "sg" ),
152+ NodeClass .Object ,
153+ ExtensionObject .encode (
154+ client .getStaticSerializationContext (), createFolder1Attributes ()),
155+ Identifiers .FolderType .expanded ()),
156+ new AddNodesItem (
157+ new NodeId (2 , "root/sg" ).expanded (),
158+ Identifiers .Organizes ,
159+ new NodeId (2 , "root/sg/d1" ).expanded (),
160+ new QualifiedName (2 , "d2" ),
161+ NodeClass .Object ,
162+ ExtensionObject .encode (
163+ client .getStaticSerializationContext (), createFolder2Attributes ()),
164+ Identifiers .FolderType .expanded ()),
165+ new AddNodesItem (
166+ new NodeId (2 , "root/sg/d1" ).expanded (),
167+ Identifiers .Organizes ,
168+ new NodeId (2 , "root/sg/d1/s2" ).expanded (),
169+ new QualifiedName (2 , "s2" ),
170+ NodeClass .Variable ,
171+ ExtensionObject .encode (
172+ client .getStaticSerializationContext (),
173+ createPressureSensorAttributes ()),
174+ Identifiers .BaseDataVariableType .expanded ())))
175+ .get ();
176+ for (final AddNodesResult result : addStatus .getResults ()) {
177+ if (!result .getStatusCode ().equals (StatusCode .GOOD )
178+ && !(result .getStatusCode ().getValue () == StatusCodes .Bad_NodeIdExists )) {
179+ throw new PipeException (
180+ "Failed to create nodes after transfer data value, write status: "
181+ + writeStatus
182+ + ", creation status: "
183+ + addStatus );
184+ }
185+ }
186+ writeStatus = client .writeValue (nodeId , dataValue ).get ();
187+ if (writeStatus .getValue () != StatusCode .GOOD .getValue ()) {
188+ throw new PipeException (
189+ "Failed to transfer dataValue after successfully created nodes, error: " + writeStatus );
190+ }
191+ } else {
192+ throw new PipeException ("Failed to transfer dataValue, error: " + writeStatus );
193+ }
194+ }
80195
81196 /////////////////////////////// Getter ///////////////////////////////
82197
0 commit comments