Skip to content

Commit e5b897d

Browse files
authored
fix topic not found when namespace non exists (#131)
* fix topic not found namespace * fix ci * fix ci * fix ci * fix ci * fix ci * fix ci * fix ci * fix * fix ci * fix ci
1 parent fccccad commit e5b897d

7 files changed

+171
-15
lines changed

pulsar/resource_pulsar_cluster_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ func TestHandleExistingCluster(t *testing.T) {
5656
PreCheck: func() {
5757
testAccPreCheck(t)
5858
createCluster(t, cName)
59+
t.Cleanup(func() {
60+
if err := getClientFromMeta(testAccProvider.Meta()).Clusters().Delete(cName); err != nil {
61+
t.Fatalf("ERROR_DELETING_TEST_CLUSTER: %v", err)
62+
}
63+
})
5964
},
6065
CheckDestroy: testPulsarClusterDestroy,
6166
ProviderFactories: testAccProviderFactories,
@@ -75,6 +80,11 @@ func TestImportExistingCluster(t *testing.T) {
7580
PreCheck: func() {
7681
testAccPreCheck(t)
7782
createCluster(t, cName)
83+
t.Cleanup(func() {
84+
if err := getClientFromMeta(testAccProvider.Meta()).Clusters().Delete(cName); err != nil {
85+
t.Fatalf("ERROR_DELETING_TEST_CLUSTER: %v", err)
86+
}
87+
})
7888
},
7989
CheckDestroy: testPulsarClusterDestroy,
8090
ProviderFactories: testAccProviderFactories,

pulsar/resource_pulsar_namespace_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,11 @@ func TestImportExistingNamespace(t *testing.T) {
332332
PreCheck: func() {
333333
testAccPreCheck(t)
334334
createNamespace(t, id)
335+
t.Cleanup(func() {
336+
if err := getClientFromMeta(testAccProvider.Meta()).Namespaces().DeleteNamespace(id); err != nil {
337+
t.Fatalf("ERROR_DELETING_TEST_NS: %v", err)
338+
}
339+
})
335340
},
336341
CheckDestroy: testPulsarNamespaceDestroy,
337342
ProviderFactories: testAccProviderFactories,

pulsar/resource_pulsar_sink_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,24 @@ func testPulsarSinkDestroy(s *terraform.State) error {
115115

116116
func TestImportExistingSink(t *testing.T) {
117117
sinkName := acctest.RandString(6)
118-
err := createSampleSink(sinkName)
119-
if err != nil {
120-
t.Fatal(err)
121-
}
122118

123119
resource.Test(t, resource.TestCase{
120+
PreCheck: func() {
121+
testAccPreCheck(t)
122+
createSampleSink(sinkName)
123+
t.Cleanup(func() {
124+
if err := getClientFromMeta(testAccProvider.Meta()).Sinks().DeleteSink(
125+
"public",
126+
"default",
127+
sinkName,
128+
); err != nil {
129+
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
130+
return
131+
}
132+
t.Fatalf("ERROR_DELETING_TEST_SINK: %v", err)
133+
}
134+
})
135+
},
124136
ProviderFactories: testAccProviderFactories,
125137
CheckDestroy: testPulsarSinkDestroy,
126138
Steps: []resource.TestStep{

pulsar/resource_pulsar_source_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,23 @@ func getPulsarSourceByResourceID(id string) (*utils.SourceConfig, error) {
133133

134134
func TestImportExistingSource(t *testing.T) {
135135
sourceName := acctest.RandString(6)
136-
err := createSampleSource(sourceName)
137-
if err != nil {
138-
t.Fatal(err)
139-
}
140136

141137
resource.Test(t, resource.TestCase{
138+
PreCheck: func() {
139+
testAccPreCheck(t)
140+
createSampleSource(sourceName)
141+
t.Cleanup(func() {
142+
if err := getClientFromMeta(testAccProvider.Meta()).Sources().DeleteSource(
143+
"public",
144+
"default",
145+
sourceName); err != nil {
146+
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
147+
return
148+
}
149+
t.Fatalf("ERROR_DELETING_TEST_SOURCE: %v", err)
150+
}
151+
})
152+
},
142153
ProviderFactories: testAccProviderFactories,
143154
CheckDestroy: testPulsarSourceDestroy,
144155
Steps: []resource.TestStep{

pulsar/resource_pulsar_tenant_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ func TestHandleExistingTenant(t *testing.T) {
5757
PreCheck: func() {
5858
testAccPreCheck(t)
5959
createTenant(t, tName)
60+
t.Cleanup(func() {
61+
if err := getClientFromMeta(testAccProvider.Meta()).Tenants().Delete(tName); err != nil {
62+
t.Fatalf("ERROR_DELETING_TEST_TENANT: %v", err)
63+
}
64+
})
6065
},
6166
ProviderFactories: testAccProviderFactories,
6267
PreventPostDestroyRefresh: false,
@@ -77,6 +82,11 @@ func TestImportExistingTenant(t *testing.T) {
7782
PreCheck: func() {
7883
testAccPreCheck(t)
7984
createTenant(t, tName)
85+
t.Cleanup(func() {
86+
if err := getClientFromMeta(testAccProvider.Meta()).Tenants().Delete(tName); err != nil {
87+
t.Fatalf("ERROR_DELETING_TEST_TENANT: %v", err)
88+
}
89+
})
8090
},
8191
CheckDestroy: testPulsarTenantDestroy,
8292
ProviderFactories: testAccProviderFactories,

pulsar/resource_pulsar_topic.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"fmt"
2323

24+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
2425
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2526
"github.com/cenkalti/backoff/v4"
2627
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
@@ -162,6 +163,10 @@ func resourcePulsarTopicRead(ctx context.Context, d *schema.ResourceData, meta i
162163

163164
topicName, found, err := getTopic(d, meta)
164165
if err != nil {
166+
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
167+
d.SetId("")
168+
return nil
169+
}
165170
return diag.Errorf("%v", err)
166171
}
167172
if !found {

pulsar/resource_pulsar_topic_test.go

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,15 @@ func TestTopic(t *testing.T) {
4444
{
4545
Config: testPulsarPartitionTopic,
4646
Check: resource.ComposeTestCheckFunc(
47-
testPulsarTopicExists("pulsar_topic.sample-topic-1"),
48-
testPulsarTopicExists("pulsar_topic.sample-topic-2"),
47+
testPulsarTopicExists("pulsar_topic.sample-topic-1", t),
48+
testPulsarTopicExists("pulsar_topic.sample-topic-2", t),
4949
),
5050
},
5151
{
5252
Config: testPulsarNonPartitionTopic,
5353
Check: resource.ComposeTestCheckFunc(
54-
testPulsarTopicExists("pulsar_topic.sample-topic-3"),
55-
testPulsarTopicExists("pulsar_topic.sample-topic-4"),
54+
testPulsarTopicExists("pulsar_topic.sample-topic-3", t),
55+
testPulsarTopicExists("pulsar_topic.sample-topic-4", t),
5656
),
5757
},
5858
},
@@ -65,11 +65,20 @@ func TestImportExistingTopic(t *testing.T) {
6565
pnum := 10
6666

6767
fullID := strings.Join([]string{ttype + ":/", "public", "default", tname}, "/")
68+
topicName, err := utils.GetTopicName(fullID)
69+
if err != nil {
70+
t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err)
71+
}
6872

6973
resource.Test(t, resource.TestCase{
7074
PreCheck: func() {
7175
testAccPreCheck(t)
7276
createTopic(t, fullID, pnum)
77+
t.Cleanup(func() {
78+
if err := getClientFromMeta(testAccProvider.Meta()).Topics().Delete(*topicName, true, pnum == 0); err != nil {
79+
t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err)
80+
}
81+
})
7382
},
7483
ProviderFactories: testAccProviderFactories,
7584
CheckDestroy: testPulsarTopicDestroy,
@@ -93,6 +102,61 @@ func TestPartionedTopicWithPermissionGrantUpdate(t *testing.T) {
93102
testTopicWithPermissionGrantUpdate(t, 10)
94103
}
95104

105+
func TestTopicNamespaceExternallyRemoved(t *testing.T) {
106+
107+
resourceName := "pulsar_topic.test"
108+
tName := acctest.RandString(10)
109+
nsName := acctest.RandString(10)
110+
topicName := acctest.RandString(10)
111+
112+
resource.Test(t, resource.TestCase{
113+
PreCheck: func() { testAccPreCheck(t) },
114+
ProviderFactories: testAccProviderFactories,
115+
CheckDestroy: testPulsarTopicDestroy,
116+
Steps: []resource.TestStep{
117+
{
118+
Config: testPulsarNamespaceWithTopic(testWebServiceURL, tName, nsName, topicName),
119+
Check: resource.ComposeTestCheckFunc(
120+
testPulsarTopicExists(resourceName, t),
121+
),
122+
ExpectError: nil,
123+
},
124+
{
125+
PreConfig: func() {
126+
client := getClientFromMeta(testAccProvider.Meta())
127+
topicName, err := utils.GetTopicName(fmt.Sprintf("persistent://%s/%s/%s", tName, nsName, topicName))
128+
if err != nil {
129+
t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err)
130+
}
131+
namespace, err := utils.GetNameSpaceName(topicName.GetTenant(), topicName.GetNamespace())
132+
if err != nil {
133+
t.Fatalf("ERROR_READ_NAMESPACE: %v", err)
134+
}
135+
partitionedTopics, nonPartitionedTopics, err := client.Topics().List(*namespace)
136+
if err != nil {
137+
t.Fatalf("ERROR_READ_TOPIC_DATA: %v", err)
138+
}
139+
140+
for _, topic := range append(partitionedTopics, nonPartitionedTopics...) {
141+
if topicName.String() == topic {
142+
if err = client.Topics().Delete(*topicName, true, true); err != nil {
143+
t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err)
144+
}
145+
}
146+
}
147+
if err = client.Namespaces().DeleteNamespace(tName + "/" + nsName); err != nil {
148+
t.Fatalf("ERROR_DELETING_TEST_NS: %v", err)
149+
}
150+
},
151+
Config: testPulsarNamespaceWithTopic(testWebServiceURL, tName, nsName, topicName),
152+
PlanOnly: true,
153+
ExpectNonEmptyPlan: true,
154+
ExpectError: nil,
155+
},
156+
},
157+
})
158+
}
159+
96160
func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
97161
resourceName := "pulsar_topic.test"
98162
tname := acctest.RandString(10)
@@ -114,7 +178,7 @@ func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
114178
actions = ["produce", "consume"]
115179
}`),
116180
Check: resource.ComposeTestCheckFunc(
117-
testPulsarTopicExists(resourceName),
181+
testPulsarTopicExists(resourceName, t),
118182
resource.TestCheckResourceAttr(resourceName, "permission_grant.#", "2"),
119183
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.role", "some-role-1"),
120184
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.actions.#", "3"),
@@ -134,7 +198,7 @@ func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
134198
actions = ["produce"]
135199
}`),
136200
Check: resource.ComposeTestCheckFunc(
137-
testPulsarTopicExists(resourceName),
201+
testPulsarTopicExists(resourceName, t),
138202
resource.TestCheckResourceAttr(resourceName, "permission_grant.#", "1"),
139203
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.role", "some-role-2"),
140204
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.actions.#", "1"),
@@ -177,7 +241,7 @@ func testPulsarTopicDestroy(s *terraform.State) error {
177241
return nil
178242
}
179243

180-
func testPulsarTopicExists(topic string) resource.TestCheckFunc {
244+
func testPulsarTopicExists(topic string, t *testing.T) resource.TestCheckFunc {
181245
return func(s *terraform.State) error {
182246
rs, ok := s.RootModule().Resources[topic]
183247
if !ok {
@@ -188,6 +252,7 @@ func testPulsarTopicExists(topic string) resource.TestCheckFunc {
188252
if err != nil {
189253
return fmt.Errorf("ERROR_READ_TOPIC: %w", err)
190254
}
255+
t.Logf("topicName: %v", topicName)
191256
namespace, err := utils.GetNameSpaceName(topicName.GetTenant(), topicName.GetNamespace())
192257
if err != nil {
193258
return fmt.Errorf("ERROR_READ_NAMESPACE: %w", err)
@@ -338,3 +403,41 @@ resource "pulsar_topic" "test" {
338403
}
339404
`, url, ttype, tname, pnum, permissionGrants)
340405
}
406+
407+
func testPulsarNamespaceWithTopic(wsURL, tenant, ns, topicName string) string {
408+
return fmt.Sprintf(`
409+
provider "pulsar" {
410+
web_service_url = "%s"
411+
}
412+
413+
resource "pulsar_tenant" "test_tenant" {
414+
tenant = "%s"
415+
allowed_clusters = ["standalone"]
416+
}
417+
418+
resource "pulsar_namespace" "test" {
419+
tenant = pulsar_tenant.test_tenant.tenant
420+
namespace = "%s"
421+
422+
topic_auto_creation {
423+
enable = false
424+
}
425+
426+
depends_on = [
427+
pulsar_tenant.test_tenant
428+
]
429+
}
430+
431+
resource "pulsar_topic" "test" {
432+
tenant = "%s"
433+
namespace = "%s"
434+
topic_type = "persistent"
435+
topic_name = "%s"
436+
partitions = 0
437+
438+
depends_on = [
439+
pulsar_namespace.test
440+
]
441+
}
442+
`, wsURL, tenant, ns, tenant, ns, topicName)
443+
}

0 commit comments

Comments
 (0)