Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<src.dir>src/main/java</src.dir>
<!-- dependency -->
<annotations.version>13.0</annotations.version>
<awaitility.version>4.2.2</awaitility.version>
<embedded-pulsar.version>0.0.5</embedded-pulsar.version>
<http-facade.version>0.3.0</http-facade.version>
<jackson.version>2.17.2</jackson.version>
<junit.version>5.11.0</junit.version>
<log4j.version>2.20.0</log4j.version>
Expand Down Expand Up @@ -69,6 +71,11 @@
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>${annotations.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.List;

public class JacksonService {
Expand All @@ -18,26 +21,26 @@ public class JacksonService {
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public static String toJson(Object o) throws JsonProcessingException {
return MAPPER.writeValueAsString(o);
public static byte[] toBytes(@Nullable Object o) throws JsonProcessingException {
return o == null ? null : MAPPER.writeValueAsBytes(o);
}

public static <T> T toObject(String json, Class<T> type) throws JsonProcessingException {
if (json == null || json.isEmpty()) {
public static <T> T toObject(@Nullable byte[] json, @NotNull Class<T> type) throws IOException {
if (json == null || json.length == 0) {
return null;
}
return MAPPER.readValue(json, type);
}

public static <T> T toRefer(String json, TypeReference<T> ref) throws JsonProcessingException {
if (json == null || json.isEmpty()) {
public static <T> T toRefer(byte[] json, TypeReference<T> ref) throws IOException {
if (json == null || json.length == 0) {
return null;
}
return MAPPER.readValue(json, ref);
}

public static <T> List<T> toList(String json, TypeReference<List<T>> typeRef) throws JsonProcessingException {
if (json == null || json.isEmpty()) {
public static <T> List<T> toList(byte[] json, TypeReference<List<T>> typeRef) throws IOException {
if (json == null || json.length == 0) {
return List.of();
}
return MAPPER.readValue(json, typeRef);
Expand Down
5 changes: 5 additions & 0 deletions pulsar-admin-jdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
<artifactId>pulsar-admin-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.github.openfacade</groupId>
<artifactId>http-facade</artifactId>
<version>${http-facade.version}</version>
</dependency>
</dependencies>

<build>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.protocol.pulsar.admin.jdk;

import java.net.http.HttpResponse;
import io.github.openfacade.http.HttpResponse;

public class Brokers {
private final InnerHttpClient innerHttpClient;
Expand All @@ -15,12 +15,12 @@ public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException {
url += "?topicVersion=" + topicVersion;
}
try {
HttpResponse<String> httpResponse = innerHttpClient.get(url);
HttpResponse httpResponse = innerHttpClient.get(url);
if (httpResponse.statusCode() != 200) {
throw new PulsarAdminException("healthcheck failed, status code: " + httpResponse.statusCode(),
httpResponse.statusCode());
}
if (!httpResponse.body().equals("ok")) {
if (!httpResponse.bodyAsString().equals("ok")) {
throw new PulsarAdminException("healthcheck failed, body: " + httpResponse.body());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.type.TypeReference;
import io.github.openfacade.http.HttpResponse;
import io.github.protocol.pulsar.admin.common.JacksonService;

import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class Clusters {

Expand All @@ -17,15 +18,14 @@ public Clusters(InnerHttpClient httpClient) {

public List<String> getClusters() throws PulsarAdminException {
try {
HttpResponse<String> response = httpClient.get(UrlConst.CLUSTERS);
HttpResponse response = httpClient.get(UrlConst.CLUSTERS);
if (response.statusCode() != 200) {
throw new PulsarAdminException(
String.format("failed to get list of clusters, "
+ "status code %s, body : %s", response.statusCode(), response.body()));
}
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {
});
} catch (IOException | InterruptedException e) {
return JacksonService.toRefer(response.body(), new TypeReference<>() {});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,128 +1,124 @@
package io.github.protocol.pulsar.admin.jdk;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.github.openfacade.http.HttpClient;
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientEngine;
import io.github.openfacade.http.HttpClientFactory;
import io.github.openfacade.http.HttpMethod;
import io.github.openfacade.http.HttpRequest;
import io.github.openfacade.http.HttpResponse;
import io.github.openfacade.http.HttpSchema;
import io.github.openfacade.http.TlsConfig;
import io.github.openfacade.http.UrlBuilder;
import io.github.protocol.pulsar.admin.api.Configuration;
import io.github.protocol.pulsar.admin.common.JacksonService;

import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class InnerHttpClient {
private final Configuration conf;

private final HttpClient client;

private final String httpPrefix;
private UrlBuilder templateUrlBuilder;

public InnerHttpClient(Configuration conf) {
this.conf = conf;
HttpClient.Builder builder = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1);
HttpClientConfig.Builder clientConfigBuilder = new HttpClientConfig.Builder();
clientConfigBuilder.engine(HttpClientEngine.JAVA);
if (conf.tlsEnabled) {
builder = builder
.sslContext(SslContextUtil.build(conf.tlsConfig));
this.httpPrefix = "https://" + conf.host + ":" + conf.port;
} else {
this.httpPrefix = "http://" + conf.host + ":" + conf.port;
TlsConfig.Builder tlsConfigBuilder = new TlsConfig.Builder();
io.github.protocol.pulsar.admin.api.TlsConfig tlsConfig = conf.tlsConfig;
tlsConfigBuilder.cipherSuites(tlsConfig.cipherSuites);
tlsConfigBuilder.hostnameVerifyDisabled(tlsConfig.hostnameVerifyDisabled);
tlsConfigBuilder.keyStore(tlsConfig.keyStorePath, tlsConfig.keyStorePassword);
tlsConfigBuilder.trustStore(tlsConfig.trustStorePath, tlsConfig.trustStorePassword);
tlsConfigBuilder.verifyDisabled(tlsConfig.verifyDisabled);
clientConfigBuilder.tlsConfig(tlsConfigBuilder.build());
}
this.client = builder.build();
this.client = HttpClientFactory.createHttpClient(clientConfigBuilder.build());
templateUrlBuilder = new UrlBuilder();
templateUrlBuilder.setHttpSchema(conf.tlsEnabled ? HttpSchema.HTTPS : HttpSchema.HTTP).setHost(conf.host)
.setPort(conf.port);
}

public HttpResponse<String> get(String url) throws IOException, InterruptedException {
return this.get(url, new String[0]);
public HttpResponse post(String url) throws IOException, InterruptedException, ExecutionException {
return this.innerPost(url, new byte[0]);
}

public HttpResponse<String> get(String url, String... requestParams)
throws IOException, InterruptedException {
HttpRequest request = HttpRequest.newBuilder()
.uri(getUri(url, requestParams))
.GET()
.build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
public HttpResponse post(String url, Object body) throws IOException, InterruptedException, ExecutionException {
return this.innerPost(url, objectToBytes(body));
}

public HttpResponse<String> post(String url, Object body, String... params)
throws IOException, InterruptedException {
return this.post(url, objectToString(body), params);
public HttpResponse post(String url, Object body, String... params)
throws IOException, ExecutionException, InterruptedException {
return this.innerPost(url, objectToBytes(body), params);
}

public HttpResponse<String> post(String url, String body, String... params)
throws IOException, InterruptedException {
HttpRequest request = HttpRequest.newBuilder()
.uri(getUri(url, params))
.POST(HttpRequest.BodyPublishers.ofString(body == null ? "" : body))
.setHeader("Content-Type", "application/json")
.build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
private HttpResponse innerPost(String url, byte[] body, String... params)
throws InterruptedException, ExecutionException {
Map<String, List<String>> headers = new HashMap<>();
headers.put("Content-Type", List.of("application/json"));
HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.POST, headers, body);
return client.send(request).get();
}

public HttpResponse<String> post(String url) throws IOException, InterruptedException {
return this.post(url, "");
public HttpResponse put(String url) throws IOException, InterruptedException, ExecutionException {
return this.innerPut(url, new byte[0]);
}

public HttpResponse<String> put(String url) throws IOException, InterruptedException {
return this.put(url, "");
public HttpResponse put(String url, Object body) throws IOException, InterruptedException, ExecutionException {
return this.innerPut(url, objectToBytes(body));
}

public HttpResponse<String> put(String url, Object body, String... params)
throws IOException, InterruptedException {
return this.put(url, objectToString(body), params);
public HttpResponse put(String url, Object body, String... params)
throws IOException, InterruptedException, ExecutionException {
return this.innerPut(url, objectToBytes(body), params);
}

public HttpResponse<String> put(String url, String body, String... params)
throws IOException, InterruptedException {
HttpRequest request = HttpRequest.newBuilder()
.uri(getUri(url, params))
.PUT(HttpRequest.BodyPublishers.ofString(body == null ? "" : body))
.setHeader("Content-Type", "application/json")
.build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
private HttpResponse innerPut(String url, byte[] body, String... params)
throws InterruptedException, ExecutionException {
Map<String, List<String>> headers = new HashMap<>();
headers.put("Content-Type", List.of("application/json"));
HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.PUT, headers, body);
return client.send(request).get();
}

public HttpResponse<String> delete(String url, String... requestParams)
throws IOException, InterruptedException {
HttpRequest request = HttpRequest.newBuilder()
.uri(getUri(url, requestParams))
.DELETE()
.build();
return client.send(request, HttpResponse.BodyHandlers.ofString());
public HttpResponse delete(String url, String... params)
throws IOException, InterruptedException, ExecutionException {
HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.DELETE);
return client.send(request).get();
}

private URI getUri(String url, String... params) {
return URI.create(this.httpPrefix + url + mapToParams(params));
public HttpResponse get(String url, String... params) throws IOException, InterruptedException, ExecutionException {
HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.GET);
return client.send(request).get();
}

static String mapToParams(String... requestParams) {
private List<UrlBuilder.Param> convertListToParams(String... requestParams) {
if (requestParams.length % 2 != 0) {
throw new IllegalArgumentException("params list length cannot be odd");
}
if (requestParams.length == 0) {
return "";
}
StringBuilder res = new StringBuilder("?");
res.append(requestParams[0]);
res.append('=');
res.append(requestParams[1]);
for (int i = 2; i < requestParams.length; ) {
res.append('&');
res.append(encode(requestParams[i++]));
res.append('=');
res.append(encode(requestParams[i++]));
List<UrlBuilder.Param> queryParams = new ArrayList<>();
for (int i = 0; i < requestParams.length; i = i + 2) {
queryParams.add(new UrlBuilder.Param(requestParams[i], requestParams[i + 1]));
}
return res.toString();
return queryParams;
}

private String objectToString(Object obj) throws JsonProcessingException {
return obj == null ? "" : JacksonService.toJson(obj);
private byte[] objectToBytes(Object obj) throws JsonProcessingException {
return obj == null ? new byte[0] : JacksonService.toBytes(obj);
}

private static String encode(String value) {
return URLEncoder.encode(value, StandardCharsets.UTF_8);
private String concatUrlWithParams(String url, String... params) {
UrlBuilder urlBuilder = templateUrlBuilder.duplicate();
urlBuilder.setPath(url);
if (params != null && params.length > 0) {
urlBuilder.setQueryParams(convertListToParams(params));
}
return urlBuilder.build();
}

}
Loading
Loading