Skip to content

Commit bdc7248

Browse files
author
ninjazhou
committed
Support http lookup authentication(oauth2 and token)
1 parent f9427f6 commit bdc7248

File tree

6 files changed

+84
-9
lines changed

6 files changed

+84
-9
lines changed

src/Pulsar.Client/Api/AuthenticationDataProvider.fs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace Pulsar.Client.Api
22

3+
open System.Collections.Generic
34
open Pulsar.Client.Common
45
open System.Text
56
open System.Security.Cryptography.X509Certificates
@@ -26,6 +27,18 @@ type AuthenticationDataProvider() =
2627
default this.GetCommandData() =
2728
""
2829

30+
//Http
31+
32+
abstract member HasDataForHttp: unit -> bool
33+
default this.HasDataForHttp() =
34+
false
35+
36+
abstract member GetHttpHeaders: unit -> IReadOnlyDictionary<string, string>
37+
default this.GetHttpHeaders() =
38+
Dictionary<string, string>()
39+
40+
//AuthDataProvider
41+
2942
abstract member Authenticate: AuthData -> AuthData
3043
default this.Authenticate authData =
3144
let bytes =

src/Pulsar.Client/Auth/AuthenticationDataToken.fs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace Pulsar.Client.Auth
22

3+
open System.Collections.Generic
34
open Pulsar.Client.Api
45

56
type internal AuthenticationDataToken (supplier: unit -> string) =
@@ -9,4 +10,18 @@ type internal AuthenticationDataToken (supplier: unit -> string) =
910
true
1011

1112
override this.GetCommandData() =
12-
supplier()
13+
supplier()
14+
15+
override this.HasDataForHttp()=
16+
true
17+
18+
// Since AuthenticationOauth2 and AuthenticationToken both return this class when call GetAuthData()
19+
// We only need to realize this DataToken class GetHttpHeaders to realize http authentication
20+
override this.GetHttpHeaders()=
21+
let httpHeaderPropertiesDict = Dictionary<string, string>()
22+
httpHeaderPropertiesDict.Add("X-Pulsar-Auth-Method-Name", "token")
23+
httpHeaderPropertiesDict.Add("Authorization", "Bearer " + supplier())
24+
httpHeaderPropertiesDict
25+
26+
27+

src/Pulsar.Client/Internal/HttpLookupService.fs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ open Pulsar.Client.Schema
1515

1616
type internal HttpLookupService (config: PulsarClientConfiguration, _connectionPool: ConnectionPool) =
1717

