@@ -32,6 +32,7 @@ import (
3232 "k8s.io/apimachinery/pkg/types"
3333 "k8s.io/utils/pointer"
3434
35+ adminutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
3536 v1alphav1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
3637 "github.com/streamnative/pulsar-resources-operator/tests/utils"
3738)
@@ -878,6 +879,300 @@ var _ = Describe("Resources", func() {
878879 })
879880 })
880881
882+ Context ("PulsarTopic Subscribe Rate" , Ordered , func () {
883+ var (
884+ subscribeRateTopic * v1alphav1.PulsarTopic
885+ subscribeRateTopicName string = "test-subscribe-rate-topic"
886+ )
887+
888+ BeforeAll (func () {
889+ subscribeRateTopic = utils .MakePulsarTopicWithSubscribeRate (
890+ namespaceName ,
891+ subscribeRateTopicName ,
892+ "persistent://public/default/subscribe-rate-test" ,
893+ pconnName ,
894+ lifecyclePolicy ,
895+ )
896+ })
897+
898+ It ("should create topic with subscribe rate successfully" , func () {
899+ err := k8sClient .Create (ctx , subscribeRateTopic )
900+ Expect (err == nil || apierrors .IsAlreadyExists (err )).Should (BeTrue ())
901+ })
902+
903+ It ("should be ready" , func () {
904+ Eventually (func () bool {
905+ t := & v1alphav1.PulsarTopic {}
906+ tns := types.NamespacedName {Namespace : namespaceName , Name : subscribeRateTopicName }
907+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
908+ return v1alphav1 .IsPulsarResourceReady (t )
909+ }, "20s" , "100ms" ).Should (BeTrue ())
910+ })
911+
912+ It ("should have correct subscribe rate configuration" , func () {
913+ topic := & v1alphav1.PulsarTopic {}
914+ tns := types.NamespacedName {Namespace : namespaceName , Name : subscribeRateTopicName }
915+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
916+
917+ // Verify SubscribeRate
918+ Expect (topic .Spec .SubscribeRate ).ShouldNot (BeNil ())
919+ Expect (* topic .Spec .SubscribeRate .SubscribeThrottlingRatePerConsumer ).Should (Equal (int32 (10 )))
920+ Expect (* topic .Spec .SubscribeRate .RatePeriodInSecond ).Should (Equal (int32 (30 )))
921+ })
922+
923+ It ("should update subscribe rate successfully" , func () {
924+ topic := & v1alphav1.PulsarTopic {}
925+ tns := types.NamespacedName {Namespace : namespaceName , Name : subscribeRateTopicName }
926+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
927+
928+ // Update subscribe rate
929+ topic .Spec .SubscribeRate .SubscribeThrottlingRatePerConsumer = pointer .Int32 (20 )
930+ topic .Spec .SubscribeRate .RatePeriodInSecond = pointer .Int32 (60 )
931+ err := k8sClient .Update (ctx , topic )
932+ Expect (err ).Should (Succeed ())
933+ })
934+
935+ It ("should be ready after update" , func () {
936+ Eventually (func () bool {
937+ t := & v1alphav1.PulsarTopic {}
938+ tns := types.NamespacedName {Namespace : namespaceName , Name : subscribeRateTopicName }
939+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
940+ return v1alphav1 .IsPulsarResourceReady (t )
941+ }, "20s" , "100ms" ).Should (BeTrue ())
942+ })
943+
944+ AfterAll (func () {
945+ if subscribeRateTopic != nil {
946+ Eventually (func (g Gomega ) {
947+ t := & v1alphav1.PulsarTopic {}
948+ tns := types.NamespacedName {Namespace : namespaceName , Name : subscribeRateTopicName }
949+ g .Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
950+ g .Expect (k8sClient .Delete (ctx , t )).Should (Succeed ())
951+ }).Should (Succeed ())
952+ }
953+ })
954+ })
955+
956+ Context ("PulsarTopic Offload Policies" , Ordered , func () {
957+ var (
958+ offloadPoliciesTopic * v1alphav1.PulsarTopic
959+ offloadPoliciesTopicName string = "test-offload-policies-topic"
960+ )
961+
962+ BeforeAll (func () {
963+ offloadPoliciesTopic = utils .MakePulsarTopicWithOffloadPolicies (
964+ namespaceName ,
965+ offloadPoliciesTopicName ,
966+ "persistent://public/default/offload-policies-test" ,
967+ pconnName ,
968+ lifecyclePolicy ,
969+ )
970+ })
971+
972+ It ("should create topic with offload policies successfully" , func () {
973+ err := k8sClient .Create (ctx , offloadPoliciesTopic )
974+ Expect (err == nil || apierrors .IsAlreadyExists (err )).Should (BeTrue ())
975+ })
976+
977+ It ("should be ready" , func () {
978+ Eventually (func () bool {
979+ t := & v1alphav1.PulsarTopic {}
980+ tns := types.NamespacedName {Namespace : namespaceName , Name : offloadPoliciesTopicName }
981+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
982+ return v1alphav1 .IsPulsarResourceReady (t )
983+ }, "20s" , "100ms" ).Should (BeTrue ())
984+ })
985+
986+ It ("should have correct offload policies configuration" , func () {
987+ topic := & v1alphav1.PulsarTopic {}
988+ tns := types.NamespacedName {Namespace : namespaceName , Name : offloadPoliciesTopicName }
989+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
990+
991+ // Verify OffloadPolicies
992+ Expect (topic .Spec .OffloadPolicies ).ShouldNot (BeNil ())
993+ Expect (topic .Spec .OffloadPolicies .ManagedLedgerOffloadDriver ).Should (Equal ("aws-s3" ))
994+ Expect (topic .Spec .OffloadPolicies .ManagedLedgerOffloadMaxThreads ).Should (Equal (int32 (5 )))
995+ Expect (topic .Spec .OffloadPolicies .ManagedLedgerOffloadThresholdInBytes ).Should (Equal (int64 (1073741824 )))
996+ })
997+
998+ It ("should update offload policies successfully" , func () {
999+ topic := & v1alphav1.PulsarTopic {}
1000+ tns := types.NamespacedName {Namespace : namespaceName , Name : offloadPoliciesTopicName }
1001+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
1002+
1003+ // Update offload policies
1004+ topic .Spec .OffloadPolicies .ManagedLedgerOffloadMaxThreads = 10
1005+ topic .Spec .OffloadPolicies .ManagedLedgerOffloadThresholdInBytes = 2147483648 // 2GB
1006+ err := k8sClient .Update (ctx , topic )
1007+ Expect (err ).Should (Succeed ())
1008+ })
1009+
1010+ It ("should be ready after update" , func () {
1011+ Eventually (func () bool {
1012+ t := & v1alphav1.PulsarTopic {}
1013+ tns := types.NamespacedName {Namespace : namespaceName , Name : offloadPoliciesTopicName }
1014+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1015+ return v1alphav1 .IsPulsarResourceReady (t )
1016+ }, "20s" , "100ms" ).Should (BeTrue ())
1017+ })
1018+
1019+ AfterAll (func () {
1020+ if offloadPoliciesTopic != nil {
1021+ Eventually (func (g Gomega ) {
1022+ t := & v1alphav1.PulsarTopic {}
1023+ tns := types.NamespacedName {Namespace : namespaceName , Name : offloadPoliciesTopicName }
1024+ g .Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1025+ g .Expect (k8sClient .Delete (ctx , t )).Should (Succeed ())
1026+ }).Should (Succeed ())
1027+ }
1028+ })
1029+ })
1030+
1031+ Context ("PulsarTopic Auto Subscription Creation" , Ordered , func () {
1032+ var (
1033+ autoSubscriptionTopic * v1alphav1.PulsarTopic
1034+ autoSubscriptionTopicName string = "test-auto-subscription-topic"
1035+ )
1036+
1037+ BeforeAll (func () {
1038+ autoSubscriptionTopic = utils .MakePulsarTopicWithAutoSubscriptionCreation (
1039+ namespaceName ,
1040+ autoSubscriptionTopicName ,
1041+ "persistent://public/default/auto-subscription-test" ,
1042+ pconnName ,
1043+ lifecyclePolicy ,
1044+ )
1045+ })
1046+
1047+ It ("should create topic with auto subscription creation successfully" , func () {
1048+ err := k8sClient .Create (ctx , autoSubscriptionTopic )
1049+ Expect (err == nil || apierrors .IsAlreadyExists (err )).Should (BeTrue ())
1050+ })
1051+
1052+ It ("should be ready" , func () {
1053+ Eventually (func () bool {
1054+ t := & v1alphav1.PulsarTopic {}
1055+ tns := types.NamespacedName {Namespace : namespaceName , Name : autoSubscriptionTopicName }
1056+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1057+ return v1alphav1 .IsPulsarResourceReady (t )
1058+ }, "20s" , "100ms" ).Should (BeTrue ())
1059+ })
1060+
1061+ It ("should have correct auto subscription creation configuration" , func () {
1062+ topic := & v1alphav1.PulsarTopic {}
1063+ tns := types.NamespacedName {Namespace : namespaceName , Name : autoSubscriptionTopicName }
1064+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
1065+
1066+ // Verify AutoSubscriptionCreation
1067+ Expect (topic .Spec .AutoSubscriptionCreation ).ShouldNot (BeNil ())
1068+ Expect (topic .Spec .AutoSubscriptionCreation .AllowAutoSubscriptionCreation ).Should (Equal (true ))
1069+ })
1070+
1071+ It ("should update auto subscription creation successfully" , func () {
1072+ topic := & v1alphav1.PulsarTopic {}
1073+ tns := types.NamespacedName {Namespace : namespaceName , Name : autoSubscriptionTopicName }
1074+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
1075+
1076+ // Update auto subscription creation
1077+ topic .Spec .AutoSubscriptionCreation .AllowAutoSubscriptionCreation = false
1078+ err := k8sClient .Update (ctx , topic )
1079+ Expect (err ).Should (Succeed ())
1080+ })
1081+
1082+ It ("should be ready after update" , func () {
1083+ Eventually (func () bool {
1084+ t := & v1alphav1.PulsarTopic {}
1085+ tns := types.NamespacedName {Namespace : namespaceName , Name : autoSubscriptionTopicName }
1086+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1087+ return v1alphav1 .IsPulsarResourceReady (t )
1088+ }, "20s" , "100ms" ).Should (BeTrue ())
1089+ })
1090+
1091+ AfterAll (func () {
1092+ if autoSubscriptionTopic != nil {
1093+ Eventually (func (g Gomega ) {
1094+ t := & v1alphav1.PulsarTopic {}
1095+ tns := types.NamespacedName {Namespace : namespaceName , Name : autoSubscriptionTopicName }
1096+ g .Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1097+ g .Expect (k8sClient .Delete (ctx , t )).Should (Succeed ())
1098+ }).Should (Succeed ())
1099+ }
1100+ })
1101+ })
1102+
1103+ Context ("PulsarTopic Schema Compatibility Strategy" , Ordered , func () {
1104+ var (
1105+ schemaCompatibilityTopic * v1alphav1.PulsarTopic
1106+ schemaCompatibilityTopicName string = "test-schema-compatibility-topic"
1107+ )
1108+
1109+ BeforeAll (func () {
1110+ schemaCompatibilityTopic = utils .MakePulsarTopicWithSchemaCompatibilityStrategy (
1111+ namespaceName ,
1112+ schemaCompatibilityTopicName ,
1113+ "persistent://public/default/schema-compatibility-test" ,
1114+ pconnName ,
1115+ lifecyclePolicy ,
1116+ )
1117+ })
1118+
1119+ It ("should create topic with schema compatibility strategy successfully" , func () {
1120+ err := k8sClient .Create (ctx , schemaCompatibilityTopic )
1121+ Expect (err == nil || apierrors .IsAlreadyExists (err )).Should (BeTrue ())
1122+ })
1123+
1124+ It ("should be ready" , func () {
1125+ Eventually (func () bool {
1126+ t := & v1alphav1.PulsarTopic {}
1127+ tns := types.NamespacedName {Namespace : namespaceName , Name : schemaCompatibilityTopicName }
1128+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1129+ return v1alphav1 .IsPulsarResourceReady (t )
1130+ }, "20s" , "100ms" ).Should (BeTrue ())
1131+ })
1132+
1133+ It ("should have correct schema compatibility strategy configuration" , func () {
1134+ topic := & v1alphav1.PulsarTopic {}
1135+ tns := types.NamespacedName {Namespace : namespaceName , Name : schemaCompatibilityTopicName }
1136+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
1137+
1138+ // Verify SchemaCompatibilityStrategy
1139+ Expect (topic .Spec .SchemaCompatibilityStrategy ).ShouldNot (BeNil ())
1140+ Expect (string (* topic .Spec .SchemaCompatibilityStrategy )).Should (Equal ("BACKWARD" ))
1141+ })
1142+
1143+ It ("should update schema compatibility strategy successfully" , func () {
1144+ topic := & v1alphav1.PulsarTopic {}
1145+ tns := types.NamespacedName {Namespace : namespaceName , Name : schemaCompatibilityTopicName }
1146+ Expect (k8sClient .Get (ctx , tns , topic )).Should (Succeed ())
1147+
1148+ // Update schema compatibility strategy
1149+ newStrategy := adminutils .SchemaCompatibilityStrategy ("FORWARD" )
1150+ topic .Spec .SchemaCompatibilityStrategy = & newStrategy
1151+ err := k8sClient .Update (ctx , topic )
1152+ Expect (err ).Should (Succeed ())
1153+ })
1154+
1155+ It ("should be ready after update" , func () {
1156+ Eventually (func () bool {
1157+ t := & v1alphav1.PulsarTopic {}
1158+ tns := types.NamespacedName {Namespace : namespaceName , Name : schemaCompatibilityTopicName }
1159+ Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1160+ return v1alphav1 .IsPulsarResourceReady (t )
1161+ }, "20s" , "100ms" ).Should (BeTrue ())
1162+ })
1163+
1164+ AfterAll (func () {
1165+ if schemaCompatibilityTopic != nil {
1166+ Eventually (func (g Gomega ) {
1167+ t := & v1alphav1.PulsarTopic {}
1168+ tns := types.NamespacedName {Namespace : namespaceName , Name : schemaCompatibilityTopicName }
1169+ g .Expect (k8sClient .Get (ctx , tns , t )).Should (Succeed ())
1170+ g .Expect (k8sClient .Delete (ctx , t )).Should (Succeed ())
1171+ }).Should (Succeed ())
1172+ }
1173+ })
1174+ })
1175+
8811176 Context ("PulsarNamespace Rate Limiting" , Ordered , func () {
8821177 var (
8831178 rateLimitingNamespace * v1alphav1.PulsarNamespace
0 commit comments