Skip to content

Commit 06f4dd8

Browse files
authored
fix: replace panic with proper error handling in RPC client and lookup service (#1403)
### Motivation Fixes #1402 The current implementation uses `panic()` calls in several critical code paths within the RPC client and lookup service, which can cause the entire application to crash when encountering invalid URLs or configuration errors. This is not a good practice for a client library as it makes the application unstable and difficult to handle gracefully. The main issues addressed: - `NewRPCClient()` panics when creating lookup service fails - `LookupService()` panics when parsing invalid URLs or creating lookup services - `NewLookupService()` panics for invalid URL schemes ### Modifications - **RPC Client Interface**: Modified `LookupService()` method signature to return `(LookupService, error)` instead of just `LookupService` - **NewRPCClient**: Changed to return `(RPCClient, error)` and handle lookup service creation errors properly - **LookupService Implementation**: Replaced all `panic()` calls with proper error returns and descriptive error
1 parent 1f9e828 commit 06f4dd8

File tree

6 files changed

+99
-22
lines changed

6 files changed

+99
-22
lines changed

pulsar/client_impl.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,16 @@ func newClient(options ClientOptions) (Client, error) {
171171
tlsEnabled: tlsConfig != nil,
172172
}
173173

174-
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
174+
c.rpcClient, err = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
175175
options.ListenerName, tlsConfig, authProvider, toKeyValues(options.LookupProperties))
176+
if err != nil {
177+
return nil, err
178+
}
176179

177-
c.lookupService = c.rpcClient.LookupService("")
180+
c.lookupService, err = c.rpcClient.LookupService("")
181+
if err != nil {
182+
return nil, err
183+
}
178184

179185
c.handlers = internal.NewClientHandlers()
180186

pulsar/consumer_partition.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1934,7 +1934,11 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
19341934