18-
let httpClient = new HttpClient(new SocketsHttpHandler(
19-
PooledConnectionLifetime = TimeSpan.FromMinutes(2),
20-
AllowAutoRedirect = true
21-
))
18+
let pulsarHttpClient = PulsarHttpClient(config)
19+
2220
let jsonOptions = JsonSerializerOptions(
2321
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
2422
)
@@ -93,7 +91,7 @@ type internal HttpLookupService (config: PulsarClientConfiguration, _connectionP
9391
let topicRestPath = topic.Replace("persistent://","persistent/").Replace("non-persistent://","non-persistent/")
9492
let! response =
9593
randomServiceUri.AbsoluteUri + $"admin/v2/%s{topicRestPath}/partitions?checkAllowAutoCreation=true"
96-
|> httpClient.GetStreamAsync
94+
|> pulsarHttpClient.GetStreamAsync
9795
|> Async.AwaitTask
9896
let brokerResponse = JsonSerializer.Deserialize<{| Partitions: int |}>(response, jsonOptions)
9997
return { Partitions = brokerResponse.Partitions }
@@ -113,7 +111,7 @@ type internal HttpLookupService (config: PulsarClientConfiguration, _connectionP
113111
let randomServiceUri = config.ServiceAddresses[RandomGenerator.Next(0, config.ServiceAddresses.Length)]
114112
let topic: string = %topicName
115113
let topicRestPath = topic.Replace("persistent://","persistent/").Replace("non-persistent://","non-persistent/")
116-
let! response = httpClient.GetStreamAsync (randomServiceUri.AbsoluteUri + $"lookup/v2/topic/%s{topicRestPath}")
114+
let! response = pulsarHttpClient.GetStreamAsync (randomServiceUri.AbsoluteUri + $"lookup/v2/topic/%s{topicRestPath}")
117115
let brokerResponse =
118116
JsonSerializer.Deserialize<{|
119117
BrokerUrl : string
@@ -142,7 +140,7 @@ type internal HttpLookupService (config: PulsarClientConfiguration, _connectionP
142140
| false -> "NON_PERSISTENT"
143141
let! response =
144142
randomServiceUri.AbsoluteUri + $"admin/v2/namespaces/%s{ns.ToString()}/topics?mode=%s{mode}"
145-
|> httpClient.GetStreamAsync
143+
|> pulsarHttpClient.GetStreamAsync
146144
|> Async.AwaitTask
147145
let brokerResponse = JsonSerializer.Deserialize<string seq>(response, jsonOptions)
148146
return brokerResponse
@@ -176,7 +174,7 @@ type internal HttpLookupService (config: PulsarClientConfiguration, _connectionP
176174
$"admin/v2/schemas/%s{topicRestPath}/schema"
177175
let! response =
178176
randomServiceUri.AbsoluteUri + path
179-
|> httpClient.GetStreamAsync
177+
|> pulsarHttpClient.GetStreamAsync
180178
|> Async.AwaitTask
181179
let schemaResponse =
182180
JsonSerializer.Deserialize<{|
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
namespace Pulsar.Client.Internal
2+
3+
open System.Net.Http
4+
open Pulsar.Client.Api
5+
open System
6+
7+
// This class is mainly used for http lookup service
8+
// We name this class `PulsarHttpClient` to avoid naming clash with native HttpClient,
9+
// and in Java it's just `HttpClient`
10+
type internal PulsarHttpClient (config: PulsarClientConfiguration) =
11+
12+
let authenticationDataProvider = config.Authentication.GetAuthData()
13+
14+
let httpClient = new HttpClient(new SocketsHttpHandler(
15+
PooledConnectionLifetime = TimeSpan.FromMinutes(2),
16+
AllowAutoRedirect = true
17+
))
18+
19+
member this.GetStreamAsync (requestUri: string) =
20+
backgroundTask {
21+
if authenticationDataProvider.HasDataForHttp() then
22+
let request = new HttpRequestMessage(HttpMethod.Get, requestUri)
23+
for headerPropertyEntry in authenticationDataProvider.GetHttpHeaders() do
24+
request.Headers.Add(headerPropertyEntry.Key, headerPropertyEntry.Value)
25+
let! response = httpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead)
26+
response.EnsureSuccessStatusCode() |> ignore
27+
return! response.Content.ReadAsStreamAsync()
28+
else
29+
return! httpClient.GetStreamAsync(requestUri)
30+
}
31+
32+
33+
34+
35+

src/Pulsar.Client/Pulsar.Client.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
<Compile Include="Internal\EndPointResolver.fs" />
140140
<Compile Include="Internal\ILookupService.fs" />
141141
<Compile Include="Internal\BinaryLookupService.fs" />
142+
<Compile Include="Internal\PulsarHttpClient.fs" />
142143
<Compile Include="Internal\HttpLookupService.fs" />
143144
<Compile Include="Internal\ConnectionHandler.fs" />
144145
<Compile Include="Internal\AcknowledgmentsGroupingTracker.fs" />

tests/UnitTests/Api/PulsarClientBuilderTests.fs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,17 @@ module PulsarClientBuilderTests =
3939
"Service Url needs to be specified on the PulsarClientBuilder object."
4040
}
4141

42+
test "Http lookup authentication authDataProvider" {
43+
AuthenticationFactory.Token("test").GetAuthData().HasDataForHttp()
44+
|> Expect.equal "AuthenticationToken HasDataForHttp should be true" true
45+
46+
AuthenticationFactoryOAuth2.ClientCredentials(
47+
Uri("https://test.com"),
48+
"test",
49+
Uri("https://test.com")
50+
).GetAuthMethodName()
51+
|> Expect.equal "AuthenticationFactoryOAuth2 authData should be token" "token"
52+
}
53+
54+
4255
]

0 commit comments

Comments
 (0)