Skip to content

Commit 92c6e28

Browse files
authored
improve: use ctx and timer instead sleep (#1256)
* improve: use ctx and timer instead sleep * Address comment
1 parent 35076ac commit 92c6e28

12 files changed

+263
-142
lines changed

pulsar/consumer_partition.go

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package pulsar
1919

2020
import (
2121
"container/list"
22+
"context"
2223
"encoding/hex"
2324
"fmt"
2425
"math"
@@ -612,7 +613,6 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
612613
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
613614
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
614615
}
615-
remainTime := pc.client.operationTimeout
616616
bo := pc.backoffPolicyFunc()
617617
request := func() (*trackingMessageID, error) {
618618
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
@@ -622,23 +622,20 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
622622
<-req.doneCh
623623
return req.msgID, req.err
624624
}
625-
for {
626-
msgID, err := request()
627-
if err == nil {
628-
return msgID, nil
629-
}
630-
if remainTime <= 0 {
631-
pc.log.WithError(err).Error("Failed to getLastMessageID")
632-
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
633-
}
625+
626+
ctx, cancel := context.WithTimeout(context.Background(), pc.client.operationTimeout)
627+
defer cancel()
628+
res, err := internal.Retry(ctx, request, func(err error) time.Duration {
634629
nextDelay := bo.Next()
635-
if nextDelay > remainTime {
636-
nextDelay = remainTime
637-
}
638-
remainTime -= nextDelay
639630
pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay)
640-
time.Sleep(nextDelay)
631+
return nextDelay
632+
})
633+
if err != nil {
634+
pc.log.WithError(err).Error("Failed to getLastMessageID")
635+
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
641636
}
637+
638+
return res, nil
642639
}
643640

644641
func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
@@ -1805,8 +1802,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
18051802
pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
18061803
}
18071804
var (
1808-
maxRetry int
1809-
delayReconnectTime, totalDelayReconnectTime time.Duration
1805+
maxRetry int
18101806
)
18111807

18121808
if pc.options.maxReconnectToBroker == nil {
@@ -1816,50 +1812,39 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
18161812
}
18171813
bo := pc.backoffPolicyFunc()
18181814

1819-
for maxRetry != 0 {
1820-
if pc.getConsumerState() != consumerReady {
1821-
// Consumer is already closing
1822-
pc.log.Info("consumer state not ready, exit reconnect")
1823-
return
1824-
}
1825-
1826-
var assignedBrokerURL string
1815+
var assignedBrokerURL string
1816+
if connectionClosed != nil && connectionClosed.HasURL() {
1817+
assignedBrokerURL = connectionClosed.assignedBrokerURL
1818+
}
18271819

1828-
if connectionClosed != nil && connectionClosed.HasURL() {
1829-
delayReconnectTime = 0
1830-
assignedBrokerURL = connectionClosed.assignedBrokerURL
1831-
connectionClosed = nil // Attempt connecting to the assigned broker just once
1832-
} else {
1833-
delayReconnectTime = bo.Next()
1820+
opFn := func() (struct{}, error) {
1821+
if maxRetry == 0 {
1822+
return struct{}{}, nil
18341823
}
1835-
totalDelayReconnectTime += delayReconnectTime
1836-
1837-
pc.log.WithFields(log.Fields{
1838-
"assignedBrokerURL": assignedBrokerURL,
1839-
"delayReconnectTime": delayReconnectTime,
1840-
}).Info("Reconnecting to broker")
1841-
time.Sleep(delayReconnectTime)
18421824

1843-
// double check
18441825
if pc.getConsumerState() != consumerReady {
18451826
// Consumer is already closing
18461827
pc.log.Info("consumer state not ready, exit reconnect")
1847-
return
1828+
return struct{}{}, nil
18481829
}
18491830

18501831
err := pc.grabConn(assignedBrokerURL)
1832+
if assignedBrokerURL != "" {
1833+
// Attempt connecting to the assigned broker just once
1834+
assignedBrokerURL = ""
1835+
}
18511836
if err == nil {
18521837
// Successfully reconnected
18531838
pc.log.Info("Reconnected consumer to broker")
18541839
bo.Reset()
1855-
return
1840+
return struct{}{}, nil
18561841
}
18571842
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
18581843
errMsg := err.Error()
18591844
if strings.Contains(errMsg, errMsgTopicNotFound) {
18601845
// when topic is deleted, we should give up reconnection.
18611846
pc.log.Warn("Topic Not Found.")
1862-
break
1847+
return struct{}{}, nil
18631848
}
18641849

18651850
if maxRetry > 0 {
@@ -1869,7 +1854,17 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
18691854
if maxRetry == 0 || bo.IsMaxBackoffReached() {
18701855
pc.metrics.ConsumersReconnectMaxRetry.Inc()
18711856
}
1857+
1858+
return struct{}{}, err
18721859
}
1860+
_, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
1861+
delayReconnectTime := bo.Next()
1862+
pc.log.WithFields(log.Fields{
1863+
"assignedBrokerURL": assignedBrokerURL,
1864+
"delayReconnectTime": delayReconnectTime,
1865+
}).Info("Reconnecting to broker")
1866+
return delayReconnectTime
1867+
})
18731868
}
18741869