19351935
func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
19361936
if len(brokerServiceURL) == 0 {
1937-
lr, err := pc.client.rpcClient.LookupService(pc.redirectedClusterURI).Lookup(pc.topic)
1937+
lookupService, err := pc.client.rpcClient.LookupService(pc.redirectedClusterURI)
1938+
if err != nil {
1939+
return nil, fmt.Errorf("failed to get lookup service: %w", err)
1940+
}
1941+
lr, err := lookupService.Lookup(pc.topic)
19381942
if err != nil {
19391943
pc.log.WithError(err).Warn("Failed to lookup topic")
19401944
return nil, err

pulsar/internal/lookup_service_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ func (c *mockedLookupRPCClient) RequestToHost(_ *ServiceNameResolver, requestID
8282
return c.RequestToAnyBroker(requestID, cmdType, message)
8383
}
8484

85-
func (c *mockedLookupRPCClient) LookupService(_ string) LookupService {
86-
return nil
85+
func (c *mockedLookupRPCClient) LookupService(_ string) (LookupService, error) {
86+
return nil, nil
8787
}
8888

8989
func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, _ uint64,
@@ -521,8 +521,8 @@ func (m *mockedPartitionedTopicMetadataRPCClient) RequestToHost(_ *ServiceNameRe
521521
return m.RequestToAnyBroker(requestID, cmdType, message)
522522
}
523523

524-
func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string) LookupService {
525-
return nil
524+
func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string) (LookupService, error) {
525+
return nil, nil
526526
}
527527

528528
func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {

pulsar/internal/rpc_client.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type RPCClient interface {
7474

7575
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
7676

77-
LookupService(URL string) LookupService
77+
LookupService(URL string) (LookupService, error)
7878
}
7979

8080
type rpcClient struct {
@@ -96,7 +96,8 @@ type rpcClient struct {
9696

9797
func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
9898
requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
99-
listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider, lookupProperties []*pb.KeyValue) RPCClient {
99+
listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider,
100+
lookupProperties []*pb.KeyValue) (RPCClient, error) {
100101
c := rpcClient{
101102
pool: pool,
102103
requestTimeout: requestTimeout,
@@ -110,11 +111,11 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
110111
}
111112
lookupService, err := c.NewLookupService(serviceURL)
112113
if err != nil {
113-
panic(err)
114+
return nil, fmt.Errorf("failed to create lookup service: %w", err)
114115
}
115116
c.lookupService = lookupService
116117

117-
return &c
118+
return &c, nil
118119
}
119120

120121
func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver,
@@ -220,29 +221,28 @@ func (c *rpcClient) NewConsumerID() uint64 {
220221
return atomic.AddUint64(&c.consumerIDGenerator, 1)
221222
}
222223

223-
func (c *rpcClient) LookupService(URL string) LookupService {
224+
func (c *rpcClient) LookupService(URL string) (LookupService, error) {
224225
if URL == "" {
225-
return c.lookupService
226+
return c.lookupService, nil
226227
}
227228
c.urlLookupServiceMapLock.Lock()
228229
defer c.urlLookupServiceMapLock.Unlock()
229230
lookupService, ok := c.urlLookupServiceMap[URL]
230231
if ok {
231-
return lookupService
232+
return lookupService, nil
232233
}
233234

234235
serviceURL, err := url.Parse(URL)
235236
if err != nil {
236-
panic(err)
237+
return nil, fmt.Errorf("failed to parse URL '%s': %w", URL, err)
237238
}
238239

239240
lookupService, err = c.NewLookupService(serviceURL)
240241
if err != nil {
241-
panic(err)
242+
return nil, fmt.Errorf("failed to create lookup service for URL '%s': %w", URL, err)
242243
}
243244
c.urlLookupServiceMap[URL] = lookupService
244-
return lookupService
245-
245+
return lookupService, nil
246246
}
247247

248248
func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
@@ -263,7 +263,7 @@ func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
263263
return NewHTTPLookupService(
264264
httpClient, url, serviceNameResolver, c.tlsConfig != nil, c.log, c.metrics), nil
265265
default:
266-
panic(fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
266+
return nil, fmt.Errorf("invalid URL scheme '%s'", url.Scheme)
267267
}
268268
}
269269

pulsar/internal/rpc_client_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
import (
21+
"net/url"
22+
"testing"
23+
24+
"github.com/apache/pulsar-client-go/pulsar/log"
25+
"github.com/stretchr/testify/assert"
26+
)
27+
28+
func TestNewRPCClient_InvalidURL_ShouldNotPanic(t *testing.T) {
29+
// Test that NewRPCClient doesn't panic with invalid URL
30+
invalidURL, _ := url.Parse("invalid://scheme")
31+
32+
// This should not panic and should return an error
33+
_, err := NewRPCClient(invalidURL, nil, 0, log.DefaultNopLogger(), nil, "", nil, nil, nil)
34+
assert.Error(t, err)
35+
assert.Contains(t, err.Error(), "invalid URL scheme")
36+
}
37+
38+
func TestLookupService_InvalidURL_ShouldNotPanic(t *testing.T) {
39+
// Create a minimal RPC client for testing
40+
validURL, _ := url.Parse("pulsar://localhost:6650")
41+
rpcClient, err := NewRPCClient(validURL, nil, 0, log.DefaultNopLogger(), nil, "", nil, nil, nil)
42+
assert.NoError(t, err)
43+
44+
// Test that LookupService doesn't panic with invalid URL
45+
_, err = rpcClient.LookupService("invalid://url")
46+
assert.Error(t, err)
47+
assert.Contains(t, err.Error(), "invalid URL scheme")
48+
}
49+
50+
func TestLookupService_InvalidScheme_ShouldNotPanic(t *testing.T) {
51+
// Create a minimal RPC client for testing
52+
validURL, _ := url.Parse("pulsar://localhost:6650")
53+
rpcClient, err := NewRPCClient(validURL, nil, 0, log.DefaultNopLogger(), nil, "", nil, nil, nil)
54+
assert.NoError(t, err)
55+
56+
// Test that LookupService doesn't panic with invalid scheme
57+
_, err = rpcClient.LookupService("ftp://localhost:21")
58+
assert.Error(t, err)
59+
assert.Contains(t, err.Error(), "invalid URL scheme")
60+
}

pulsar/producer_partition.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,11 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
244244

245245
func (p *partitionProducer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
246246
if len(brokerServiceURL) == 0 {
247-
lr, err := p.client.rpcClient.LookupService(p.redirectedClusterURI).Lookup(p.topic)
247+
lookupService, err := p.client.rpcClient.LookupService(p.redirectedClusterURI)
248+
if err != nil {
249+
return nil, fmt.Errorf("failed to get lookup service: %w", err)
250+
}
251+
lr, err := lookupService.Lookup(p.topic)
248252
if err != nil {
249253
p.log.WithError(err).Warn("Failed to lookup topic")
250254
return nil, err
@@ -253,8 +257,11 @@ func (p *partitionProducer) lookupTopic(brokerServiceURL string) (*internal.Look
253257
p.log.Debug("Lookup result: ", lr)
254258
return lr, err
255259
}
256-
return p.client.rpcClient.LookupService(p.redirectedClusterURI).
257-
GetBrokerAddress(brokerServiceURL, p._getConn().IsProxied())
260+
lookupService, err := p.client.rpcClient.LookupService(p.redirectedClusterURI)
261+
if err != nil {
262+
return nil, fmt.Errorf("failed to get lookup service: %w", err)
263+
}
264+
return lookupService.GetBrokerAddress(brokerServiceURL, p._getConn().IsProxied())
258265
}
259266

260267
func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {

0 commit comments

Comments
 (0)