Skip to content

Commit 00b9b2f

Browse files
committed
Fix system test for rabbitmq 4
We were creating policies with mirroring keys, that are not valid in RabbitMQ 4. Some readiness steps were asserting on number of global parameters present, which has changed in RMQ 4 with the vhost tags. Now we assert on the presence on specific parameter.
1 parent 03b50cf commit 00b9b2f

File tree

6 files changed

+64
-53
lines changed

6 files changed

+64
-53
lines changed

system_tests/operatorpolicy_system_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,20 @@ var _ = Describe("OperatorPolicy", func() {
3838
Pattern: "test-queue",
3939
ApplyTo: "queues",
4040
Definition: &runtime.RawExtension{
41-
Raw: []byte(`{"ha-mode":"all"}`),
41+
Raw: []byte(`{"delivery-limit": 5}`),
4242
},
4343
},
4444
}
4545
})
4646

4747
AfterEach(func() {
48+
if CurrentSpecReport().Failed() {
49+
out, err := kubectl("logs", "-n", "rabbitmq-system", rmq.Name+"server-0")
50+
if err != nil {
51+
GinkgoWriter.Printf("error getting rabbitmq logs: %v\n", err)
52+
}
53+
GinkgoWriter.Printf("rabbitmq logs:\n%s\n", string(out))
54+
}
4855
_ = k8sClient.Delete(ctx, policy)
4956
})
5057

@@ -66,7 +73,7 @@ var _ = Describe("OperatorPolicy", func() {
6673
"Priority": Equal(0),
6774
}))
6875

69-
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("ha-mode", "all"))
76+
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("delivery-limit", BeEquivalentTo(5)))
7077

7178
By("updating status condition 'Ready'")
7279
updatedPolicy := topology.OperatorPolicy{}
@@ -97,9 +104,7 @@ var _ = Describe("OperatorPolicy", func() {
97104
By("updating operator policy definitions successfully")
98105
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, policy)).To(Succeed())
99106
policy.Spec.Definition = &runtime.RawExtension{
100-
Raw: []byte(`{"ha-mode":"exactly",
101-
"ha-params": 2
102-
}`)}
107+
Raw: []byte(`{"delivery-limit": 3, "expires": 1800}`)}
103108
Expect(k8sClient.Update(ctx, policy, &client.UpdateOptions{})).To(Succeed())
104109

105110
Eventually(func() rabbithole.PolicyDefinition {
@@ -109,8 +114,8 @@ var _ = Describe("OperatorPolicy", func() {
109114
return fetchedPolicy.Definition
110115
}, 10, 2).Should(HaveLen(2))
111116

112-
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("ha-mode", "exactly"))
113-
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("ha-params", float64(2)))
117+
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("delivery-limit", BeEquivalentTo(3)))
118+
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("expires", BeEquivalentTo(1800)))
114119

115120
By("deleting policy")
116121
Expect(k8sClient.Delete(ctx, policy)).To(Succeed())

system_tests/policy_system_test.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ var _ = Describe("Policy", func() {
3737
Pattern: "test-queue",
3838
ApplyTo: "queues",
3939
Definition: &runtime.RawExtension{
40-
Raw: []byte(`{"ha-mode":"all"}`),
40+
Raw: []byte(`{"delivery-limit": 5}`),
4141
},
4242
},
4343
}
@@ -65,7 +65,7 @@ var _ = Describe("Policy", func() {
6565
"Priority": Equal(0),
6666
}))
6767

68-
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("ha-mode", "all"))
68+
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("delivery-limit", BeEquivalentTo(5)))
6969

7070
By("updating status condition 'Ready'")
7171
updatedPolicy := topology.Policy{}
@@ -96,9 +96,7 @@ var _ = Describe("Policy", func() {
9696
By("updating policy definitions successfully")
9797
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, policy)).To(Succeed())
9898
policy.Spec.Definition = &runtime.RawExtension{
99-
Raw: []byte(`{"ha-mode":"exactly",
100-
"ha-params": 2
101-
}`)}
99+
Raw: []byte(`{"delivery-limit": 3, "expires": 180}`)}
102100
Expect(k8sClient.Update(ctx, policy, &client.UpdateOptions{})).To(Succeed())
103101

104102
Eventually(func() rabbithole.PolicyDefinition {
@@ -108,8 +106,8 @@ var _ = Describe("Policy", func() {
108106
return fetchedPolicy.Definition
109107
}, 10, 2).Should(HaveLen(2))
110108

111-
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("ha-mode", "exactly"))
112-
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("ha-params", float64(2)))
109+
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("delivery-limit", BeEquivalentTo(3)))
110+
Expect(fetchedPolicy.Definition).To(HaveKeyWithValue("expires", BeEquivalentTo(180)))
113111