18751870
func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {

pulsar/consumer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func TestConsumerConnectError(t *testing.T) {
129129
assert.Nil(t, consumer)
130130
assert.NotNil(t, err)
131131

132-
assert.Equal(t, err.Error(), "connection error")
132+
assert.ErrorContains(t, err, "connection error")
133133
}
134134

135135
func TestBatchMessageReceive(t *testing.T) {

pulsar/dlq_router.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"fmt"
2323
"time"
2424

25+
"github.com/apache/pulsar-client-go/pulsar/internal"
26+
2527
"github.com/apache/pulsar-client-go/pulsar/backoff"
2628

2729
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -165,7 +167,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
165167

166168
// Retry to create producer indefinitely
167169
bo := r.backOffPolicyFunc()
168-
for {
170+
opFn := func() (Producer, error) {
169171
opt := r.policy.ProducerOptions
170172
opt.Topic = r.policy.DeadLetterTopic
171173
opt.Schema = schema
@@ -179,14 +181,17 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
179181
if r.policy.ProducerOptions.CompressionType == NoCompression {
180182
opt.CompressionType = LZ4
181183
}
182-
producer, err := r.client.CreateProducer(opt)
184+
return r.client.CreateProducer(opt)
185+
}
183186

184-
if err != nil {
185-
r.log.WithError(err).Error("Failed to create DLQ producer")
186-
time.Sleep(bo.Next())
187-
continue
188-
}
189-
r.producer = producer
190-
return producer
187+
res, err := internal.Retry(context.Background(), opFn, func(err error) time.Duration {
188+
r.log.WithError(err).Error("Failed to create DLQ producer")
189+
return bo.Next()
190+
})
191+
192+
if err == nil {
193+
r.producer = res
191194
}
195+
196+
return res
192197
}

pulsar/internal/http_client.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package internal
1919

2020
import (
2121
"bytes"
22+
"context"
2223
"crypto/tls"
2324
"crypto/x509"
2425
"encoding/json"
@@ -146,25 +147,27 @@ func (c *httpClient) MakeRequest(method, endpoint string) (*http.Response, error
146147
}
147148

148149
func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]string) error {
149-
_, err := c.GetWithQueryParams(endpoint, obj, params, true)
150-
if _, ok := err.(*url.Error); ok {
151-
// We can retry this kind of requests over a connection error because they're
152-
// not specific to a particular broker.
153-
bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
154-
startTime := time.Now()
155-
var retryTime time.Duration
156-
157-
for time.Since(startTime) < c.requestTimeout {
158-
retryTime = bo.Next()
159-
c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
160-
time.Sleep(retryTime)
161-
_, err = c.GetWithQueryParams(endpoint, obj, params, true)
162-
if _, ok := err.(*url.Error); !ok {
163-
// We either succeeded or encountered a non connection error
164-
break
165-
}
150+
var err error
151+
opFn := func() (struct{}, error) {
152+
_, err = c.GetWithQueryParams(endpoint, obj, params, true)
153+
if _, ok := err.(*url.Error); ok {
154+
// We can retry this kind of requests over a connection error because they're
155+
// not specific to a particular broker.
156+
return struct{}{}, err
166157
}
158+
return struct{}{}, nil
167159
}
160+
161+
bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
162+
163+
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
164+
defer cancel()
165+
166+
_, _ = Retry(ctx, opFn, func(_ error) time.Duration {
167+
retryTime := bo.Next()
168+
c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
169+
return retryTime
170+
})
168171
return err
169172
}
170173

pulsar/internal/retry.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
import (
21+
"context"
22+
"errors"
23+
"time"
24+
)
25+
26+
type OpFn[T any] func() (T, error)
27+
28+
// Retry the given operation until the returned error is nil or the context is done.
29+
func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error) time.Duration) (T, error) {
30+
var (
31+
timer *time.Timer
32+
res T
33+
err error
34+
)
35+
36+
cleanTimer := func() {
37+
if timer != nil {
38+
timer.Stop()
39+
}
40+
}
41+
defer cleanTimer()
42+
43+
for {
44+
res, err = op()
45+
if err == nil {
46+
return res, nil
47+
}
48+
49+
duration := nextDuration(err)
50+
if timer == nil {
51+
timer = time.NewTimer(duration)
52+
} else {
53+
timer.Reset(duration)
54+
}
55+
56+
select {
57+
case <-ctx.Done():
58+
return res, errors.Join(ctx.Err(), err)
59+
case <-timer.C:
60+
}
61+
}
62+
}

pulsar/internal/retry_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
import (
21+
"context"
22+
"errors"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/require"
27+
)
28+
29+
func TestRetryWithCtxBackground(t *testing.T) {
30+
ctx := context.Background()
31+
i := 0
32+
res, err := Retry(ctx, func() (string, error) {
33+
if i == 2 {
34+
return "ok", nil
35+
}
36+
i++
37+
return "", errors.New("error")
38+
}, func(_ error) time.Duration {
39+
return 1 * time.Second
40+
})
41+
require.NoError(t, err)
42+
require.Equal(t, "ok", res)
43+
}
44+
45+
func TestRetryWithCtxTimeout(t *testing.T) {
46+
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
47+
defer cancelFn()
48+
retryErr := errors.New("error")
49+
res, err := Retry(ctx, func() (string, error) {
50+
return "", retryErr
51+
}, func(err error) time.Duration {
52+
require.Equal(t, retryErr, err)
53+
return 1 * time.Second
54+
})
55+
require.ErrorIs(t, err, context.DeadlineExceeded)
56+
require.ErrorContains(t, err, retryErr.Error())
57+
require.Equal(t, "", res)
58+
}

0 commit comments

Comments
 (0)