Skip to content

Commit c44b8c9

Browse files
committed
Fix
1 parent a43f36e commit c44b8c9

File tree

3 files changed

+23
-12
lines changed

3 files changed

+23
-12
lines changed

pkg/googlecloud/publisher.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package googlecloud
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"time"
78

@@ -233,7 +234,7 @@ func (p *Publisher) publisher(ctx context.Context, topic string) (pub *pubsub.Pu
233234
return pub, nil
234235
}
235236

236-
exists, err := topicExists(ctx, p.client, topic)
237+
exists, err := topicExists(ctx, p.client, p.config.ProjectID, topic)
237238
if err != nil {
238239
return nil, errors.Wrapf(err, "could not check if topic %s exists", topic)
239240
}
@@ -247,7 +248,7 @@ func (p *Publisher) publisher(ctx context.Context, topic string) (pub *pubsub.Pu
247248
}
248249

249250
_, err = p.client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{
250-
Name: topic,
251+
Name: fullyQualifiedTopicName(p.config.ProjectID, topic),
251252
})
252253
if err != nil {
253254
return nil, errors.Wrapf(err, "could not create topic %s", topic)
@@ -256,9 +257,9 @@ func (p *Publisher) publisher(ctx context.Context, topic string) (pub *pubsub.Pu
256257
return pub, nil
257258
}
258259

259-
func topicExists(ctx context.Context, client *pubsub.Client, topic string) (bool, error) {
260+
func topicExists(ctx context.Context, client *pubsub.Client, projectID string, topic string) (bool, error) {
260261
_, err := client.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{
261-
Topic: topic,
262+
Topic: fullyQualifiedTopicName(projectID, topic),
262263
})
263264
if err != nil {
264265
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
@@ -268,3 +269,7 @@ func topicExists(ctx context.Context, client *pubsub.Client, topic string) (bool
268269
}
269270
return true, nil
270271
}
272+
273+
func fullyQualifiedTopicName(projectID string, topic string) string {
274+
return fmt.Sprintf("projects/%s/topics/%s", projectID, topic)
275+
}

pkg/googlecloud/pubsub_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,15 @@ func TestSubscriberAllowedWhenAttachedToAnotherTopic(t *testing.T) {
136136
}
137137

138138
sub1, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
139+
ProjectID: "tests",
139140
GenerateSubscriptionName: subNameFn,
140141
}, logger)
141142
require.NoError(t, err)
142143

143144
topic1 := fmt.Sprintf("topic1_%d", testNumber)
144145

145146
sub2, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
147+
ProjectID: "tests",
146148
GenerateSubscriptionName: subNameFn,
147149
DoNotEnforceSubscriptionAttachedToTopic: true,
148150
}, logger)

pkg/googlecloud/subscriber.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ func (s *Subscriber) subscription(ctx context.Context, subscriptionName, topicNa
384384
sub = s.client.Subscriber(subscriptionName)
385385

386386
subResp, err := s.client.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{
387-
Subscription: subscriptionName,
387+
Subscription: fullyQualifiedSubscriptionName(s.config.ProjectID, subscriptionName),
388388
})
389389
if err == nil {
390390
return s.existingSubscriber(sub, subResp, topicName)
@@ -400,7 +400,7 @@ func (s *Subscriber) subscription(ctx context.Context, subscriptionName, topicNa
400400
return nil, errors.Wrap(ErrSubscriptionDoesNotExist, subscriptionName)
401401
}
402402

403-
tExists, err := topicExists(ctx, s.client, topicName)
403+
tExists, err := topicExists(ctx, s.client, s.config.ProjectID, topicName)
404404
if err != nil {
405405
return nil, errors.Wrapf(err, "could not check if topic %s exists", topicName)
406406
}
@@ -411,7 +411,7 @@ func (s *Subscriber) subscription(ctx context.Context, subscriptionName, topicNa
411411

412412
if !tExists {
413413
_, err = s.client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{
414-
Name: topicName,
414+
Name: fullyQualifiedTopicName(s.config.topicProjectID(), topicName),
415415
})
416416

417417
if status.Code(err) == codes.AlreadyExists {
@@ -422,8 +422,8 @@ func (s *Subscriber) subscription(ctx context.Context, subscriptionName, topicNa
422422
}
423423

424424
subConfig := s.config.GenerateSubscription(GenerateSubscriptionParams{})
425-
subConfig.Name = subscriptionName
426-
subConfig.Topic = topicName
425+
subConfig.Name = fullyQualifiedSubscriptionName(s.config.ProjectID, subscriptionName)
426+
subConfig.Topic = fullyQualifiedTopicName(s.config.topicProjectID(), topicName)
427427

428428
_, err = s.client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
429429
if status.Code(err) == codes.AlreadyExists {
@@ -462,12 +462,12 @@ func (s *Subscriber) existingSubscriber(sub *pubsub.Subscriber, subscription *pu
462462
return sub, nil
463463
}
464464

465-
fullyQualifiedTopicName := fmt.Sprintf("projects/%s/topics/%s", s.config.topicProjectID(), topic)
465+
fullName := fullyQualifiedTopicName(s.config.topicProjectID(), topic)
466466

467-
if subscription.Topic != fullyQualifiedTopicName {
467+
if subscription.Topic != fullName {
468468
return nil, errors.Wrap(
469469
ErrUnexpectedTopic,
470-
fmt.Sprintf("topic of existing sub: %s; expecting: %s", subscription.Topic, fullyQualifiedTopicName),
470+
fmt.Sprintf("topic of existing sub: %s; expecting: %s", subscription.Topic, fullName),
471471
)
472472
}
473473

@@ -578,3 +578,7 @@ func subscriptionFromSubscriptionConfig(cfg pubsubv1.SubscriptionConfig) *pubsub
578578
MessageTransforms: messageTransforms,
579579
}
580580
}
581+
582+
func fullyQualifiedSubscriptionName(projectID, subscriptionName string) string {
583+
return fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionName)
584+
}

0 commit comments

Comments
 (0)