Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.http.AsyncHttpConnector;
import org.apache.pulsar.client.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.common.net.ServiceURI;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
Expand Down Expand Up @@ -124,8 +125,10 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa
clientConfigData.setServiceUrl(serviceUrl);
}

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(serviceUrl);
AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression);
clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression, pulsarServiceNameResolver);

ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Sink;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Source;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal.http;
package org.apache.pulsar.client.internal.http;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
Expand Down Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import org.asynchttpclient.Request;
Expand Down Expand Up @@ -174,9 +176,11 @@ public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException, Executi
Executor delayedExecutor = runnable -> {
scheduledExecutor.schedule(runnable, requestTimeout, TimeUnit.MILLISECONDS);
};
PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, requestTimeout,
requestTimeout, 0, conf, false) {
requestTimeout, 0, conf, false, pulsarServiceNameResolver) {
@Override
protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) {
// delay the response to simulate a timeout
Expand Down Expand Up @@ -214,7 +218,7 @@ public void failure(Throwable failure) {
}

@Test
void testMaxRedirects() {
void testMaxRedirects() throws InvalidServiceURL {
// Redirect to itself to test max redirects
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.willReturn(aResponse()
Expand All @@ -224,9 +228,11 @@ void testMaxRedirects() {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/admin/v2/clusters")
Expand All @@ -243,21 +249,21 @@ void testMaxRedirects() {
}

@Test
void testRelativeRedirect() throws ExecutionException, InterruptedException {
void testRelativeRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("path2");
}

@Test
void testAbsoluteRedirect() throws ExecutionException, InterruptedException {
void testAbsoluteRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("/path2");
}

@Test
void testUrlRedirect() throws ExecutionException, InterruptedException {
void testUrlRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL {
doTestRedirect("http://localhost:" + server.port() + "/path2");
}

private void doTestRedirect(String location) throws InterruptedException, ExecutionException {
private void doTestRedirect(String location) throws InterruptedException, ExecutionException, InvalidServiceURL {
server.stubFor(get(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(301)
Expand All @@ -270,9 +276,11 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("GET")
.setUrl("http://localhost:" + server.port() + "/path1")
Expand All @@ -283,7 +291,7 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
}

@Test
void testRedirectWithBody() throws ExecutionException, InterruptedException {
void testRedirectWithBody() throws ExecutionException, InterruptedException, InvalidServiceURL {
server.stubFor(post(urlEqualTo("/path1"))
.willReturn(aResponse()
.withStatus(307)
Expand All @@ -296,9 +304,12 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);


Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/path1")
Expand All @@ -310,7 +321,7 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException {
}

@Test
void testMaxConnections() throws ExecutionException, InterruptedException {
void testMaxConnections() throws ExecutionException, InterruptedException, InvalidServiceURL {
server.stubFor(post(urlEqualTo("/concurrency-test"))
.willReturn(aResponse()
.withTransformers("concurrency-test")));
Expand All @@ -320,9 +331,11 @@ void testMaxConnections() throws ExecutionException, InterruptedException {
conf.setConnectionsPerBroker(maxConnections);
conf.setServiceUrl("http://localhost:" + server.port());

PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver();
pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl());
@Cleanup
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
5000, 0, conf, false);
5000, 0, conf, false, pulsarServiceNameResolver);

Request request = new RequestBuilder("POST")
.setUrl("http://localhost:" + server.port() + "/concurrency-test")
Expand Down
22 changes: 22 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
</dependency>

<!-- Testing dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -217,6 +232,13 @@
<artifactId>fastutil</artifactId>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Loading
Loading