Skip to content

Commit cb18a95

Browse files
authored
Pipe: Implemented OPC Sink for outer server & Set configuration and changed the default value of the server security policies & Made the default quality configurable and does not throw when non-value/quality measurement is encountered (#16944)
* pj * cj * bone * fix * fix * framework * fix * trilog * framework * fix * fix * yl * stack-client * fix * might * sleep-removal * cleaning * fix * sec-dir * cleaning * remove-poison * f * fix * clean-sit * sit-comp * object * many-clean * sit-sit * fix * fix * fix * ref * sit * partial * security-policies * check-equals * check-err * fix * compile-fix * adjust * ut * refactor * fix_and_IT * fix * placeholder * rollback * eliminate-fault * pw * fix * f * fix
1 parent b75bcc6 commit cb18a95

File tree

14 files changed

+1135
-147
lines changed

14 files changed

+1135
-147
lines changed

integration-test/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@
183183
<dependency>
184184
<groupId>org.bouncycastle</groupId>
185185
<artifactId>bcprov-jdk18on</artifactId>
186-
<scope>test</scope>
187186
</dependency>
188187
<dependency>
189188
<groupId>junit</groupId>

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
3131

3232
@Before
3333
public void setUp() {
34-
MultiEnvFactory.createEnv(2);
34+
MultiEnvFactory.createEnv(1);
3535
env = MultiEnvFactory.getEnv(0);
3636
env.getConfig()
3737
.getCommonConfig()

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 164 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,68 +20,174 @@
2020
package org.apache.iotdb.pipe.it.single;
2121

2222
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
23+
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
2324
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
2425
import org.apache.iotdb.db.it.utils.TestUtils;
26+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
27+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
28+
import org.apache.iotdb.it.env.cluster.EnvUtils;
2529
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2630
import org.apache.iotdb.itbase.category.MultiClusterIT1;
31+
import org.apache.iotdb.pipe.api.exception.PipeException;
2732
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
2833
import org.apache.iotdb.rpc.TSStatusCode;
2934

35+
import org.apache.tsfile.common.conf.TSFileConfig;
36+
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
37+
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
38+
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
39+
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
40+
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
41+
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
42+
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
43+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
44+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
45+
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
46+
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
3047
import org.junit.Assert;
3148
import org.junit.Test;
3249
import org.junit.experimental.categories.Category;
3350
import org.junit.runner.RunWith;
3451

52+
import java.io.File;
3553
import java.util.Collections;
3654
import java.util.HashMap;
3755
import java.util.Map;
56+
import java.util.Objects;
57+
import java.util.UUID;
58+
59+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
60+
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
3861

3962
@RunWith(IoTDBTestRunner.class)
4063
@Category({MultiClusterIT1.class})
4164
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
4265
@Test
43-
public void testOPCUASink() throws Exception {
66+
public void testOPCUAServerSink() throws Exception {
4467
try (final SyncConfigNodeIServiceClient client =
4568
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
4669

4770
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
4871

49-
final Map<String, String> connectorAttributes = new HashMap<>();
50-
connectorAttributes.put("sink", "opc-ua-sink");
51-
connectorAttributes.put("opcua.model", "client-server");
72+
final Map<String, String> sinkAttributes = new HashMap<>();
73+
74+
sinkAttributes.put("sink", "opc-ua-sink");
75+
sinkAttributes.put("opcua.model", "client-server");
76+
sinkAttributes.put("security-policy", "None");
77+
78+
final int[] ports = EnvUtils.searchAvailablePorts();
79+
final int tcpPort = ports[0];
80+
final int httpsPort = ports[1];
81+
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
82+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
5283

5384
Assert.assertEquals(
5485
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
5586
client
5687
.createPipe(
57-
new TCreatePipeReq("testPipe", connectorAttributes)
58-
.setExtractorAttributes(Collections.emptyMap())
88+
new TCreatePipeReq("testPipe", sinkAttributes)
89+
.setExtractorAttributes(Collections.singletonMap("user", "root"))
5990
.setProcessorAttributes(Collections.emptyMap()))
6091
.getCode());
92+
93+
final OpcUaClient opcUaClient =
94+
getOpcUaClient(
95+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
96+
DataValue value =
97+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
98+
Assert.assertEquals(new Variant(1.0), value.getValue());
99+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
100+
101+
Assert.assertEquals(
102+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
103+
client
104+
.alterPipe(
105+
new TAlterPipeReq()
106+
.setPipeName("testPipe")
107+
.setIsReplaceAllConnectorAttributes(false)
108+
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
109+
.setProcessorAttributes(Collections.emptyMap())
110+
.setExtractorAttributes(Collections.emptyMap()))
111+
.getCode());
112+
113+
TestUtils.executeNonQuery(
114+
env,
115+
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
116+
null);
117+
118+
long startTime = System.currentTimeMillis();
119+
while (true) {
120+
try {
121+
value =
122+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
123+
Assert.assertEquals(new Variant(1.0), value.getValue());
124+
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
125+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
126+
break;
127+
} catch (final Throwable t) {
128+
if (System.currentTimeMillis() - startTime > 10_000L) {
129+
throw t;
130+
}
131+
}
132+
}
133+
134+
TestUtils.executeNonQuery(
135+
env, "insert into root.db.opc(time, quality) values (2, true)", null);
136+
TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
137+
138+
startTime = System.currentTimeMillis();
139+
while (true) {
140+
try {
141+
value =
142+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
143+
Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
144+
Assert.assertEquals(new Variant(2.0), value.getValue());
145+
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
146+
break;
147+
} catch (final Throwable t) {
148+
if (System.currentTimeMillis() - startTime > 10_000L) {
149+
throw t;
150+
}
151+
}
152+
}
153+
154+
opcUaClient.disconnect().get();
61155
Assert.assertEquals(
62156
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
63157

64158
// Test reconstruction
65-
connectorAttributes.put("password123456", "test");
159+
sinkAttributes.put("password", "test");
160+
sinkAttributes.put("security-policy", "basic256sha256");
66161
Assert.assertEquals(
67162
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
68163
client
69164
.createPipe(
70-
new TCreatePipeReq("testPipe", connectorAttributes)
165+
new TCreatePipeReq("testPipe", sinkAttributes)
71166
.setExtractorAttributes(Collections.emptyMap())
72167
.setProcessorAttributes(Collections.emptyMap()))
73168
.getCode());
74169

170+
// Banned none, only allows basic256sha256
171+
Assert.assertThrows(
172+
PipeException.class,
173+
() ->
174+
getOpcUaClient(
175+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
176+
SecurityPolicy.None,
177+
"root",
178+
"root"));
179+
75180
// Test conflict
76-
connectorAttributes.put("password123456", "conflict");
77-
Assert.assertEquals(
78-
TSStatusCode.PIPE_ERROR.getStatusCode(),
79-
client
80-
.createPipe(
81-
new TCreatePipeReq("testPipe", connectorAttributes)
82-
.setExtractorAttributes(Collections.emptyMap())
83-
.setProcessorAttributes(Collections.emptyMap()))
84-
.getCode());
181+
sinkAttributes.put("password", "conflict");
182+
try {
183+
TestUtils.executeNonQuery(
184+
env, "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict')", null);
185+
Assert.fail();
186+
} catch (final Exception e) {
187+
Assert.assertEquals(
188+
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password **** conflicts to the new password ****, reject reusing.",
189+
e.getMessage());
190+
}
85191
}
86192
}
87193

@@ -93,42 +199,74 @@ public void testOPCUASinkInTableModel() throws Exception {
93199
TableModelUtils.createDataBaseAndTable(env, "test", "test");
94200
TableModelUtils.insertData("test", "test", 0, 10, env);
95201

96-
final Map<String, String> connectorAttributes = new HashMap<>();
97-
connectorAttributes.put("sink", "opc-ua-sink");
98-
connectorAttributes.put("opcua.model", "client-server");
202+
final Map<String, String> sourceAttributes = new HashMap<>();
203+
final Map<String, String> sinkAttributes = new HashMap<>();
204+
sourceAttributes.put("capture.table", "true");
205+
sourceAttributes.put("user", "root");
206+
207+
sinkAttributes.put("sink", "opc-ua-sink");
208+
sinkAttributes.put("opcua.model", "client-server");
209+
210+
final int[] ports = EnvUtils.searchAvailablePorts();
211+
final int tcpPort = ports[0];
212+
final int httpsPort = ports[1];
213+
sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
214+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
99215

100216
Assert.assertEquals(
101217
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
102218
client
103219
.createPipe(
104-
new TCreatePipeReq("testPipe", connectorAttributes)
105-
.setExtractorAttributes(Collections.singletonMap("capture.table", "true"))
220+
new TCreatePipeReq("testPipe", sinkAttributes)
221+
.setExtractorAttributes(sourceAttributes)
106222
.setProcessorAttributes(Collections.emptyMap()))
107223
.getCode());
108224
Assert.assertEquals(
109225
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
110226

111227
// Test reconstruction
112-
connectorAttributes.put("password123456", "test");
228+
sinkAttributes.put("password123456", "test");
113229
Assert.assertEquals(
114230
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
115231
client
116232
.createPipe(
117-
new TCreatePipeReq("testPipe", connectorAttributes)
233+
new TCreatePipeReq("testPipe", sinkAttributes)
118234
.setExtractorAttributes(Collections.emptyMap())
119235
.setProcessorAttributes(Collections.emptyMap()))
120236
.getCode());
121237

122238
// Test conflict
123-
connectorAttributes.put("password123456", "conflict");
239+
sinkAttributes.put("password123456", "conflict");
124240
Assert.assertEquals(
125241
TSStatusCode.PIPE_ERROR.getStatusCode(),
126242
client
127243
.createPipe(
128-
new TCreatePipeReq("testPipe", connectorAttributes)
244+
new TCreatePipeReq("testPipe", sinkAttributes)
129245
.setExtractorAttributes(Collections.emptyMap())
130246
.setProcessorAttributes(Collections.emptyMap()))
131247
.getCode());
132248
}
133249
}
250+
251+
private static OpcUaClient getOpcUaClient(
252+
final String nodeUrl,
253+
final SecurityPolicy policy,
254+
final String userName,
255+
final String password) {
256+
final IoTDBOpcUaClient client;
257+
258+
final IdentityProvider provider =
259+
Objects.nonNull(userName)
260+
? new UsernameProvider(userName, password)
261+
: new AnonymousProvider();
262+
263+
final String securityDir =
264+
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
265+
+ File.separatorChar
266+
+ UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
267+
268+
client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
269+
new ClientRunner(client, securityDir, password).run();
270+
return client.getClient();
271+
}
134272
}

iotdb-core/datanode/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,18 @@
191191
<groupId>org.eclipse.milo</groupId>
192192
<artifactId>stack-server</artifactId>
193193
</dependency>
194+
<dependency>
195+
<groupId>org.eclipse.milo</groupId>
196+
<artifactId>stack-client</artifactId>
197+
</dependency>
198+
<dependency>
199+
<groupId>org.eclipse.milo</groupId>
200+
<artifactId>sdk-client</artifactId>
201+
</dependency>
202+
<dependency>
203+
<groupId>org.bouncycastle</groupId>
204+
<artifactId>bcprov-jdk18on</artifactId>
205+
</dependency>
194206
<dependency>
195207
<groupId>org.eclipse.jetty</groupId>
196208
<artifactId>jetty-http</artifactId>

0 commit comments

Comments
 (0)