2424import org .apache .iotdb .db .conf .IoTDBConfig ;
2525import org .apache .iotdb .db .pipe .event .common .tablet .PipeInsertNodeTabletInsertionEvent ;
2626import org .apache .iotdb .db .pipe .event .common .tablet .PipeRawTabletInsertionEvent ;
27+ import org .apache .iotdb .db .pipe .sink .protocol .opcua .client .IoTDBOpcUaClient ;
2728import org .apache .iotdb .db .pipe .sink .protocol .opcua .server .OpcUaNameSpace ;
2829import org .apache .iotdb .db .pipe .sink .protocol .opcua .server .OpcUaServerBuilder ;
2930import org .apache .iotdb .db .storageengine .StorageEngine ;
4041
4142import org .apache .tsfile .utils .Pair ;
4243import org .apache .tsfile .write .record .Tablet ;
44+ import org .eclipse .milo .opcua .sdk .client .api .identity .AnonymousProvider ;
45+ import org .eclipse .milo .opcua .sdk .client .api .identity .IdentityProvider ;
46+ import org .eclipse .milo .opcua .sdk .client .api .identity .UsernameProvider ;
4347import org .eclipse .milo .opcua .sdk .server .OpcUaServer ;
48+ import org .eclipse .milo .opcua .stack .core .security .SecurityPolicy ;
4449import org .slf4j .Logger ;
4550import org .slf4j .LoggerFactory ;
4651
6671import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE ;
6772import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_MODEL_KEY ;
6873import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE ;
74+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_NODE_URL_KEY ;
6975import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE ;
7076import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_PLACEHOLDER_KEY ;
7177import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE ;
7278import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_NAME_KEY ;
79+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE ;
80+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE ;
81+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE ;
82+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE ;
83+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE ;
84+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE ;
7385import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE ;
7486import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_DIR_KEY ;
87+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_POLICY_KEY ;
7588import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE ;
7689import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY ;
7790import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE ;
8497import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY ;
8598import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_HTTPS_BIND_PORT_KEY ;
8699import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_MODEL_KEY ;
100+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_NODE_URL_KEY ;
87101import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_PLACEHOLDER_KEY ;
88102import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_QUALITY_NAME_KEY ;
89103import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_SECURITY_DIR_KEY ;
104+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_SECURITY_POLICY_KEY ;
90105import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_TCP_BIND_PORT_KEY ;
91106import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_VALUE_NAME_KEY ;
92107import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_WITH_QUALITY_KEY ;
@@ -114,7 +129,12 @@ public class OpcUaSink implements PipeConnector {
114129 String placeHolder ;
115130 @ Nullable String valueName ;
116131 @ Nullable String qualityName ;
117- private OpcUaNameSpace nameSpace ;
132+
133+ // Inner server
134+ private @ Nullable OpcUaNameSpace nameSpace ;
135+
136+ // Outer server
137+ private @ Nullable IoTDBOpcUaClient client ;
118138
119139 @ Override
120140 public void validate (final PipeParameterValidator validator ) throws Exception {
@@ -139,6 +159,17 @@ public void validate(final PipeParameterValidator validator) throws Exception {
139159 public void customize (
140160 final PipeParameters parameters , final PipeConnectorRuntimeConfiguration configuration )
141161 throws Exception {
162+ final String nodeUrl =
163+ parameters .getStringByKeys (CONNECTOR_OPC_UA_NODE_URL_KEY , SINK_OPC_UA_NODE_URL_KEY );
164+ if (Objects .isNull (nodeUrl )) {
165+ customizeServer (parameters , configuration );
166+ } else {
167+ customizeClient (nodeUrl , parameters );
168+ }
169+ }
170+
171+ private void customizeServer (
172+ final PipeParameters parameters , final PipeConnectorRuntimeConfiguration configuration ) {
142173 final int tcpBindPort =
143174 parameters .getIntOrDefault (
144175 Arrays .asList (CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY , SINK_OPC_UA_TCP_BIND_PORT_KEY ),
@@ -250,6 +281,48 @@ public void customize(
250281 }
251282 }
252283
284+ private void customizeClient (final String nodeUrl , final PipeParameters parameters ) {
285+ final SecurityPolicy policy ;
286+ switch (parameters
287+ .getStringOrDefault (
288+ Arrays .asList (CONNECTOR_OPC_UA_SECURITY_POLICY_KEY , SINK_OPC_UA_SECURITY_POLICY_KEY ),
289+ CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE )
290+ .toUpperCase ()) {
291+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE :
292+ policy = SecurityPolicy .None ;
293+ break ;
294+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE :
295+ policy = SecurityPolicy .Basic128Rsa15 ;
296+ break ;
297+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE :
298+ policy = SecurityPolicy .Basic256 ;
299+ break ;
300+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE :
301+ policy = SecurityPolicy .Basic256Sha256 ;
302+ break ;
303+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE :
304+ policy = SecurityPolicy .Aes128_Sha256_RsaOaep ;
305+ break ;
306+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE :
307+ policy = SecurityPolicy .Aes256_Sha256_RsaPss ;
308+ break ;
309+ default :
310+ policy = null ;
311+ break ;
312+ }
313+
314+ final IdentityProvider provider ;
315+ final String userName =
316+ parameters .getStringByKeys (CONNECTOR_IOTDB_USER_KEY , SINK_IOTDB_USER_KEY );
317+ provider =
318+ Objects .nonNull (userName )
319+ ? new UsernameProvider (
320+ userName ,
321+ parameters .getStringByKeys (CONNECTOR_IOTDB_PASSWORD_KEY , SINK_IOTDB_PASSWORD_KEY ))
322+ : new AnonymousProvider ();
323+ client = new IoTDBOpcUaClient (nodeUrl , policy , provider );
324+ }
325+
253326 @ Override
254327 public void handshake () throws Exception {
255328 // Server side, do nothing
@@ -359,7 +432,7 @@ public void close() throws Exception {
359432 }
360433 }
361434
362- // Getter
435+ /////////////////////////////// Getter ///////////////////////////////
363436
364437 public boolean isClientServerModel () {
365438 return isClientServerModel ;
0 commit comments