Skip to content

Commit f8888a1

Browse files
committed
fix_and_IT
1 parent e19adb5 commit f8888a1

File tree

5 files changed

+145
-28
lines changed

5 files changed

+145
-28
lines changed

integration-test/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@
183183
<dependency>
184184
<groupId>org.bouncycastle</groupId>
185185
<artifactId>bcprov-jdk18on</artifactId>
186-
<scope>test</scope>
187186
</dependency>
188187
<dependency>
189188
<groupId>junit</groupId>

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
3131

3232
@Before
3333
public void setUp() {
34-
MultiEnvFactory.createEnv(2);
34+
MultiEnvFactory.createEnv(1);
3535
env = MultiEnvFactory.getEnv(0);
3636
env.getConfig()
3737
.getCommonConfig()

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 131 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,49 +20,123 @@
2020
package org.apache.iotdb.pipe.it.single;
2121

2222
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
23+
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
2324
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
2425
import org.apache.iotdb.db.it.utils.TestUtils;
26+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
27+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
2528
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2629
import org.apache.iotdb.itbase.category.MultiClusterIT1;
30+
import org.apache.iotdb.pipe.api.exception.PipeException;
2731
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
2832
import org.apache.iotdb.rpc.TSStatusCode;
2933

34+
import org.apache.tsfile.common.conf.TSFileConfig;
35+
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
36+
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
37+
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
38+
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
39+
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
40+
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
41+
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
42+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
43+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
44+
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
45+
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
3046
import org.junit.Assert;
3147
import org.junit.Test;
3248
import org.junit.experimental.categories.Category;
3349
import org.junit.runner.RunWith;
3450

51+
import java.io.File;
3552
import java.util.Collections;
3653
import java.util.HashMap;
3754
import java.util.Map;
55+
import java.util.Objects;
56+
import java.util.UUID;
57+
58+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
59+
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
3860

3961
@RunWith(IoTDBTestRunner.class)
4062
@Category({MultiClusterIT1.class})
4163
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
4264
@Test
43-
public void testOPCUASink() throws Exception {
65+
public void testOPCUAServerSink() throws Exception {
4466
try (final SyncConfigNodeIServiceClient client =
4567
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
4668

47-
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
48-
4969
final Map<String, String> sinkAttributes = new HashMap<>();
70+
5071
sinkAttributes.put("sink", "opc-ua-sink");
5172
sinkAttributes.put("opcua.model", "client-server");
73+
sinkAttributes.put("security-policy", "None");
5274

5375
Assert.assertEquals(
5476
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
5577
client
5678
.createPipe(
5779
new TCreatePipeReq("testPipe", sinkAttributes)
58-
.setExtractorAttributes(Collections.emptyMap())
80+
.setExtractorAttributes(Collections.singletonMap("user", "root"))
5981
.setProcessorAttributes(Collections.emptyMap()))
6082
.getCode());
83+
84+
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
85+
86+
final OpcUaClient opcUaClient =
87+
getOpcUaClient("opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None, "root", "root");
88+
DataValue value =
89+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
90+
Assert.assertEquals(new Variant(1.0), value.getValue());
91+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
92+
93+
Assert.assertEquals(
94+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
95+
client
96+
.alterPipe(
97+
new TAlterPipeReq()
98+
.setPipeName("testPipe")
99+
.setIsReplaceAllConnectorAttributes(false)
100+
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
101+
.setProcessorAttributes(Collections.emptyMap())
102+
.setExtractorAttributes(Collections.emptyMap()))
103+
.getCode());
104+
105+
TestUtils.executeNonQuery(
106+
env,
107+
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
108+
null);
109+
value = opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
110+
Assert.assertEquals(new Variant(1.0), value.getValue());
111+
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
112+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
113+
114+
TestUtils.executeNonQuery(
115+
env, "insert into root.db.opc(time, quality) values (2, true)", null);
116+
TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
117+
118+
final long startTime = System.currentTimeMillis();
119+
while (true) {
120+
try {
121+
value = opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
122+
Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
123+
Assert.assertEquals(new Variant(2.0), value.getValue());
124+
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
125+
break;
126+
} catch (final Throwable t) {
127+
if (System.currentTimeMillis() - startTime > 10_000L) {
128+
throw t;
129+
}
130+
}
131+
}
132+
133+
opcUaClient.disconnect().get();
61134
Assert.assertEquals(
62135
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
63136

64137
// Test reconstruction
65-
sinkAttributes.put("password123456", "test");
138+
sinkAttributes.put("password", "test");
139+
sinkAttributes.put("security-policy", "basic256sha256");
66140
Assert.assertEquals(
67141
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
68142
client
@@ -72,16 +146,24 @@ public void testOPCUASink() throws Exception {
72146
.setProcessorAttributes(Collections.emptyMap()))
73147
.getCode());
74148

149+
// Banned none, only allows basic256sha256
150+
Assert.assertThrows(
151+
PipeException.class,
152+
() ->
153+
getOpcUaClient(
154+
"opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None, "root", "root"));
155+
75156
// Test conflict
76-
sinkAttributes.put("password123456", "conflict");
77-
Assert.assertEquals(
78-
TSStatusCode.PIPE_ERROR.getStatusCode(),
79-
client
80-
.createPipe(
81-
new TCreatePipeReq("testPipe", sinkAttributes)
82-
.setExtractorAttributes(Collections.emptyMap())
83-
.setProcessorAttributes(Collections.emptyMap()))
84-
.getCode());
157+
sinkAttributes.put("password", "conflict");
158+
try {
159+
TestUtils.executeNonQuery(
160+
env, "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict')", null);
161+
Assert.fail();
162+
} catch (final Exception e) {
163+
Assert.assertEquals(
164+
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password test conflicts to the new password conflict, reject reusing.",
165+
e.getMessage());
166+
}
85167
}
86168
}
87169

@@ -93,42 +175,68 @@ public void testOPCUASinkInTableModel() throws Exception {
93175
TableModelUtils.createDataBaseAndTable(env, "test", "test");
94176
TableModelUtils.insertData("test", "test", 0, 10, env);
95177

96-
final Map<String, String> connectorAttributes = new HashMap<>();
97-
connectorAttributes.put("sink", "opc-ua-sink");
98-
connectorAttributes.put("opcua.model", "client-server");
178+
final Map<String, String> sourceAttributes = new HashMap<>();
179+
final Map<String, String> sinkAttributes = new HashMap<>();
180+
sourceAttributes.put("capture.table", "true");
181+
sourceAttributes.put("user", "root");
182+
183+
sinkAttributes.put("sink", "opc-ua-sink");
184+
sinkAttributes.put("opcua.model", "client-server");
99185

100186
Assert.assertEquals(
101187
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
102188
client
103189
.createPipe(
104-
new TCreatePipeReq("testPipe", connectorAttributes)
105-
.setExtractorAttributes(Collections.singletonMap("capture.table", "true"))
190+
new TCreatePipeReq("testPipe", sinkAttributes)
191+
.setExtractorAttributes(sourceAttributes)
106192
.setProcessorAttributes(Collections.emptyMap()))
107193
.getCode());
108194
Assert.assertEquals(
109195
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
110196

111197
// Test reconstruction
112-
connectorAttributes.put("password123456", "test");
198+
sinkAttributes.put("password123456", "test");
113199
Assert.assertEquals(
114200
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
115201
client
116202
.createPipe(
117-
new TCreatePipeReq("testPipe", connectorAttributes)
203+
new TCreatePipeReq("testPipe", sinkAttributes)
118204
.setExtractorAttributes(Collections.emptyMap())
119205
.setProcessorAttributes(Collections.emptyMap()))
120206
.getCode());
121207

122208
// Test conflict
123-
connectorAttributes.put("password123456", "conflict");
209+
sinkAttributes.put("password123456", "conflict");
124210
Assert.assertEquals(
125211
TSStatusCode.PIPE_ERROR.getStatusCode(),
126212
client
127213
.createPipe(
128-
new TCreatePipeReq("testPipe", connectorAttributes)
214+
new TCreatePipeReq("testPipe", sinkAttributes)
129215
.setExtractorAttributes(Collections.emptyMap())
130216
.setProcessorAttributes(Collections.emptyMap()))
131217
.getCode());
132218
}
133219
}
220+
221+
private static OpcUaClient getOpcUaClient(
222+
final String nodeUrl,
223+
final SecurityPolicy policy,
224+
final String userName,
225+
final String password) {
226+
final IoTDBOpcUaClient client;
227+
228+
final IdentityProvider provider =
229+
Objects.nonNull(userName)
230+
? new UsernameProvider(userName, password)
231+
: new AnonymousProvider();
232+
233+
final String securityDir =
234+
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
235+
+ File.separatorChar
236+
+ UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
237+
238+
client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
239+
new ClientRunner(client, securityDir, password).run();
240+
return client.getClient();
241+
}
134242
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
2121

2222
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
23+
import org.apache.iotdb.commons.utils.TestOnly;
2324
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
2425
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
2526
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -65,6 +66,8 @@
6566

6667
public class IoTDBOpcUaClient {
6768
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
69+
70+
// Customized nodes
6871
private static final int NAME_SPACE_INDEX = 2;
6972
private final String nodeUrl;
7073

@@ -257,6 +260,11 @@ IdentityProvider getIdentityProvider() {
257260
return identityProvider;
258261
}
259262

263+
@TestOnly
264+
public OpcUaClient getClient() {
265+
return client;
266+
}
267+
260268
/////////////////////////////// Attribute creator ///////////////////////////////
261269

262270
private VariableAttributes createMeasurementAttributes(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.nio.file.Paths;
5858
import java.security.KeyPair;
5959
import java.security.cert.X509Certificate;
60+
import java.util.HashSet;
6061
import java.util.LinkedHashSet;
6162
import java.util.List;
6263
import java.util.Objects;
@@ -243,7 +244,8 @@ private Set<EndpointConfiguration> createEndpointConfigurations(
243244
USER_TOKEN_POLICY_USERNAME,
244245
USER_TOKEN_POLICY_X509);
245246

246-
if (securityPolicies.contains(SecurityPolicy.None)) {
247+
final Set<SecurityPolicy> securityPolicySet = new HashSet<>(securityPolicies);
248+
if (securityPolicySet.contains(SecurityPolicy.None)) {
247249
final EndpointConfiguration.Builder noSecurityBuilder =
248250
builder
249251
.copy()
@@ -252,10 +254,10 @@ private Set<EndpointConfiguration> createEndpointConfigurations(
252254

253255
endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort));
254256
endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort));
255-
securityPolicies.remove(SecurityPolicy.None);
257+
securityPolicySet.remove(SecurityPolicy.None);
256258
}
257259

258-
for (final SecurityPolicy securityPolicy : securityPolicies) {
260+
for (final SecurityPolicy securityPolicy : securityPolicySet) {
259261
endpointConfigurations.add(
260262
buildTcpEndpoint(
261263
builder

0 commit comments

Comments
 (0)