@@ -432,6 +432,88 @@ def setUp(self):
432432 )
433433
434434
435+ class ClusterLinkingTopicSyncingWithPlain (ClusterLinkingTopicSyncingTestBase ):
436+ """
437+ Run the same battery of tests with PLAIN
438+ """
439+
440+ def __init__ (self , test_context , * args , ** kwargs ):
441+ security = SecurityConfig ()
442+ security .enable_sasl = True
443+ security .sasl_mechanisms = ["SCRAM" , "PLAIN" ]
444+ secondary_args : SecondaryClusterArgs = SecondaryClusterArgs (security = security )
445+ self .cluster_link_user = "cluster-link-user"
446+ self .cluster_link_password = "cluster-link-password"
447+
448+ super ().__init__ (
449+ test_context = test_context ,
450+ secondary_cluster_args = secondary_args ,
451+ * args ,
452+ ** kwargs ,
453+ )
454+
455+ def validate_created_link (self , shadow_link : shadow_link_pb2 .ShadowLink ) -> None :
456+ now = time .time ()
457+ assert (
458+ shadow_link .configurations .client_options .authentication_configuration .WhichOneof (
459+ "authentication"
460+ )
461+ == "plain_configuration"
462+ ), (
463+ f"Expected 'plain_configuration' but got { shadow_link .configurations .client_options .authentication_configuration .WhichOneof ('authentication' )} "
464+ )
465+
466+ plain_config = shadow_link .configurations .client_options .authentication_configuration .plain_configuration
467+ assert plain_config .password_set , "Password not set in scram configuration"
468+ assert plain_config .password == "" , "Password should not be set"
469+ assert plain_config .username == self .cluster_link_user , (
470+ f"Username does not match: { plain_config .username } != { self .cluster_link_user } "
471+ )
472+ assert (
473+ plain_config .password_set_at != google .protobuf .timestamp_pb2 .Timestamp ()
474+ ), "Password set time not set"
475+
476+ assert now - 5 <= plain_config .password_set_at .seconds <= now + 5 , (
477+ f"Password set time not recent: { plain_config .password_set_at .seconds } vs { now } "
478+ )
479+
480+ def add_credentials_to_link (
481+ self , shadow_link : shadow_link_pb2 .ShadowLink
482+ ) -> shadow_link_pb2 .ShadowLink :
483+ self .logger .debug (
484+ f"Adding PLAIN credentials for user { self .cluster_link_user } to link"
485+ )
486+
487+ shadow_link .configurations .client_options .authentication_configuration .plain_configuration .CopyFrom (
488+ shadow_link_pb2 .PlainConfig (
489+ username = self .cluster_link_user , password = self .cluster_link_password
490+ )
491+ )
492+ return shadow_link
493+
494+ def get_source_cluster_rpk (self ) -> RpkTool :
495+ return RpkTool (
496+ self .source_cluster .service ,
497+ username = self .redpanda .SUPERUSER_CREDENTIALS .username ,
498+ password = self .redpanda .SUPERUSER_CREDENTIALS .password ,
499+ sasl_mechanism = self .redpanda .SUPERUSER_CREDENTIALS .mechanism ,
500+ )
501+
502+ def setUp (self ):
503+ super ().setUp ()
504+ self .get_source_cluster_rpk ().sasl_create_user (
505+ self .cluster_link_user , self .cluster_link_password
506+ )
507+ self .source_cluster .service .set_cluster_config (
508+ {
509+ "superusers" : [
510+ self .redpanda .SUPERUSER_CREDENTIALS .username ,
511+ self .cluster_link_user ,
512+ ]
513+ }
514+ )
515+
516+
435517class ClusterLinkingTopicSyncingWithTlsFiles (ClusterLinkingTopicSyncingTestBase ):
436518 """
437519 Runs the base tests with TLS enabled on both endpoints
0 commit comments