Skip to content

Commit 3068114

Browse files
committed
fix: replace panic with proper error handling in RPC client and lookup service
1 parent 1f9e828 commit 3068114

File tree

6 files changed

+97
-22
lines changed

6 files changed

+97
-22
lines changed

pulsar/client_impl.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,16 @@ func newClient(options ClientOptions) (Client, error) {
171171
tlsEnabled: tlsConfig != nil,
172172
}
173173

174-
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
174+
c.rpcClient, err = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
175175
options.ListenerName, tlsConfig, authProvider, toKeyValues(options.LookupProperties))
176+
if err != nil {
177+
return nil, err
178+
}
176179

177-
c.lookupService = c.rpcClient.LookupService("")
180+
c.lookupService, err = c.rpcClient.LookupService("")
181+
if err != nil {
182+
return nil, err
183+
}
178184

179185
c.handlers = internal.NewClientHandlers()
180186

pulsar/consumer_partition.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1934,7 +1934,11 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
19341934

19351935
func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
19361936
if len(brokerServiceURL) == 0 {
1937-
lr, err := pc.client.rpcClient.LookupService(pc.redirectedClusterURI).Lookup(pc.topic)
1937+
lookupService, err := pc.client.rpcClient.LookupService(pc.redirectedClusterURI)
1938+
if err != nil {
1939+
return nil, fmt.Errorf("failed to get lookup service: %w", err)
1940+
}
1941+
lr, err := lookupService.Lookup(pc.topic)
19381942
if err != nil {
19391943
pc.log.WithError(err).Warn("Failed to lookup topic")
19401944
return nil, err

pulsar/internal/lookup_service_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ func (c *mockedLookupRPCClient) RequestToHost(_ *ServiceNameResolver, requestID
8282
return c.RequestToAnyBroker(requestID, cmdType, message)
8383
}
8484

85-
func (c *mockedLookupRPCClient) LookupService(_ string) LookupService {
86-
return nil
85+
func (c *mockedLookupRPCClient) LookupService(_ string) (LookupService, error) {
86+
return nil, nil
8787
}
8888

8989
func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, _ uint64,
@@ -521,8 +521,8 @@ func (m *mockedPartitionedTopicMetadataRPCClient) RequestToHost(_ *ServiceNameRe
521521
return m.RequestToAnyBroker(requestID, cmdType, message)
522522
}
523523

524-
func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string) LookupService {
525-
return nil
524+
func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string) (LookupService, error) {
525+
return nil, nil
526526
}
527527

528528
func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {

pulsar/internal/rpc_client.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type RPCClient interface {
7474

7575
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
7676

77-
LookupService(URL string) LookupService
77+
LookupService(URL string) (LookupService, error)
7878
}
7979

8080
type rpcClient struct {
@@ -96,7 +96,7 @@ type rpcClient struct {
9696

9797
func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
9898
requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
99-
listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider, lookupProperties []*pb.KeyValue) RPCClient {
99+
listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider, lookupProperties []*pb.KeyValue) (RPCClient, error) {
100100
c := rpcClient{
101101
pool: pool,
102102
requestTimeout: requestTimeout,
@@ -110,11 +110,11 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
110110
}
111111
lookupService, err := c.NewLookupService(serviceURL)
112112
if err != nil {
113-
panic(err)
113+
return nil, fmt.Errorf("failed to create lookup service: %w", err)
114114
}
115115
c.lookupService = lookupService
116116

117-
return &c
117+
return &c, nil
118118
}
119119

120120
func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver,
@@ -220,29 +220,28 @@ func (c *rpcClient) NewConsumerID() uint64 {
220220
return atomic.AddUint64(&c.consumerIDGenerator, 1)
221221
}
222222

223-
func (c *rpcClient) LookupService(URL string) LookupService {
223+
func (c *rpcClient) LookupService(URL string) (LookupService, error) {
224224
if URL == "" {
225-
return c.lookupService
225+
return c.lookupService, nil
226226
}
227227
c.urlLookupServiceMapLock.Lock()
228228
defer c.urlLookupServiceMapLock.Unlock()
229229
lookupService, ok := c.urlLookupServiceMap[URL]
230230
if ok {
231-
return lookupService
231+
return lookupService, nil
232232
}
233233

234234
serviceURL, err := url.Parse(URL)
235235
if err != nil {
236-
panic(err)
236+
return nil, fmt.Errorf("failed to parse URL '%s': %w", URL, err)
237237
}
238238

239239
lookupService, err = c.NewLookupService(serviceURL)
240240
if err != nil {
241-
panic(err)
241+
return nil, fmt.Errorf("failed to create lookup service for URL '%s': %w", URL, err)
242242
}
243243
c.urlLookupServiceMap[URL] = lookupService
244-
return lookupService
245-
244+
return lookupService, nil
246245
}
247246

248247
func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
@@ -263,7 +262,7 @@ func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
263262
return NewHTTPLookupService(
264263
httpClient, url, serviceNameResolver, c.tlsConfig != nil, c.log, c.metrics), nil
265264
default:
266-
panic(fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
265+
return nil, fmt.Errorf("Invalid URL scheme '%s'", url.Scheme)
267266
}
268267
}
269268

pulsar/internal/rpc_client_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
package internal
18+
19+
import (
20+
"net/url"
21+
"testing"
22+
23+
"github.com/apache/pulsar-client-go/pulsar/log"
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func TestNewRPCClient_InvalidURL_ShouldNotPanic(t *testing.T) {
28+
// Test that NewRPCClient doesn't panic with invalid URL
29+
invalidURL, _ := url.Parse("invalid://scheme")
30+
31+
// This should not panic and should return an error
32+
_, err := NewRPCClient(invalidURL, nil, 0, log.DefaultNopLogger(), nil, "", nil, nil, nil)
33+
assert.Error(t, err)
34+
assert.Contains(t, err.Error(), "Invalid URL scheme")
35+
}
36+
37+
func TestLookupService_InvalidURL_ShouldNotPanic(t *testing.T) {
38+
// Create a minimal RPC client for testing
39+
validURL, _ := url.Parse("pulsar://localhost:6650")
40+
rpcClient, err := NewRPCClient(validURL, nil, 0, log.DefaultNopLogger(), nil, "", nil, nil, nil)
41+
assert.NoError(t, err)
42+
43+
// Test that LookupService doesn't panic with invalid URL
44+
_, err = rpcClient.LookupService("invalid://url")
45+
assert.Error(t, err)
46+
assert.Contains(t, err.Error(), "Invalid URL scheme")
47+
}
48+
49+
func TestLookupService_InvalidScheme_ShouldNotPanic(t *testing.T) {
50+
// Create a minimal RPC client for testing
51+
validURL, _ := url.Parse("pulsar://localhost:6650")
52+
rpcClient, err := NewRPCClient(validURL, nil, 0, log.DefaultNopLogger(), nil, "", nil, nil, nil)
53+
assert.NoError(t, err)
54+
55+
// Test that LookupService doesn't panic with invalid scheme
56+
_, err = rpcClient.LookupService("ftp://localhost:21")
57+
assert.Error(t, err)
58+
assert.Contains(t, err.Error(), "Invalid URL scheme")
59+
}

pulsar/producer_partition.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,11 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
244244

245245
func (p *partitionProducer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
246246
if len(brokerServiceURL) == 0 {
247-
lr, err := p.client.rpcClient.LookupService(p.redirectedClusterURI).Lookup(p.topic)
247+
lookupService, err := p.client.rpcClient.LookupService(p.redirectedClusterURI)
248+
if err != nil {
249+
return nil, fmt.Errorf("failed to get lookup service: %w", err)
250+
}
251+
lr, err := lookupService.Lookup(p.topic)
248252
if err != nil {
249253
p.log.WithError(err).Warn("Failed to lookup topic")
250254
return nil, err
@@ -253,8 +257,11 @@ func (p *partitionProducer) lookupTopic(brokerServiceURL string) (*internal.Look
253257
p.log.Debug("Lookup result: ", lr)
254258
return lr, err
255259
}
256-
return p.client.rpcClient.LookupService(p.redirectedClusterURI).
257-
GetBrokerAddress(brokerServiceURL, p._getConn().IsProxied())
260+
lookupService, err := p.client.rpcClient.LookupService(p.redirectedClusterURI)
261+
if err != nil {
262+
return nil, fmt.Errorf("failed to get lookup service: %w", err)
263+
}
264+
return lookupService.GetBrokerAddress(brokerServiceURL, p._getConn().IsProxied())
258265
}
259266

260267
func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {

0 commit comments

Comments
 (0)