2121
2222import org .apache .iotdb .commons .exception .pipe .PipeRuntimeCriticalException ;
2323import org .apache .iotdb .commons .exception .pipe .PipeRuntimeNonCriticalException ;
24- import org .apache .iotdb .commons .utils .PathUtils ;
2524import org .apache .iotdb .db .pipe .sink .util .sorter .PipeTableModelTabletEventSorter ;
2625import org .apache .iotdb .db .pipe .sink .util .sorter .PipeTreeModelTabletEventSorter ;
2726import org .apache .iotdb .db .utils .DateTimeUtils ;
7069
7170public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
7271 public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server" ;
73- private final boolean isClientServerModel ;
7472 private final SubscriptionModel subscriptionModel ;
7573 private final OpcUaServerBuilder builder ;
76- private final String databaseName ;
77- private final String placeHolder ;
78-
79- OpcUaNameSpace (
80- final OpcUaServer server ,
81- final boolean isClientServerModel ,
82- final OpcUaServerBuilder builder ,
83- final String qualifiedDatabaseName ,
84- final String placeHolder ) {
74+
75+ OpcUaNameSpace (final OpcUaServer server , final OpcUaServerBuilder builder ) {
8576 super (server , NAMESPACE_URI );
86- this .isClientServerModel = isClientServerModel ;
8777 this .builder = builder ;
88- this .databaseName = PathUtils .unQualifyDatabaseName (qualifiedDatabaseName );
89- this .placeHolder = placeHolder ;
9078
9179 subscriptionModel = new SubscriptionModel (server , this );
9280 getLifecycleManager ().addLifecycle (subscriptionModel );
@@ -106,15 +94,17 @@ public void shutdown() {
10694 });
10795 }
10896
109- void transfer (final Tablet tablet , final boolean isTableModel ) throws UaException {
110- if (isClientServerModel ) {
111- transferTabletForClientServerModel (tablet , isTableModel );
97+ void transfer (final Tablet tablet , final boolean isTableModel , final OpcUaSink sink )
98+ throws UaException {
99+ if (sink .isClientServerModel ) {
100+ transferTabletForClientServerModel (tablet , isTableModel , sink );
112101 } else {
113- transferTabletForPubSubModel (tablet , isTableModel );
102+ transferTabletForPubSubModel (tablet , isTableModel , sink );
114103 }
115104 }
116105
117- private void transferTabletForClientServerModel (final Tablet tablet , final boolean isTableModel ) {
106+ private void transferTabletForClientServerModel (
107+ final Tablet tablet , final boolean isTableModel , final OpcUaSink sink ) {
118108 final List <IMeasurementSchema > schemas = tablet .getSchemas ();
119109 final List <IMeasurementSchema > newSchemas = new ArrayList <>();
120110 if (!isTableModel ) {
@@ -136,7 +126,7 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
136126 }
137127
138128 transferTabletRowForClientServerModel (
139- tablet .getDeviceId ().split ("\\ ." ), newSchemas , timestamps , values );
129+ tablet .getDeviceId ().split ("\\ ." ), newSchemas , timestamps , values , sink );
140130 } else {
141131 new PipeTableModelTabletEventSorter (tablet ).sortByTimestampIfNecessary ();
142132
@@ -151,10 +141,11 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
151141 for (int i = 0 ; i < tablet .getRowSize (); ++i ) {
152142 final Object [] segments = tablet .getDeviceID (i ).getSegments ();
153143 final String [] folderSegments = new String [segments .length + 1 ];
154- folderSegments [0 ] = databaseName ;
144+ folderSegments [0 ] = sink . unQualifiedDatabaseName ;
155145
156146 for (int j = 0 ; j < segments .length ; ++j ) {
157- folderSegments [j + 1 ] = Objects .isNull (segments [j ]) ? placeHolder : (String ) segments [j ];
147+ folderSegments [j + 1 ] =
148+ Objects .isNull (segments [j ]) ? sink .placeHolder : (String ) segments [j ];
158149 }
159150
160151 final int finalI = i ;
@@ -169,7 +160,8 @@ private void transferTabletForClientServerModel(final Tablet tablet, final boole
169160 ? null
170161 : getTabletObjectValue4Opc (
171162 tablet .getValues ()[index ], finalI , schemas .get (index ).getType ()))
172- .collect (Collectors .toList ()));
163+ .collect (Collectors .toList ()),
164+ sink );
173165 }
174166 }
175167 }
@@ -178,14 +170,18 @@ private void transferTabletRowForClientServerModel(
178170 final String [] segments ,
179171 final List <IMeasurementSchema > measurementSchemas ,
180172 final List <Long > timestamps ,
181- final List <Object > values ) {
173+ final List <Object > values ,
174+ final OpcUaSink sink ) {
182175 if (segments .length == 0 ) {
183176 throw new PipeRuntimeCriticalException ("The segments of tablets must exist" );
184177 }
185178 final StringBuilder currentStr = new StringBuilder ();
186179 UaNode folderNode = null ;
187180 NodeId folderNodeId ;
188- for (final String segment : segments ) {
181+ for (int i = 0 ;
182+ i < (Objects .isNull (sink .valueName ) ? segments .length : segments .length - 1 );
183+ ++i ) {
184+ final String segment = segments [i ];
189185 final UaNode nextFolderNode ;
190186
191187 currentStr .append (segment );
@@ -230,32 +226,55 @@ private void transferTabletRowForClientServerModel(
230226 }
231227
232228 final String currentFolder = currentStr .toString ();
229+
230+ StatusCode currentQuality =
231+ Objects .isNull (sink .valueName ) ? StatusCode .GOOD : StatusCode .UNCERTAIN ;
232+ UaVariableNode valueNode = null ;
233+ Object value = null ;
234+ long timestamp = 0 ;
235+
233236 for (int i = 0 ; i < measurementSchemas .size (); ++i ) {
234237 if (Objects .isNull (values .get (i ))) {
235238 continue ;
236239 }
237240 final String name = measurementSchemas .get (i ).getMeasurementName ();
238241 final TSDataType type = measurementSchemas .get (i ).getType ();
239- final NodeId nodeId = newNodeId (currentFolder + name );
242+ if (Objects .nonNull (sink .qualityName ) && sink .qualityName .equals (name )) {
243+ if (!type .equals (TSDataType .BOOLEAN )) {
244+ throw new UnsupportedOperationException (
245+ "The quality value only supports boolean type, while true == GOOD and false == BAD." );
246+ }
247+ currentQuality = values .get (i ) == Boolean .TRUE ? StatusCode .GOOD : StatusCode .BAD ;
248+ continue ;
249+ }
250+ if (Objects .nonNull (sink .valueName ) && !sink .valueName .equals (name )) {
251+ throw new UnsupportedOperationException (
252+ "When the 'with-quality' mode is enabled, the measurement must be either \" value-name\" or \" quality-name\" " );
253+ }
254+ final String nodeName = Objects .isNull (sink .valueName ) ? name : segments [segments .length - 1 ];
255+ final NodeId nodeId = newNodeId (currentFolder + nodeName );
240256 final UaVariableNode measurementNode ;
241257 if (!getNodeManager ().containsNode (nodeId )) {
242258 measurementNode =
243259 new UaVariableNode .UaVariableNodeBuilder (getNodeContext ())
244- .setNodeId (newNodeId ( currentFolder + name ) )
260+ .setNodeId (nodeId )
245261 .setAccessLevel (AccessLevel .READ_WRITE )
246262 .setUserAccessLevel (AccessLevel .READ_ONLY )
247- .setBrowseName (newQualifiedName (name ))
248- .setDisplayName (LocalizedText .english (name ))
263+ .setBrowseName (newQualifiedName (nodeName ))
264+ .setDisplayName (LocalizedText .english (nodeName ))
249265 .setDataType (convertToOpcDataType (type ))
250266 .setTypeDefinition (Identifiers .BaseDataVariableType )
251267 .build ();
252268 getNodeManager ().addNode (measurementNode );
253- folderNode .addReference (
254- new Reference (
255- folderNode .getNodeId (),
256- Identifiers .Organizes ,
257- measurementNode .getNodeId ().expanded (),
258- true ));
269+ if (Objects .nonNull (folderNode )) {
270+ folderNode .addReference (
271+ new Reference (
272+ folderNode .getNodeId (), Identifiers .Organizes , nodeId .expanded (), true ));
273+ } else {
274+ measurementNode .addReference (
275+ new Reference (
276+ nodeId , Identifiers .Organizes , Identifiers .ObjectsFolder .expanded (), false ));
277+ }
259278 } else {
260279 // This must exist
261280 measurementNode =
@@ -269,15 +288,30 @@ private void transferTabletRowForClientServerModel(
269288 }
270289
271290 final long utcTimestamp = timestampToUtc (timestamps .get (timestamps .size () > 1 ? i : 0 ));
272- if (Objects .isNull (measurementNode .getValue ())
273- || Objects .requireNonNull (measurementNode .getValue ().getSourceTime ()).getUtcTime ()
274- < utcTimestamp ) {
275- measurementNode .setValue (
291+ if (Objects .isNull (sink .valueName )) {
292+ if (Objects .isNull (measurementNode .getValue ())
293+ || Objects .requireNonNull (measurementNode .getValue ().getSourceTime ()).getUtcTime ()
294+ < utcTimestamp ) {
295+ measurementNode .setValue (
296+ new DataValue (
297+ new Variant (values .get (i )),
298+ currentQuality ,
299+ new DateTime (utcTimestamp ),
300+ new DateTime ()));
301+ }
302+ } else {
303+ valueNode = measurementNode ;
304+ value = values .get (i );
305+ timestamp = utcTimestamp ;
306+ }
307+ }
308+ if (Objects .nonNull (valueNode )) {
309+ if (Objects .isNull (valueNode .getValue ())
310+ || Objects .requireNonNull (valueNode .getValue ().getSourceTime ()).getUtcTime ()
311+ < timestamp ) {
312+ valueNode .setValue (
276313 new DataValue (
277- new Variant (values .get (i )),
278- StatusCode .GOOD ,
279- new DateTime (utcTimestamp ),
280- new DateTime ()));
314+ new Variant (value ), currentQuality , new DateTime (timestamp ), new DateTime ()));
281315 }
282316 }
283317 }
@@ -319,8 +353,8 @@ private static long timestampToUtc(final long timeStamp) {
319353 * @param tablet the tablet to send
320354 * @throws UaException if failed to create {@link Event}
321355 */
322- private void transferTabletForPubSubModel (final Tablet tablet , final boolean isTableModel )
323- throws UaException {
356+ private void transferTabletForPubSubModel (
357+ final Tablet tablet , final boolean isTableModel , final OpcUaSink sink ) throws UaException {
324358 final BaseEventTypeNode eventNode =
325359 getServer ()
326360 .getEventFactory ()
@@ -331,11 +365,11 @@ private void transferTabletForPubSubModel(final Tablet tablet, final boolean isT
331365 if (isTableModel ) {
332366 sourceNameList = new ArrayList <>(tablet .getRowSize ());
333367 for (int i = 0 ; i < tablet .getRowSize (); ++i ) {
334- final StringBuilder idBuilder = new StringBuilder (databaseName );
368+ final StringBuilder idBuilder = new StringBuilder (sink . unQualifiedDatabaseName );
335369 for (final Object segment : tablet .getDeviceID (i ).getSegments ()) {
336370 idBuilder
337371 .append (TsFileConstant .PATH_SEPARATOR )
338- .append (Objects .isNull (segment ) ? placeHolder : segment );
372+ .append (Objects .isNull (segment ) ? sink . placeHolder : segment );
339373 }
340374 sourceNameList .add (idBuilder .toString ());
341375 }
0 commit comments