Skip to content

Commit 798e656

Browse files
committed
fix
1 parent 9478b77 commit 798e656

File tree

2 files changed

+69
-33
lines changed

2 files changed

+69
-33
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 67 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.junit.runner.RunWith;
5151

5252
import java.io.File;
53+
import java.net.ConnectException;
5354
import java.util.Collections;
5455
import java.util.HashMap;
5556
import java.util.Map;
@@ -75,40 +76,75 @@ public void testOPCUAServerSink() throws Exception {
7576
sinkAttributes.put("opcua.model", "client-server");
7677
sinkAttributes.put("security-policy", "None");
7778

78-
final int[] ports = EnvUtils.searchAvailablePorts();
79-
final int tcpPort = ports[0];
80-
final int httpsPort = ports[1];
81-
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
82-
sinkAttributes.put("https.port", Integer.toString(httpsPort));
79+
OpcUaClient opcUaClient;
80+
DataValue value;
81+
while (true) {
82+
final int[] ports = EnvUtils.searchAvailablePorts();
83+
final int tcpPort = ports[0];
84+
final int httpsPort = ports[1];
85+
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
86+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
8387

84-
Assert.assertEquals(
85-
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
86-
client
87-
.createPipe(
88-
new TCreatePipeReq("testPipe", sinkAttributes)
89-
.setExtractorAttributes(Collections.singletonMap("user", "root"))
90-
.setProcessorAttributes(Collections.emptyMap()))
91-
.getCode());
88+
Assert.assertEquals(
89+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
90+
client
91+
.createPipe(
92+
new TCreatePipeReq("testPipe", sinkAttributes)
93+
.setExtractorAttributes(Collections.singletonMap("user", "root"))
94+
.setProcessorAttributes(Collections.emptyMap()))
95+
.getCode());
9296

93-
final OpcUaClient opcUaClient =
94-
getOpcUaClient(
95-
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
96-
DataValue value =
97-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
98-
Assert.assertEquals(new Variant(1.0), value.getValue());
99-
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
97+
try {
98+
opcUaClient =
99+
getOpcUaClient(
100+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
101+
} catch (final PipeException e) {
102+
if (e.getCause() instanceof ConnectException) {
103+
continue;
104+
} else {
105+
throw e;
106+
}
107+
}
108+
value =
109+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
110+
Assert.assertEquals(new Variant(1.0), value.getValue());
111+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
112+
opcUaClient.disconnect().get();
113+
break;
114+
}
100115

101-
Assert.assertEquals(
102-
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
103-
client
104-
.alterPipe(
105-
new TAlterPipeReq()
106-
.setPipeName("testPipe")
107-
.setIsReplaceAllConnectorAttributes(false)
108-
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
109-
.setProcessorAttributes(Collections.emptyMap())
110-
.setExtractorAttributes(Collections.emptyMap()))
111-
.getCode());
116+
while (true) {
117+
final int[] ports = EnvUtils.searchAvailablePorts();
118+
final int tcpPort = ports[0];
119+
final int httpsPort = ports[1];
120+
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
121+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
122+
sinkAttributes.put("with-quality", "true");
123+
124+
Assert.assertEquals(
125+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
126+
client
127+
.alterPipe(
128+
new TAlterPipeReq()
129+
.setPipeName("testPipe")
130+
.setIsReplaceAllConnectorAttributes(true)
131+
.setConnectorAttributes(sinkAttributes)
132+
.setProcessorAttributes(Collections.emptyMap())
133+
.setExtractorAttributes(Collections.emptyMap()))
134+
.getCode());
135+
try {
136+
opcUaClient =
137+
getOpcUaClient(
138+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
139+
} catch (final PipeException e) {
140+
if (e.getCause() instanceof ConnectException) {
141+
continue;
142+
} else {
143+
throw e;
144+
}
145+
}
146+
break;
147+
}
112148

113149
TestUtils.executeNonQuery(
114150
env,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ public void run() {
102102
configurableUaClient.run(client);
103103
} catch (final Exception e) {
104104
throw new PipeException(
105-
"Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
105+
"Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
106106
}
107107
} catch (final Exception e) {
108108
throw new PipeException(
109-
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
109+
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
110110
}
111111
}
112112
}

0 commit comments

Comments
 (0)