Skip to content

Commit c3ba0a3

Browse files
committed
Adiciona timeout para envio
Inclui parâmetro de tempo limite e utiliza contexto com timeout na publicação. Ajusta configuração e tratamento de atributos para garantir resposta rápida.
1 parent b646ab2 commit c3ba0a3

File tree

3 files changed

+20
-7
lines changed

3 files changed

+20
-7
lines changed

wrapper/publisher/driver/contrib/cloud.google.com/pubsub/v1/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
logRoot = ".log"
1515
orderingKey = ".orderingKey"
1616
level = logRoot + ".level"
17+
publishTimeout = ".publishTimeout"
1718
settings = ".settings"
1819
delayThreshold = settings + ".delayThreshold"
1920
countThreshold = settings + ".countThreshold"
@@ -60,4 +61,5 @@ func ConfigAdd(path string) {
6061
config.Add(path+limitExceededBehavior, 1, "behavior when flow control limits are exceeded: Block or Ignore")
6162
config.Add(path+enableCompression, false, "whether to compress messages before sending")
6263
config.Add(path+compressionBytesThreshold, 240, "the minimum size a message must be to be compressed before sending")
64+
config.Add(path+publishTimeout, 60*time.Second, "the maximum duration to wait for a publish to complete")
6365
}

wrapper/publisher/driver/contrib/cloud.google.com/pubsub/v1/options.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package pubsub
22

33
import (
4+
"time"
5+
46
"cloud.google.com/go/pubsub"
57
"github.com/xgodev/boost/wrapper/config"
68
)
@@ -10,8 +12,10 @@ type Options struct {
1012
Log struct {
1113
Level string
1214
}
13-
OrderingKey bool
14-
Settings pubsub.PublishSettings
15+
OrderingKey bool
16+
Settings pubsub.PublishSettings
17+
Timeout time.Duration
18+
PublishTimeout time.Duration
1519
}
1620

1721
// NewOptions returns options from config file or environment vars.

wrapper/publisher/driver/contrib/cloud.google.com/pubsub/v1/pubsub.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ func (p *client) send(ctx context.Context, events []*v2.Event) ([]publisher.Publ
7474
go func(ev *v2.Event) {
7575
defer wg.Done()
7676

77-
ctx := context.Background()
77+
base := context.WithoutCancel(ctx)
78+
pubCtx, cancel := context.WithTimeout(base, p.options.PublishTimeout)
79+
defer cancel()
7880

7981
logger := log.WithField("subject", ev.Subject()).
8082
WithField("id", ev.ID())
@@ -99,12 +101,17 @@ func (p *client) send(ctx context.Context, events []*v2.Event) ([]publisher.Publ
99101
"ce_id": ev.ID(),
100102
"ce_source": ev.Source(),
101103
"ce_type": ev.Type(),
102-
"content-type": ev.DataContentType(),
103-
"ce_time": ev.Time().String(),
104+
"ce_time": ev.Time().UTC().Format(time.RFC3339),
104105
"ce_path": "/",
105106
"ce_subject": ev.Subject(),
106107
}
107108

109+
if ct := ev.DataContentType(); ct != "" {
110+
attrs["content-type"] = ct
111+
} else {
112+
attrs["content-type"] = "application/json"
113+
}
114+
108115
msg := &pubsub.Message{ID: ev.ID(), Data: raw, Attributes: attrs, PublishTime: time.Now()}
109116
if p.options.OrderingKey {
110117
if pk, err := p.getPartitionKey(ev); err == nil {
@@ -115,8 +122,8 @@ func (p *client) send(ctx context.Context, events []*v2.Event) ([]publisher.Publ
115122
topic := p.getTopic(ev.Subject())
116123
err = try.Do(func(attempt int) (bool, error) {
117124
logger.Tracef("publishing to topic %s, attempt %d", ev.Subject(), attempt)
118-
r := topic.Publish(ctx, msg)
119-
if _, err := r.Get(ctx); err != nil {
125+
r := topic.Publish(pubCtx, msg)
126+
if _, err := r.Get(pubCtx); err != nil {
120127
log.Error(err)
121128
return attempt < 5, errors.NewInternal(err, "Pub/Sub publish failed")
122129
}

0 commit comments

Comments
 (0)