Skip to content

Commit 6566ac5

Browse files
committed
update pubsub
1 parent c32b990 commit 6566ac5

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

consumer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func ConfigureSubscription(subscription *pubsub.Subscription, c SubscriptionConf
4444
return subscription
4545
}
4646

47-
func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
47+
func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
4848
er1 := c.Subscription.Receive(ctx, func(ctx2 context.Context, m *pubsub.Message) {
4949
message := mq.Message{
5050
Id: m.ID,
@@ -55,9 +55,9 @@ func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
5555
if c.AckOnConsume {
5656
m.Ack()
5757
}
58-
caller.Call(ctx2, &message, nil)
58+
handle(ctx2, &message, nil)
5959
})
6060
if er1 != nil {
61-
caller.Call(ctx, nil, er1)
61+
handle(ctx, nil, er1)
6262
}
6363
}

producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ func ConfigureTopic(topic *pubsub.Topic, c TopicConfig) *pubsub.Topic {
5555
return topic
5656
}
5757

58-
func (c *Producer) Produce(ctx context.Context, data []byte, messageAttributes *map[string]string) (string, error) {
58+
func (c *Producer) Produce(ctx context.Context, data []byte, messageAttributes map[string]string) (string, error) {
5959
msg := &pubsub.Message{
6060
Data: data,
6161
}
6262

6363
if messageAttributes != nil {
64-
msg.Attributes = *messageAttributes
64+
msg.Attributes = messageAttributes
6565
}
6666

6767
publishResult := c.Topic.Publish(ctx, msg)

pubsub_health_checker.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,24 @@ type PubSubHealthChecker struct {
2323
resourceId string
2424
}
2525

26-
func NewPubSubHealthChecker(name string, client *pubsub.Client, timeout time.Duration, permissionType PermissionType, resourceId string) *PubSubHealthChecker {
27-
return &PubSubHealthChecker{name, client, timeout, permissionType, resourceId}
26+
func NewPubSubHealthChecker(name string, client *pubsub.Client, resourceId string, permissionType PermissionType, timeout ...time.Duration) *PubSubHealthChecker {
27+
if len(timeout) >= 1 {
28+
return &PubSubHealthChecker{name: name, client: client, permissionType: permissionType, resourceId: resourceId, timeout: timeout[0]}
29+
}
30+
return &PubSubHealthChecker{name: name, client: client, permissionType: permissionType, resourceId: resourceId, timeout: 4 * time.Second}
31+
}
32+
func NewPubHealthChecker(name string, client *pubsub.Client, resourceId string, timeout ...time.Duration) *PubSubHealthChecker {
33+
if len(timeout) >= 1 {
34+
return &PubSubHealthChecker{name: name, client: client, permissionType: PermissionPublish, resourceId: resourceId, timeout: timeout[0]}
35+
}
36+
return &PubSubHealthChecker{name: name, client: client, permissionType: PermissionPublish, resourceId: resourceId, timeout: 4 * time.Second}
37+
}
38+
func NewSubHealthChecker(name string, client *pubsub.Client, resourceId string, timeout ...time.Duration) *PubSubHealthChecker {
39+
if len(timeout) >= 1 {
40+
return &PubSubHealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: timeout[0]}
41+
}
42+
return &PubSubHealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: 4 * time.Second}
2843
}
29-
3044
func (h *PubSubHealthChecker) Name() string {
3145
return h.name
3246
}

0 commit comments

Comments
 (0)