5757import java .util .Arrays ;
5858import java .util .Map ;
5959import java .util .Objects ;
60+ import java .util .Set ;
6061import java .util .UUID ;
6162import java .util .concurrent .ConcurrentHashMap ;
6263import java .util .concurrent .atomic .AtomicInteger ;
64+ import java .util .stream .Collectors ;
6365
6466import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE ;
6567import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_IOTDB_PASSWORD_KEY ;
9092import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE ;
9193import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_DIR_KEY ;
9294import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_POLICY_KEY ;
95+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES ;
9396import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE ;
9497import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY ;
9598import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE ;
@@ -270,6 +273,18 @@ private void customizeServer(final PipeParameters parameters) {
270273 CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY ,
271274 SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY ),
272275 CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE );
276+ final Set <SecurityPolicy > securityPolicies =
277+ (parameters .hasAnyAttributes (
278+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY , SINK_OPC_UA_SECURITY_POLICY_KEY )
279+ ? Arrays .stream (
280+ parameters
281+ .getStringByKeys (
282+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY , SINK_OPC_UA_SECURITY_POLICY_KEY )
283+ .replace (" " , "" )
284+ .split ("," ))
285+ : CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES .stream ())
286+ .map (this ::getSecurityPolicy )
287+ .collect (Collectors .toSet ());
273288
274289 synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP ) {
275290 serverKey = httpsBindPort + ":" + tcpBindPort ;
@@ -288,7 +303,8 @@ private void customizeServer(final PipeParameters parameters) {
288303 .setUser (user )
289304 .setPassword (password )
290305 .setSecurityDir (securityDir )
291- .setEnableAnonymousAccess (enableAnonymousAccess );
306+ .setEnableAnonymousAccess (enableAnonymousAccess )
307+ .setSecurityPolicies (securityPolicies );
292308 final OpcUaServer newServer = builder .build ();
293309 nameSpace = new OpcUaNameSpace (newServer , builder );
294310 nameSpace .startup ();
@@ -312,34 +328,14 @@ private void customizeServer(final PipeParameters parameters) {
312328 }
313329
314330 private void customizeClient (final String nodeUrl , final PipeParameters parameters ) {
315- final SecurityPolicy policy ;
316- switch (parameters
317- .getStringOrDefault (
318- Arrays .asList (CONNECTOR_OPC_UA_SECURITY_POLICY_KEY , SINK_OPC_UA_SECURITY_POLICY_KEY ),
319- CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE )
320- .toUpperCase ()) {
321- case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE :
322- policy = SecurityPolicy .None ;
323- break ;
324- case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE :
325- policy = SecurityPolicy .Basic128Rsa15 ;
326- break ;
327- case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE :
328- policy = SecurityPolicy .Basic256 ;
329- break ;
330- case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE :
331- policy = SecurityPolicy .Basic256Sha256 ;
332- break ;
333- case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE :
334- policy = SecurityPolicy .Aes128_Sha256_RsaOaep ;
335- break ;
336- case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE :
337- policy = SecurityPolicy .Aes256_Sha256_RsaPss ;
338- break ;
339- default :
340- throw new PipeException (
341- "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'." );
342- }
331+ final SecurityPolicy policy =
332+ getSecurityPolicy (
333+ parameters
334+ .getStringOrDefault (
335+ Arrays .asList (
336+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY , SINK_OPC_UA_SECURITY_POLICY_KEY ),
337+ CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE )
338+ .toUpperCase ());
343339
344340 final IdentityProvider provider ;
345341 final String userName =
@@ -372,6 +368,26 @@ private void customizeClient(final String nodeUrl, final PipeParameters paramete
372368 new ClientRunner (client , securityDir , password ).run ();
373369 }
374370
371+ private SecurityPolicy getSecurityPolicy (final String securityPolicy ) {
372+ switch (securityPolicy .toUpperCase ()) {
373+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE :
374+ return SecurityPolicy .None ;
375+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE :
376+ return SecurityPolicy .Basic128Rsa15 ;
377+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE :
378+ return SecurityPolicy .Basic256 ;
379+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE :
380+ return SecurityPolicy .Basic256Sha256 ;
381+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE :
382+ return SecurityPolicy .Aes128_Sha256_RsaOaep ;
383+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE :
384+ return SecurityPolicy .Aes256_Sha256_RsaPss ;
385+ default :
386+ throw new PipeException (
387+ "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'." );
388+ }
389+ }
390+
375391 @ Override
376392 public void handshake () throws Exception {
377393 // Server side, do nothing
0 commit comments