Skip to content

Commit 6e5c7d3

Browse files
authored
Fixed tls connection issue (apache#220)
* Fixed Lookup service.
1 parent 4e52758 commit 6e5c7d3

File tree

3 files changed

+124
-8
lines changed

3 files changed

+124
-8
lines changed

pulsar/client_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func newClient(options ClientOptions) (Client, error) {
9595
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
9696
}
9797
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
98-
c.lookupService = internal.NewLookupService(c.rpcClient, url)
98+
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil)
9999
c.handlers = internal.NewClientHandlers()
100100
return c, nil
101101
}

pulsar/internal/lookup_service.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,26 @@ type LookupService interface {
4444
type lookupService struct {
4545
rpcClient RPCClient
4646
serviceURL *url.URL
47+
tlsEnabled bool
4748
}
4849

4950
// NewLookupService init a lookup service struct and return an object of LookupService.
50-
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService {
51+
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled bool) LookupService {
5152
return &lookupService{
5253
rpcClient: rpcClient,
5354
serviceURL: serviceURL,
55+
tlsEnabled: tlsEnabled,
5456
}
5557
}
5658

5759
func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
5860
physicalAddress *url.URL, err error) {
59-
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
61+
if ls.tlsEnabled {
62+
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrlTls())
63+
} else {
64+
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
65+
}
66+
6067
if err != nil {
6168
return nil, nil, err
6269
}

pulsar/internal/lookup_service_test.go

Lines changed: 114 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func TestLookupSuccess(t *testing.T) {
127127
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
128128
},
129129
},
130-
}, url)
130+
}, url, false)
131131

132132
lr, err := ls.Lookup("my-topic")
133133
assert.NoError(t, err)
@@ -137,6 +137,38 @@ func TestLookupSuccess(t *testing.T) {
137137
assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
138138
}
139139

140+
func TestTlsLookupSuccess(t *testing.T) {
141+
url, err := url.Parse("pulsar+ssl://example:6651")
142+
assert.NoError(t, err)
143+
144+
ls := NewLookupService(&mockedRPCClient{
145+
t: t,
146+
147+
expectedRequests: []pb.CommandLookupTopic{
148+
{
149+
RequestId: proto.Uint64(1),
150+
Topic: proto.String("my-topic"),
151+
Authoritative: proto.Bool(false),
152+
},
153+
},
154+
mockedResponses: []pb.CommandLookupTopicResponse{
155+
{
156+
RequestId: proto.Uint64(1),
157+
Response: responseType(pb.CommandLookupTopicResponse_Connect),
158+
Authoritative: proto.Bool(true),
159+
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
160+
},
161+
},
162+
}, url, true)
163+
164+
lr, err := ls.Lookup("my-topic")
165+
assert.NoError(t, err)
166+
assert.NotNil(t, lr)
167+
168+
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String())
169+
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.PhysicalAddr.String())
170+
}
171+
140172
func TestLookupWithProxy(t *testing.T) {
141173
url, err := url.Parse("pulsar://example:6650")
142174
assert.NoError(t, err)
@@ -160,7 +192,7 @@ func TestLookupWithProxy(t *testing.T) {
160192
ProxyThroughServiceUrl: proto.Bool(true),
161193
},
162194
},
163-
}, url)
195+
}, url, false)
164196

165197
lr, err := ls.Lookup("my-topic")
166198
assert.NoError(t, err)
@@ -170,6 +202,39 @@ func TestLookupWithProxy(t *testing.T) {
170202
assert.Equal(t, "pulsar://example:6650", lr.PhysicalAddr.String())
171203
}
172204

205+
func TestTlsLookupWithProxy(t *testing.T) {
206+
url, err := url.Parse("pulsar+ssl://example:6651")
207+
assert.NoError(t, err)
208+
209+
ls := NewLookupService(&mockedRPCClient{
210+
t: t,
211+
212+
expectedRequests: []pb.CommandLookupTopic{
213+
{
214+
RequestId: proto.Uint64(1),
215+
Topic: proto.String("my-topic"),
216+
Authoritative: proto.Bool(false),
217+
},
218+
},
219+
mockedResponses: []pb.CommandLookupTopicResponse{
220+
{
221+
RequestId: proto.Uint64(1),
222+
Response: responseType(pb.CommandLookupTopicResponse_Connect),
223+
Authoritative: proto.Bool(true),
224+
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
225+
ProxyThroughServiceUrl: proto.Bool(true),
226+
},
227+
},
228+
}, url, true)
229+
230+
lr, err := ls.Lookup("my-topic")
231+
assert.NoError(t, err)
232+
assert.NotNil(t, lr)
233+
234+
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String())
235+
assert.Equal(t, "pulsar+ssl://example:6651", lr.PhysicalAddr.String())
236+
}
237+
173238
func TestLookupWithRedirect(t *testing.T) {
174239
url, err := url.Parse("pulsar://example:6650")
175240
assert.NoError(t, err)
@@ -204,7 +269,7 @@ func TestLookupWithRedirect(t *testing.T) {
204269
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
205270
},
206271
},
207-
}, url)
272+
}, url, false)
208273

209274
lr, err := ls.Lookup("my-topic")
210275
assert.NoError(t, err)
@@ -214,6 +279,50 @@ func TestLookupWithRedirect(t *testing.T) {
214279
assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
215280
}
216281

282+
func TestTlsLookupWithRedirect(t *testing.T) {
283+
url, err := url.Parse("pulsar+ssl://example:6651")
284+
assert.NoError(t, err)
285+
286+
ls := NewLookupService(&mockedRPCClient{
287+
t: t,
288+
expectedURL: "pulsar+ssl://broker-2:6651",
289+
290+
expectedRequests: []pb.CommandLookupTopic{
291+
{
292+
RequestId: proto.Uint64(1),
293+
Topic: proto.String("my-topic"),
294+
Authoritative: proto.Bool(false),
295+
},
296+
{
297+
RequestId: proto.Uint64(2),
298+
Topic: proto.String("my-topic"),
299+
Authoritative: proto.Bool(true),
300+
},
301+
},
302+
mockedResponses: []pb.CommandLookupTopicResponse{
303+
{
304+
RequestId: proto.Uint64(1),
305+
Response: responseType(pb.CommandLookupTopicResponse_Redirect),
306+
Authoritative: proto.Bool(true),
307+
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-2:6651"),
308+
},
309+
{
310+
RequestId: proto.Uint64(2),
311+
Response: responseType(pb.CommandLookupTopicResponse_Connect),
312+
Authoritative: proto.Bool(true),
313+
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
314+
},
315+
},
316+
}, url, true)
317+
318+
lr, err := ls.Lookup("my-topic")
319+
assert.NoError(t, err)
320+
assert.NotNil(t, lr)
321+
322+
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.LogicalAddr.String())
323+
assert.Equal(t, "pulsar+ssl://broker-1:6651", lr.PhysicalAddr.String())
324+
}
325+
217326
func TestLookupWithInvalidUrlResponse(t *testing.T) {
218327
url, err := url.Parse("pulsar://example:6650")
219328
assert.NoError(t, err)
@@ -237,7 +346,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
237346
ProxyThroughServiceUrl: proto.Bool(false),
238347
},
239348
},
240-
}, url)
349+
}, url, false)
241350

242351
lr, err := ls.Lookup("my-topic")
243352
assert.Error(t, err)
@@ -265,7 +374,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
265374
Authoritative: proto.Bool(true),
266375
},
267376
},
268-
}, url)
377+
}, url, false)
269378

270379
lr, err := ls.Lookup("my-topic")
271380
assert.Error(t, err)

0 commit comments

Comments
 (0)