@@ -1710,6 +1710,59 @@ var _ = Describe("Resources", func() {
17101710 Expect (string (* ns .Spec .SubscriptionExpirationTime )).Should (Equal ("7d" ))
17111711 })
17121712
1713+ It ("should reflect subscription expiration time in broker" , func () {
1714+ podName := fmt .Sprintf ("%s-broker-0" , brokerName )
1715+ containerName := fmt .Sprintf ("%s-broker" , brokerName )
1716+ Eventually (func (g Gomega ) {
1717+ stdout , _ , err := utils .ExecInPod (k8sConfig , namespaceName , podName , containerName ,
1718+ "./bin/pulsar-admin namespaces get-subscription-expiration-time " + storagePoliciesPulsarNSName )
1719+ g .Expect (err ).Should (Succeed ())
1720+ // 7d -> 10080 minutes
1721+ g .Expect (stdout ).Should (ContainSubstring ("10080" ))
1722+ }, "30s" , "200ms" ).Should (Succeed ())
1723+ })
1724+
1725+ It ("should update subscription expiration time to infinite" , func () {
1726+ ns := & v1alphav1.PulsarNamespace {}
1727+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1728+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1729+
1730+ var infiniteExp rutils.Duration = "-1"
1731+ ns .Spec .SubscriptionExpirationTime = & infiniteExp
1732+
1733+ err := k8sClient .Update (ctx , ns )
1734+ Expect (err ).Should (Succeed ())
1735+ })
1736+
1737+ It ("should be ready after subscription expiration update" , func () {
1738+ Eventually (func () bool {
1739+ ns := & v1alphav1.PulsarNamespace {}
1740+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1741+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1742+ return v1alphav1 .IsPulsarResourceReady (ns )
1743+ }, "30s" , "100ms" ).Should (BeTrue ())
1744+ })
1745+
1746+ It ("should reflect updated subscription expiration time" , func () {
1747+ ns := & v1alphav1.PulsarNamespace {}
1748+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1749+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1750+
1751+ Expect (ns .Spec .SubscriptionExpirationTime ).ShouldNot (BeNil ())
1752+ Expect (string (* ns .Spec .SubscriptionExpirationTime )).Should (Equal ("-1" ))
1753+ })
1754+
1755+ It ("should reflect updated subscription expiration time in broker" , func () {
1756+ podName := fmt .Sprintf ("%s-broker-0" , brokerName )
1757+ containerName := fmt .Sprintf ("%s-broker" , brokerName )
1758+ Eventually (func (g Gomega ) {
1759+ stdout , _ , err := utils .ExecInPod (k8sConfig , namespaceName , podName , containerName ,
1760+ "./bin/pulsar-admin namespaces get-subscription-expiration-time " + storagePoliciesPulsarNSName )
1761+ g .Expect (err ).Should (Succeed ())
1762+ g .Expect (stdout ).Should (ContainSubstring ("-1" ))
1763+ }, "30s" , "200ms" ).Should (Succeed ())
1764+ })
1765+
17131766 It ("should have correct custom properties" , func () {
17141767 ns := & v1alphav1.PulsarNamespace {}
17151768 tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
@@ -1721,6 +1774,56 @@ var _ = Describe("Resources", func() {
17211774 Expect (ns .Spec .Properties ["team" ]).Should (Equal ("qa" ))
17221775 })
17231776
1777+ It ("should reconcile when backlog quota retention policy is unset" , func () {
1778+ ns := & v1alphav1.PulsarNamespace {}
1779+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1780+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1781+
1782+ ns .Spec .BacklogQuotaRetentionPolicy = nil
1783+
1784+ err := k8sClient .Update (ctx , ns )
1785+ Expect (err ).Should (Succeed ())
1786+ })
1787+
1788+ It ("should be ready after removing backlog quota retention policy" , func () {
1789+ Eventually (func () bool {
1790+ ns := & v1alphav1.PulsarNamespace {}
1791+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1792+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1793+ return v1alphav1 .IsPulsarResourceReady (ns )
1794+ }, "30s" , "100ms" ).Should (BeTrue ())
1795+ })
1796+
1797+ It ("should allow clearing inactive topic policies and properties" , func () {
1798+ ns := & v1alphav1.PulsarNamespace {}
1799+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1800+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1801+
1802+ ns .Spec .InactiveTopicPolicies = nil
1803+ ns .Spec .Properties = nil
1804+
1805+ err := k8sClient .Update (ctx , ns )
1806+ Expect (err ).Should (Succeed ())
1807+ })
1808+
1809+ It ("should be ready after clearing inactive topic policies and properties" , func () {
1810+ Eventually (func () bool {
1811+ ns := & v1alphav1.PulsarNamespace {}
1812+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1813+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1814+ return v1alphav1 .IsPulsarResourceReady (ns )
1815+ }, "30s" , "100ms" ).Should (BeTrue ())
1816+ })
1817+
1818+ It ("should reflect cleared inactive topic policies and properties" , func () {
1819+ ns := & v1alphav1.PulsarNamespace {}
1820+ tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
1821+ Expect (k8sClient .Get (ctx , tns , ns )).Should (Succeed ())
1822+
1823+ Expect (ns .Spec .InactiveTopicPolicies ).Should (BeNil ())
1824+ Expect (ns .Spec .Properties ).Should (BeNil ())
1825+ })
1826+
17241827 It ("should update persistence policies successfully" , func () {
17251828 ns := & v1alphav1.PulsarNamespace {}
17261829 tns := types.NamespacedName {Namespace : namespaceName , Name : storagePoliciesNamespaceName }
0 commit comments