Skip to content

Commit fd39559

Browse files
committed
fix
1 parent 8efcd63 commit fd39559

File tree

2 files changed

+2
-68
lines changed

2 files changed

+2
-68
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.db.conf.IoTDBConfig;
2525
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2626
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
27+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
2728
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
2829
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
2930
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
@@ -321,6 +322,7 @@ private void customizeClient(final String nodeUrl, final PipeParameters paramete
321322
parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY))
322323
: new AnonymousProvider();
323324
client = new IoTDBOpcUaClient(nodeUrl, policy, provider);
325+
new ClientRunner(client).run();
324326
}
325327

326328
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -68,74 +68,6 @@ public IoTDBOpcUaClient(
6868
public void run(OpcUaClient client) throws Exception {
6969
// synchronous connect
7070
client.connect().get();
71-
72-
// create a subscription and a monitored item
73-
final UaSubscription subscription =
74-
client.getSubscriptionManager().createSubscription(200.0).get();
75-
76-
final ReadValueId readValueId =
77-
new ReadValueId(
78-
Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
79-
80-
// client handle must be unique per item
81-
final UInteger clientHandle = uint(clientHandles.getAndIncrement());
82-
83-
final EventFilter eventFilter =
84-
new EventFilter(
85-
new SimpleAttributeOperand[] {
86-
new SimpleAttributeOperand(
87-
Identifiers.BaseEventType,
88-
new QualifiedName[] {new QualifiedName(0, "Time")},
89-
AttributeId.Value.uid(),
90-
null),
91-
new SimpleAttributeOperand(
92-
Identifiers.BaseEventType,
93-
new QualifiedName[] {new QualifiedName(0, "Message")},
94-
AttributeId.Value.uid(),
95-
null),
96-
new SimpleAttributeOperand(
97-
Identifiers.BaseEventType,
98-
new QualifiedName[] {new QualifiedName(0, "SourceName")},
99-
AttributeId.Value.uid(),
100-
null),
101-
new SimpleAttributeOperand(
102-
Identifiers.BaseEventType,
103-
new QualifiedName[] {new QualifiedName(0, "SourceNode")},
104-
AttributeId.Value.uid(),
105-
null)
106-
},
107-
new ContentFilter(null));
108-
109-
final MonitoringParameters parameters =
110-
new MonitoringParameters(
111-
clientHandle,
112-
0.0,
113-
ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
114-
uint(10000),
115-
true);
116-
117-
final MonitoredItemCreateRequest request =
118-
new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
119-
120-
final List<UaMonitoredItem> items =
121-
subscription
122-
.createMonitoredItems(TimestampsToReturn.Both, Collections.singletonList(request))
123-
.get();
124-
125-
// do something with the value updates
126-
final UaMonitoredItem monitoredItem = items.get(0);
127-
128-
final AtomicInteger eventCount = new AtomicInteger(0);
129-
130-
monitoredItem.setEventConsumer(
131-
(item, vs) -> {
132-
eventCount.incrementAndGet();
133-
System.out.println("Event Received from " + item.getReadValueId().getNodeId());
134-
135-
for (int i = 0; i < vs.length; i++) {
136-
System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
137-
}
138-
});
13971
}
14072

14173
/////////////////////////////// Getter ///////////////////////////////

0 commit comments

Comments
 (0)