Skip to content

Commit 849f139

Browse files
mikeeeyaron2nelson-parentecicoyle
authored
fix(tests): refactor gcp pubsub ctx propagation (#3822)
Signed-off-by: Mike Nguyen <[email protected]> Co-authored-by: Yaron Schneider <[email protected]> Co-authored-by: Nelson Parente <[email protected]> Co-authored-by: Cassie Coyle <[email protected]>
1 parent f1e67a0 commit 849f139

File tree

2 files changed

+20
-23
lines changed

2 files changed

+20
-23
lines changed

tests/certification/pubsub/gcp/pubsub/gcppubsub_helper.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,18 @@ type DataMessage struct {
3939

4040
type MessageFunc func(*DataMessage) error
4141

42-
func NewTopicManager(projectID string) (*topicManager, error) {
42+
func NewTopicManager(ctx context.Context, projectID string) (*topicManager, error) {
4343
tpm := &topicManager{
4444
projectID: projectID,
4545
}
46-
err := tpm.connect()
46+
err := tpm.connect(ctx)
4747
if err != nil {
4848
return nil, err
4949
}
5050
return tpm, nil
5151
}
5252

53-
func (tm *topicManager) connect() error {
54-
ctx := t.Context()
53+
func (tm *topicManager) connect(ctx context.Context) error {
5554
client, err := pubsub.NewClient(ctx, tm.projectID)
5655
if err != nil {
5756
return fmt.Errorf("GCP pubsub.NewClient failed to connect: %v", err)
@@ -65,9 +64,8 @@ func (tm *topicManager) disconnect() error {
6564
return tm.gcpClient.Close()
6665
}
6766

68-
func (tp *topicManager) GetMessages(topicID string, msgTimeout time.Duration, subID string, fn MessageFunc) (int, error) {
69-
ctx := t.Context()
70-
67+
func (tp *topicManager) GetMessages(ctx context.Context, topicID string, msgTimeout time.Duration, subID string,
68+
fn MessageFunc) (int, error) {
7169
topic := tp.gcpClient.Topic(topicID)
7270
cfg := &pubsub.SubscriptionConfig{
7371
Topic: topic,
@@ -133,24 +131,23 @@ func getOrCreateSub(ctx context.Context, client *pubsub.Client, subID string, cf
133131
return sub, nil
134132
}
135133

136-
func deleteSubscriptions(projectID string, subs []string) error {
134+
func deleteSubscriptions(ctx context.Context, projectID string, subs []string) error {
137135
for _, s := range subs {
138136
fmt.Printf("Deleting subscription: %s\n", s)
139-
deleteSubscription(os.Stdout, projectID, s)
137+
deleteSubscription(ctx, os.Stdout, projectID, s)
140138
}
141139
return nil
142140
}
143141

144-
func deleteTopics(projectID string, topics []string) error {
142+
func deleteTopics(ctx context.Context, projectID string, topics []string) error {
145143
for _, t := range topics {
146144
fmt.Printf("Deleting topics: %s\n", t)
147-
deleteTopic(os.Stdout, projectID, t)
145+
deleteTopic(ctx, os.Stdout, projectID, t)
148146
}
149147
return nil
150148
}
151149

152-
func deleteSubscription(w io.Writer, projectID, subID string) error {
153-
ctx := t.Context()
150+
func deleteSubscription(ctx context.Context, w io.Writer, projectID, subID string) error {
154151
client, err := pubsub.NewClient(ctx, projectID)
155152
if err != nil {
156153
return fmt.Errorf("pubsub.NewClient: %v", err)
@@ -165,8 +162,7 @@ func deleteSubscription(w io.Writer, projectID, subID string) error {
165162
return nil
166163
}
167164

168-
func deleteTopic(w io.Writer, projectID, topicID string) error {
169-
ctx := t.Context()
165+
func deleteTopic(ctx context.Context, w io.Writer, projectID, topicID string) error {
170166
client, err := pubsub.NewClient(ctx, projectID)
171167
if err != nil {
172168
return fmt.Errorf("pubsub.NewClient: %v", err)

tests/certification/pubsub/gcp/pubsub/gcppubsub_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ func GCPPubSubMessageDeadLetter(t *testing.T) {
457457
t := time.NewTicker(500 * time.Millisecond)
458458
defer t.Stop()
459459
counter := 1
460-
tm, err := NewTopicManager(projectID)
460+
tm, err := NewTopicManager(ctx, projectID)
461461
if err != nil {
462462
return fmt.Errorf("deadLetterReceiverApplication - NewTopicManager: %v", err)
463463
}
@@ -474,11 +474,12 @@ func GCPPubSubMessageDeadLetter(t *testing.T) {
474474
ctx.Logf("deadLetterReceiverApplication - timeout waiting for messages from (%q)", deadLetterTopicName)
475475
return fmt.Errorf("deadLetterReceiverApplication - timeout waiting for messages from (%q)", deadLetterTopicName)
476476
case <-t.C:
477-
numMsgs, err := tm.GetMessages(deadLetterTopicName, msgTimeout, deadLetterSubcription, func(m *DataMessage) error {
478-
ctx.Logf("deadLetterReceiverApplication - received message counter(%d) (%v)\n", counter, m.Data)
479-
messagesWatcher.Observe(m.Data)
480-
return nil
481-
})
477+
numMsgs, err := tm.GetMessages(ctx, deadLetterTopicName, msgTimeout, deadLetterSubcription,
478+
func(m *DataMessage) error {
479+
ctx.Logf("deadLetterReceiverApplication - received message counter(%d) (%v)\n", counter, m.Data)
480+
messagesWatcher.Observe(m.Data)
481+
return nil
482+
})
482483
if err != nil {
483484
ctx.Logf("deadLetterReceiverApplication - failed to get messages from (%q) counter(%d) %v - trying again\n", deadLetterTopicName, counter, err)
484485
continue
@@ -797,11 +798,11 @@ func teardown(t *testing.T) {
797798
t.Logf("GCP PubSub CertificationTests teardown...")
798799
//Dapr runtime automatically creates the following subscriptions, topics
799800
//so here they get deleted.
800-
if err := deleteSubscriptions(projectID, subscriptions); err != nil {
801+
if err := deleteSubscriptions(t.Context(), projectID, subscriptions); err != nil {
801802
t.Log(err)
802803
}
803804

804-
if err := deleteTopics(projectID, topics); err != nil {
805+
if err := deleteTopics(t.Context(), projectID, topics); err != nil {
805806
t.Log(err)
806807
}
807808

0 commit comments

Comments
 (0)