@@ -4359,6 +4359,242 @@ def test_ps_s3_multiple_topics_notification():
43594359 http_server .close ()
43604360
43614361
4362+ @attr ('data_path_v2_test' )
4363+ def test_ps_s3_list_topics_migration ():
4364+ """ test list topics on migration"""
4365+ if get_config_cluster () == 'noname' :
4366+ return SkipTest ('realm is needed for migration test' )
4367+
4368+ # Initialize connections and configurations
4369+ conn1 = connection ()
4370+ tenant = 'kaboom1'
4371+ conn2 = connect_random_user (tenant )
4372+ bucket_name = gen_bucket_name ()
4373+ topics = [f"{ bucket_name } { TOPIC_SUFFIX } { i } " for i in range (1 , 7 )]
4374+ tenant_topics = [f"{ tenant } _{ topic } " for topic in topics ]
4375+
4376+ # Define topic names with version
4377+ topic_versions = {
4378+ "topic1_v2" : f"{ topics [0 ]} _v2" ,
4379+ "topic2_v2" : f"{ topics [1 ]} _v2" ,
4380+ "topic3_v1" : f"{ topics [2 ]} _v1" ,
4381+ "topic4_v1" : f"{ topics [3 ]} _v1" ,
4382+ "topic5_v1" : f"{ topics [4 ]} _v1" ,
4383+ "topic6_v1" : f"{ topics [5 ]} _v1" ,
4384+ "tenant_topic1_v2" : f"{ tenant_topics [0 ]} _v2" ,
4385+ "tenant_topic2_v1" : f"{ tenant_topics [1 ]} _v1" ,
4386+ "tenant_topic3_v1" : f"{ tenant_topics [2 ]} _v1"
4387+ }
4388+
4389+ # Get necessary configurations
4390+ host = get_ip ()
4391+ http_port = random .randint (10000 , 20000 )
4392+ endpoint_address = 'http://' + host + ':' + str (http_port )
4393+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
4394+ zonegroup = get_config_zonegroup ()
4395+ conf_cluster = get_config_cluster ()
4396+
4397+ # Make sure there are no leftover topics on v2
4398+ zonegroup_modify_feature (enable = True , feature_name = zonegroup_feature_notification_v2 )
4399+ delete_all_topics (conn1 , '' , conf_cluster )
4400+ delete_all_topics (conn2 , tenant , conf_cluster )
4401+
4402+ # Start v1 notification
4403+ # Make sure there are no leftover topics on v1
4404+ zonegroup_modify_feature (enable = False , feature_name = zonegroup_feature_notification_v2 )
4405+ delete_all_topics (conn1 , '' , conf_cluster )
4406+ delete_all_topics (conn2 , tenant , conf_cluster )
4407+
4408+ # Create s3 - v1 topics
4409+ topic_conf = PSTopicS3 (conn1 , topic_versions ['topic3_v1' ], zonegroup , endpoint_args = endpoint_args )
4410+ topic_arn3 = topic_conf .set_config ()
4411+ topic_conf = PSTopicS3 (conn1 , topic_versions ['topic4_v1' ], zonegroup , endpoint_args = endpoint_args )
4412+ topic_arn4 = topic_conf .set_config ()
4413+ topic_conf = PSTopicS3 (conn1 , topic_versions ['topic5_v1' ], zonegroup , endpoint_args = endpoint_args )
4414+ topic_arn5 = topic_conf .set_config ()
4415+ topic_conf = PSTopicS3 (conn1 , topic_versions ['topic6_v1' ], zonegroup , endpoint_args = endpoint_args )
4416+ topic_arn6 = topic_conf .set_config ()
4417+ tenant_topic_conf = PSTopicS3 (conn2 , topic_versions ['tenant_topic2_v1' ], zonegroup , endpoint_args = endpoint_args )
4418+ tenant_topic_arn2 = tenant_topic_conf .set_config ()
4419+ tenant_topic_conf = PSTopicS3 (conn2 , topic_versions ['tenant_topic3_v1' ], zonegroup , endpoint_args = endpoint_args )
4420+ tenant_topic_arn3 = tenant_topic_conf .set_config ()
4421+
4422+ # Start v2 notification
4423+ zonegroup_modify_feature (enable = True , feature_name = zonegroup_feature_notification_v2 )
4424+
4425+ # Create s3 - v2 topics
4426+ topic_conf = PSTopicS3 (conn1 , topic_versions ['topic1_v2' ], zonegroup , endpoint_args = endpoint_args )
4427+ topic_arn1 = topic_conf .set_config ()
4428+ topic_conf = PSTopicS3 (conn1 , topic_versions ['topic2_v2' ], zonegroup , endpoint_args = endpoint_args )
4429+ topic_arn2 = topic_conf .set_config ()
4430+ tenant_topic_conf = PSTopicS3 (conn2 , topic_versions ['tenant_topic1_v2' ], zonegroup , endpoint_args = endpoint_args )
4431+ tenant_topic_arn1 = tenant_topic_conf .set_config ()
4432+
4433+ # Verify topics list
4434+ try :
4435+ # Verify no tenant topics
4436+ res , status = topic_conf .get_list ()
4437+ assert_equal (status // 100 , 2 )
4438+ listTopicsResponse = res .get ('ListTopicsResponse' , {})
4439+ listTopicsResult = listTopicsResponse .get ('ListTopicsResult' , {})
4440+ topics = listTopicsResult .get ('Topics' , {})
4441+ member = topics ['member' ] if topics else []
4442+ assert_equal (len (member ), 6 )
4443+
4444+ # Verify tenant topics
4445+ res , status = tenant_topic_conf .get_list ()
4446+ assert_equal (status // 100 , 2 )
4447+ listTopicsResponse = res .get ('ListTopicsResponse' , {})
4448+ listTopicsResult = listTopicsResponse .get ('ListTopicsResult' , {})
4449+ topics = listTopicsResult .get ('Topics' , {})
4450+ member = topics ['member' ] if topics else []
4451+ assert_equal (len (member ), 3 )
4452+ finally :
4453+ # Cleanup created topics
4454+ topic_conf .del_config (topic_arn1 )
4455+ topic_conf .del_config (topic_arn2 )
4456+ topic_conf .del_config (topic_arn3 )
4457+ topic_conf .del_config (topic_arn4 )
4458+ topic_conf .del_config (topic_arn5 )
4459+ topic_conf .del_config (topic_arn6 )
4460+ tenant_topic_conf .del_config (tenant_topic_arn1 )
4461+ tenant_topic_conf .del_config (tenant_topic_arn2 )
4462+ tenant_topic_conf .del_config (tenant_topic_arn3 )
4463+
4464+
4465+ @attr ('basic_test' )
4466+ def test_ps_s3_list_topics ():
4467+ """ test list topics"""
4468+
4469+ # Initialize connections, topic names and configurations
4470+ conn1 = connection ()
4471+ tenant = 'kaboom1'
4472+ conn2 = connect_random_user (tenant )
4473+ bucket_name = gen_bucket_name ()
4474+ topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
4475+ topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
4476+ topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
4477+ tenant_topic_name1 = tenant + "_" + topic_name1
4478+ tenant_topic_name2 = tenant + "_" + topic_name2
4479+ host = get_ip ()
4480+ http_port = random .randint (10000 , 20000 )
4481+ endpoint_address = 'http://' + host + ':' + str (http_port )
4482+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
4483+ zonegroup = get_config_zonegroup ()
4484+
4485+ # Make sure there are no leftover topics
4486+ delete_all_topics (conn1 , '' , get_config_cluster ())
4487+ delete_all_topics (conn2 , tenant , get_config_cluster ())
4488+
4489+ # Create s3 - v2 topics
4490+ topic_conf = PSTopicS3 (conn1 , topic_name1 , zonegroup , endpoint_args = endpoint_args )
4491+ topic_arn1 = topic_conf .set_config ()
4492+ topic_conf = PSTopicS3 (conn1 , topic_name2 , zonegroup , endpoint_args = endpoint_args )
4493+ topic_arn2 = topic_conf .set_config ()
4494+ topic_conf = PSTopicS3 (conn1 , topic_name3 , zonegroup , endpoint_args = endpoint_args )
4495+ topic_arn3 = topic_conf .set_config ()
4496+ tenant_topic_conf = PSTopicS3 (conn2 , tenant_topic_name1 , zonegroup , endpoint_args = endpoint_args )
4497+ tenant_topic_arn1 = tenant_topic_conf .set_config ()
4498+ tenant_topic_conf = PSTopicS3 (conn2 , tenant_topic_name2 , zonegroup , endpoint_args = endpoint_args )
4499+ tenant_topic_arn2 = tenant_topic_conf .set_config ()
4500+
4501+ # Verify topics list
4502+ try :
4503+ # Verify no tenant topics
4504+ res , status = topic_conf .get_list ()
4505+ assert_equal (status // 100 , 2 )
4506+ listTopicsResponse = res .get ('ListTopicsResponse' , {})
4507+ listTopicsResult = listTopicsResponse .get ('ListTopicsResult' , {})
4508+ topics = listTopicsResult .get ('Topics' , {})
4509+ member = topics ['member' ] if topics else [] # version 2
4510+ assert_equal (len (member ), 3 )
4511+
4512+ # Verify topics for tenant
4513+ res , status = tenant_topic_conf .get_list ()
4514+ assert_equal (status // 100 , 2 )
4515+ listTopicsResponse = res .get ('ListTopicsResponse' , {})
4516+ listTopicsResult = listTopicsResponse .get ('ListTopicsResult' , {})
4517+ topics = listTopicsResult .get ('Topics' , {})
4518+ member = topics ['member' ] if topics else []
4519+ assert_equal (len (member ), 2 )
4520+ finally :
4521+ # Cleanup created topics
4522+ topic_conf .del_config (topic_arn1 )
4523+ topic_conf .del_config (topic_arn2 )
4524+ topic_conf .del_config (topic_arn3 )
4525+ tenant_topic_conf .del_config (tenant_topic_arn1 )
4526+ tenant_topic_conf .del_config (tenant_topic_arn2 )
4527+
4528+ @attr ('data_path_v2_test' )
4529+ def test_ps_s3_list_topics_v1 ():
4530+ """ test list topics on v1"""
4531+ if get_config_cluster () == 'noname' :
4532+ return SkipTest ('realm is needed' )
4533+
4534+ # Initialize connections and configurations
4535+ conn1 = connection ()
4536+ tenant = 'kaboom1'
4537+ conn2 = connect_random_user (tenant )
4538+ bucket_name = gen_bucket_name ()
4539+ topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
4540+ topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
4541+ topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
4542+ tenant_topic_name1 = tenant + "_" + topic_name1
4543+ tenant_topic_name2 = tenant + "_" + topic_name2
4544+ host = get_ip ()
4545+ http_port = random .randint (10000 , 20000 )
4546+ endpoint_address = 'http://' + host + ':' + str (http_port )
4547+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
4548+ zonegroup = get_config_zonegroup ()
4549+ conf_cluster = get_config_cluster ()
4550+
4551+ # Make sure there are no leftover topics
4552+ delete_all_topics (conn1 , '' , conf_cluster )
4553+ delete_all_topics (conn2 , tenant , conf_cluster )
4554+
4555+ # Make sure that we disable v2
4556+ zonegroup_modify_feature (enable = False , feature_name = zonegroup_feature_notification_v2 )
4557+
4558+ # Create s3 - v1 topics
4559+ topic_conf = PSTopicS3 (conn1 , topic_name1 , zonegroup , endpoint_args = endpoint_args )
4560+ topic_arn1 = topic_conf .set_config ()
4561+ topic_conf = PSTopicS3 (conn1 , topic_name2 , zonegroup , endpoint_args = endpoint_args )
4562+ topic_arn2 = topic_conf .set_config ()
4563+ topic_conf = PSTopicS3 (conn1 , topic_name3 , zonegroup , endpoint_args = endpoint_args )
4564+ topic_arn3 = topic_conf .set_config ()
4565+ tenant_topic_conf = PSTopicS3 (conn2 , tenant_topic_name1 , zonegroup , endpoint_args = endpoint_args )
4566+ tenant_topic_arn1 = tenant_topic_conf .set_config ()
4567+ tenant_topic_conf = PSTopicS3 (conn2 , tenant_topic_name2 , zonegroup , endpoint_args = endpoint_args )
4568+ tenant_topic_arn2 = tenant_topic_conf .set_config ()
4569+
4570+ # Verify topics list
4571+ try :
4572+ # Verify no tenant topics
4573+ res , status = topic_conf .get_list ()
4574+ assert_equal (status // 100 , 2 )
4575+ listTopicsResponse = res .get ('ListTopicsResponse' , {})
4576+ listTopicsResult = listTopicsResponse .get ('ListTopicsResult' , {})
4577+ topics = listTopicsResult .get ('Topics' , {})
4578+ member = topics ['member' ] if topics else []
4579+ assert_equal (len (member ), 3 )
4580+
4581+ # Verify tenant topics
4582+ res , status = tenant_topic_conf .get_list ()
4583+ assert_equal (status // 100 , 2 )
4584+ listTopicsResponse = res .get ('ListTopicsResponse' , {})
4585+ listTopicsResult = listTopicsResponse .get ('ListTopicsResult' , {})
4586+ topics = listTopicsResult .get ('Topics' , {})
4587+ member = topics ['member' ] if topics else []
4588+ assert_equal (len (member ), 2 )
4589+ finally :
4590+ # Cleanup created topics
4591+ topic_conf .del_config (topic_arn1 )
4592+ topic_conf .del_config (topic_arn2 )
4593+ topic_conf .del_config (topic_arn3 )
4594+ tenant_topic_conf .del_config (tenant_topic_arn1 )
4595+ tenant_topic_conf .del_config (tenant_topic_arn2 )
4596+
4597+
43624598@attr ('basic_test' )
43634599def test_ps_s3_topic_permissions ():
43644600 """ test s3 topic set/get/delete permissions """
0 commit comments