Skip to content

Commit b04f819

Browse files
authored
Pipe: Fixed the unstable OPC UA IT (#16972)
1 parent 9478b77 commit b04f819

File tree

2 files changed

+103
-36
lines changed

2 files changed

+103
-36
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 101 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.it.utils.TestUtils;
2626
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
2727
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
28+
import org.apache.iotdb.it.env.MultiEnvFactory;
2829
import org.apache.iotdb.it.env.cluster.EnvUtils;
2930
import org.apache.iotdb.it.framework.IoTDBTestRunner;
3031
import org.apache.iotdb.itbase.category.MultiClusterIT1;
@@ -45,11 +46,14 @@
4546
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
4647
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
4748
import org.junit.Assert;
49+
import org.junit.Before;
4850
import org.junit.Test;
4951
import org.junit.experimental.categories.Category;
5052
import org.junit.runner.RunWith;
5153

5254
import java.io.File;
55+
import java.net.ConnectException;
56+
import java.util.Arrays;
5357
import java.util.Collections;
5458
import java.util.HashMap;
5559
import java.util.Map;
@@ -62,8 +66,25 @@
6266
@RunWith(IoTDBTestRunner.class)
6367
@Category({MultiClusterIT1.class})
6468
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
69+
70+
@Before
71+
public void setUp() {
72+
MultiEnvFactory.createEnv(1);
73+
env = MultiEnvFactory.getEnv(0);
74+
env.getConfig()
75+
.getCommonConfig()
76+
.setAutoCreateSchemaEnabled(true)
77+
.setPipeMemoryManagementEnabled(false)
78+
.setDataReplicationFactor(1)
79+
.setSchemaReplicationFactor(1)
80+
.setIsPipeEnableMemoryCheck(false)
81+
.setPipeAutoSplitFullEnabled(false);
82+
env.initClusterEnvironment(1, 1);
83+
}
84+
6585
@Test
6686
public void testOPCUAServerSink() throws Exception {
87+
int tcpPort = -1;
6788
try (final SyncConfigNodeIServiceClient client =
6889
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
6990

@@ -75,44 +96,82 @@ public void testOPCUAServerSink() throws Exception {
7596
sinkAttributes.put("opcua.model", "client-server");
7697
sinkAttributes.put("security-policy", "None");
7798

78-
final int[] ports = EnvUtils.searchAvailablePorts();
79-
final int tcpPort = ports[0];
80-
final int httpsPort = ports[1];
81-
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
82-
sinkAttributes.put("https.port", Integer.toString(httpsPort));
99+
OpcUaClient opcUaClient;
100+
DataValue value;
101+
while (true) {
102+
final int[] ports = EnvUtils.searchAvailablePorts();
103+
tcpPort = ports[0];
104+
final int httpsPort = ports[1];
105+
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
106+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
83107

84-
Assert.assertEquals(
85-
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
86-
client
87-
.createPipe(
88-
new TCreatePipeReq("testPipe", sinkAttributes)
89-
.setExtractorAttributes(Collections.singletonMap("user", "root"))
90-
.setProcessorAttributes(Collections.emptyMap()))
91-
.getCode());
108+
Assert.assertEquals(
109+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
110+
client
111+
.createPipe(
112+
new TCreatePipeReq("testPipe", sinkAttributes)
113+
.setExtractorAttributes(Collections.singletonMap("user", "root"))
114+
.setProcessorAttributes(Collections.emptyMap()))
115+
.getCode());
92116

93-
final OpcUaClient opcUaClient =
94-
getOpcUaClient(
95-
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
96-
DataValue value =
97-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
98-
Assert.assertEquals(new Variant(1.0), value.getValue());
99-
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
117+
try {
118+
opcUaClient =
119+
getOpcUaClient(
120+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
121+
} catch (final PipeException e) {
122+
if (e.getCause() instanceof ConnectException) {
123+
continue;
124+
} else {
125+
throw e;
126+
}
127+
}
128+
value =
129+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
130+
Assert.assertEquals(new Variant(1.0), value.getValue());
131+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
132+
opcUaClient.disconnect().get();
133+
break;
134+
}
100135

101-
Assert.assertEquals(
102-
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
103-
client
104-
.alterPipe(
105-
new TAlterPipeReq()
106-
.setPipeName("testPipe")
107-
.setIsReplaceAllConnectorAttributes(false)
108-
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
109-
.setProcessorAttributes(Collections.emptyMap())
110-
.setExtractorAttributes(Collections.emptyMap()))
111-
.getCode());
136+
while (true) {
137+
final int[] ports = EnvUtils.searchAvailablePorts();
138+
tcpPort = ports[0];
139+
final int httpsPort = ports[1];
140+
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
141+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
142+
sinkAttributes.put("with-quality", "true");
112143

113-
TestUtils.executeNonQuery(
144+
Assert.assertEquals(
145+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
146+
client
147+
.alterPipe(
148+
new TAlterPipeReq()
149+
.setPipeName("testPipe")
150+
.setIsReplaceAllConnectorAttributes(true)
151+
.setConnectorAttributes(sinkAttributes)
152+
.setProcessorAttributes(Collections.emptyMap())
153+
.setExtractorAttributes(Collections.emptyMap()))
154+
.getCode());
155+
try {
156+
opcUaClient =
157+
getOpcUaClient(
158+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
159+
} catch (final PipeException e) {
160+
if (e.getCause() instanceof ConnectException) {
161+
continue;
162+
} else {
163+
throw e;
164+
}
165+
}
166+
break;
167+
}
168+
169+
// Create aligned timeSeries to avoid tsFile parsing
170+
TestUtils.executeNonQueries(
114171
env,
115-
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
172+
Arrays.asList(
173+
"create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
174+
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)"),
116175
null);
117176

118177
long startTime = System.currentTimeMillis();
@@ -168,11 +227,12 @@ public void testOPCUAServerSink() throws Exception {
168227
.getCode());
169228

170229
// Banned none, only allows basic256sha256
230+
final int finalTcpPort = tcpPort;
171231
Assert.assertThrows(
172232
PipeException.class,
173233
() ->
174234
getOpcUaClient(
175-
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
235+
"opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
176236
SecurityPolicy.None,
177237
"root",
178238
"root"));
@@ -188,6 +248,13 @@ public void testOPCUAServerSink() throws Exception {
188248
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password **** conflicts to the new password ****, reject reusing.",
189249
e.getMessage());
190250
}
251+
} finally {
252+
if (tcpPort >= 0) {
253+
final String lockPath = EnvUtils.getLockFilePath(tcpPort);
254+
if (!new File(lockPath).delete()) {
255+
System.out.printf("Delete lock file %s failed%n", lockPath);
256+
}
257+
}
191258
}
192259
}
193260

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ public void run() {
102102
configurableUaClient.run(client);
103103
} catch (final Exception e) {
104104
throw new PipeException(
105-
"Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
105+
"Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
106106
}
107107
} catch (final Exception e) {
108108
throw new PipeException(
109-
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
109+
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
110110
}
111111
}
112112
}

0 commit comments

Comments
 (0)