114112
By("deleting policy")
115113
Expect(k8sClient.Delete(ctx, policy)).To(Succeed())

system_tests/schema_replication_system_test.go

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
88
"k8s.io/apimachinery/pkg/types"
99
"sigs.k8s.io/controller-runtime/pkg/client"
10+
"time"
1011

1112
. "github.com/onsi/ginkgo/v2"
1213
. "github.com/onsi/gomega"
@@ -57,39 +58,41 @@ var _ = Describe("schema replication", func() {
5758
})
5859

5960
It("works", func() {
61+
SetDefaultEventuallyPollingInterval(2 * time.Second)
62+
SetDefaultEventuallyTimeout(30 * time.Second)
63+
getRabbitGlobalParams := func() ([]rabbithole.GlobalRuntimeParameter, error) {
64+
return rabbitClient.ListGlobalParameters()
65+
}
66+
6067
By("setting schema replication upstream global parameters successfully")
6168
Expect(k8sClient.Create(ctx, replication, &client.CreateOptions{})).To(Succeed())
62-
var allGlobalParams []rabbithole.GlobalRuntimeParameter
63-
Eventually(func() []rabbithole.GlobalRuntimeParameter {
64-
var err error
65-
allGlobalParams, err = rabbitClient.ListGlobalParameters()
66-
Expect(err).NotTo(HaveOccurred())
67-
return allGlobalParams
68-
}, 30, 2).Should(HaveLen(3)) // cluster_name and internal_cluster_id are set by default by RabbitMQ
69-
70-
Expect(allGlobalParams).To(ContainElement(
71-
rabbithole.GlobalRuntimeParameter{
72-
Name: "schema_definition_sync_upstream",
73-
Value: map[string]interface{}{
74-
"endpoints": []interface{}{"abc.endpoints.local:5672", "efg.endpoints.local:1234"},
75-
"username": "some-username",
76-
"password": "some-password",
77-
},
78-
}))
69+
DeferCleanup(func() {
70+
// leaving a cleanup step in case the test fails, so that it does not leave behind resources
71+
// In the happy path, the schemareplication object is deleted, and the following command is a no-op
72+
_ = k8sClient.Delete(ctx, replication, &client.DeleteOptions{})
73+
})
74+
Eventually(getRabbitGlobalParams).Should(ContainElement(And(
75+
HaveField("Name", "schema_definition_sync_upstream"),
76+
HaveField("Value", And(
77+
HaveKeyWithValue("endpoints", ContainElements("abc.endpoints.local:5672", "efg.endpoints.local:1234")),
78+
HaveKeyWithValue("username", "some-username"),
79+
HaveKeyWithValue("password", "some-password"),
80+
)),
81+
)))
7982

8083
By("updating status condition 'Ready'")
8184
updatedReplication := topology.SchemaReplication{}
8285

8386
Eventually(func() []topology.Condition {
8487
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &updatedReplication)).To(Succeed())
8588
return updatedReplication.Status.Conditions
86-
}, waitUpdatedStatusCondition, 2).Should(HaveLen(1), "Schema Replication status condition should be present")
89+
}, waitUpdatedStatusCondition).Should(HaveLen(1), "Schema Replication status condition should be present")
8790

8891
readyCondition := updatedReplication.Status.Conditions[0]
8992
Expect(string(readyCondition.Type)).To(Equal("Ready"))
9093
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
9194
Expect(readyCondition.Reason).To(Equal("SuccessfulCreateOrUpdate"))
92-
Expect(readyCondition.LastTransitionTime).NotTo(Equal(metav1.Time{}))
95+
Expect(readyCondition.LastTransitionTime).NotTo(BeZero())
9396

9497
By("setting correct finalizer")
9598
Expect(updatedReplication.ObjectMeta.Finalizers).To(ConsistOf("deletion.finalizers.schemareplications.rabbitmq.com"))
@@ -105,11 +108,6 @@ var _ = Describe("schema replication", func() {
105108

106109
By("unsetting schema replication upstream global parameters on deletion")
107110
Expect(k8sClient.Delete(ctx, replication)).To(Succeed())
108-
Eventually(func() []rabbithole.GlobalRuntimeParameter {
109-
var err error
110-
allGlobalParams, err = rabbitClient.ListGlobalParameters()
111-
Expect(err).NotTo(HaveOccurred())
112-
return allGlobalParams
113-
}, 30, 2).Should(HaveLen(2)) // cluster_name and internal_cluster_id are set by default by RabbitMQ
111+
Eventually(getRabbitGlobalParams).ShouldNot(ContainElement(HaveField("Name", "schema_definition_sync_upstream")))
114112
})
115113
})

system_tests/tls_system_test.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package system_tests
33
import (
44
"context"
55
"fmt"
6+
"sigs.k8s.io/controller-runtime/pkg/envtest/komega"
7+
"time"
68

79
"github.com/google/uuid"
810
. "github.com/onsi/ginkgo/v2"
@@ -12,7 +14,6 @@ import (
1214
corev1 "k8s.io/api/core/v1"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1416
"k8s.io/apimachinery/pkg/runtime"
15-
"k8s.io/apimachinery/pkg/types"
1617
"sigs.k8s.io/controller-runtime/pkg/client"
1718
)
1819

@@ -119,6 +120,8 @@ var _ = Describe("RabbitMQ Cluster with TLS enabled", func() {
119120
})
120121

121122
It("works", func() {
123+
k := komega.New(k8sClient)
124+
122125
By("successfully creating object when rabbitmqClusterReference.name is set")
123126
policy = topology.Policy{
124127
ObjectMeta: metav1.ObjectMeta{
@@ -130,20 +133,19 @@ var _ = Describe("RabbitMQ Cluster with TLS enabled", func() {
130133
Pattern: ".*",
131134
ApplyTo: "queues",
132135
Definition: &runtime.RawExtension{
133-
Raw: []byte(`{"ha-mode":"all"}`),
136+
Raw: []byte(`{"max-length": 10000}`),
134137
},
135138
RabbitmqClusterReference: targetClusterRef,
136139
},
137140
}
138141
Expect(k8sClient.Create(ctx, &policy)).To(Succeed())
139142

140-
var fetchedPolicy topology.Policy
141-
Eventually(func() []topology.Condition {
142-
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &fetchedPolicy)).To(Succeed())
143-
return fetchedPolicy.Status.Conditions
144-
}, waitUpdatedStatusCondition, 2).Should(HaveLen(1), "policy status condition should be present")
143+
Eventually(k.Object(&policy)).
144+
WithTimeout(waitUpdatedStatusCondition).
145+
WithPolling(2*time.Second).
146+
Should(HaveField("Status.Conditions", HaveLen(1)), "policy status condition should be present")
145147

146-
readyCondition := fetchedPolicy.Status.Conditions[0]
148+
readyCondition := policy.Status.Conditions[0]
147149
Expect(string(readyCondition.Type)).To(Equal("Ready"))
148150
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
149151
Expect(readyCondition.Reason).To(Equal("SuccessfulCreateOrUpdate"))
@@ -177,13 +179,12 @@ var _ = Describe("RabbitMQ Cluster with TLS enabled", func() {
177179
}
178180
Expect(k8sClient.Create(ctx, &exchange)).To(Succeed())
179181

180-
var fetched topology.Exchange
181-
Eventually(func() []topology.Condition {
182-
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &fetched)).To(Succeed())
183-
return fetched.Status.Conditions
184-
}, 10, 2).Should(HaveLen(1))
182+
Eventually(k.Object(&exchange)).
183+
WithTimeout(waitUpdatedStatusCondition).
184+
WithPolling(2*time.Second).
185+
Should(HaveField("Status.Conditions", HaveLen(1)), "exchange status condition should be present")
185186

186-
readyCondition = fetched.Status.Conditions[0]
187+
readyCondition = exchange.Status.Conditions[0]
187188
Expect(string(readyCondition.Type)).To(Equal("Ready"))
188189
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
189190
Expect(readyCondition.Reason).To(Equal("SuccessfulCreateOrUpdate"))

system_tests/utils.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ func basicTestRabbitmqCluster(name, namespace string) *rabbitmqv1beta1.RabbitmqC
195195
},
196196
Rabbitmq: rabbitmqv1beta1.RabbitmqClusterConfigurationSpec{
197197
AdditionalPlugins: []rabbitmqv1beta1.Plugin{"rabbitmq_federation", "rabbitmq_shovel", "rabbitmq_stream"},
198+
AdditionalConfig: "log.console.level = debug",
198199
},
199200
},
200201
}

system_tests/vhost_system_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,18 @@ var _ = Describe("vhost", func() {
178178
})
179179

180180
AfterEach(func() {
181+
if CurrentSpecReport().Failed() {
182+
out, err := kubectl("logs", "-n", "rabbitmq-system", rmq.Name+"server-0")
183+
if err != nil {
184+
GinkgoWriter.Printf("error getting rabbitmq logs: %v\n", err)
185+
}
186+
GinkgoWriter.Printf("rabbitmq logs:\n%s\n", string(out))
187+
}
181188
Expect(k8sClient.Delete(ctx, vhostWithLimits)).To(Succeed())
182189
})
183190

184191
It("configures the limits", func() {
192+
// TODO: this test seems flaky, perhaps it's racy to create the vhost, get the vhost and set/get vhost limits
185193
var err error
186194
var vhostLimitsInfoResponse []rabbithole.VhostLimitsInfo
187195
Eventually(func() error {

0 commit comments

Comments
 (0)