Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.TlsVersion;
import okhttp3.tls.HandshakeCertificates;
Expand Down Expand Up @@ -139,6 +140,20 @@ public AnalyticsOkHttpClient(ClusterOptions.Unmodifiable options, HttpUrl url, C
clientBuilder.sslSocketFactory(handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager());
}

clientBuilder.addInterceptor(chain -> {
Request request = chain.request();

// get the value every time in case the credential is dynamic (deprecated)
String authorizationHeaderValue = credential.httpAuthorizationHeaderValue();
if (authorizationHeaderValue != null) {
request = request.newBuilder()
.header("Authorization", authorizationHeaderValue)
.build();
}

return chain.proceed(request);
});

this.client = clientBuilder.build();

int maxInFlightRequests = 128;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package com.couchbase.analytics.client.java;

import com.couchbase.analytics.client.java.internal.RawQueryMetadata;
import com.couchbase.jsonskiff.JsonStreamParser;
import org.jspecify.annotations.Nullable;

import java.io.Closeable;
import java.io.InputStream;
import java.util.List;
import java.util.function.Consumer;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

class AnalyticsResponseParser implements Closeable {
Expand All @@ -38,26 +37,19 @@ public AnalyticsResponseParser(Consumer<byte[]> rowConsumer) {
this.parser = newStreamParser();
}

@Nullable String requestId;
byte @Nullable [] signature;
byte @Nullable [] plans;
byte @Nullable [] metrics;
byte @Nullable [] errors;
byte @Nullable [] warnings;
@Nullable String clientContextId;
@Nullable String status;
final RawQueryMetadata result = new RawQueryMetadata();

private JsonStreamParser newStreamParser() {
return JsonStreamParser.builder()
.doOnValue("/requestID", v -> requestId = v.readString())
.doOnValue("/signature", v -> signature = v.bytes())
.doOnValue("/plans", v -> plans = v.bytes())
.doOnValue("/clientContextID", v -> clientContextId = v.readString())
.doOnValue("/requestID", v -> result.requestId = v.readString())
.doOnValue("/signature", v -> result.signature = v.bytes())
.doOnValue("/plans", v -> result.plans = v.bytes())
.doOnValue("/clientContextID", v -> result.clientContextId = v.readString())
.doOnValue("/results/-", v -> rowConsumer.accept(v.bytes()))
.doOnValue("/status", v -> status = v.readString())
.doOnValue("/metrics", v -> metrics = v.bytes())
.doOnValue("/warnings", v -> warnings = v.bytes())
.doOnValue("/errors", v -> fail(errors = v.bytes()))
.doOnValue("/status", v -> result.status = v.readString())
.doOnValue("/metrics", v -> result.metrics = v.bytes())
.doOnValue("/warnings", v -> result.warnings = v.bytes())
.doOnValue("/errors", v -> fail(result.errors = v.bytes()))
.build();
}

Expand Down Expand Up @@ -96,21 +88,8 @@ public void close() {
parser.close();
}

static @Nullable String newString(byte @Nullable [] array) {
return array == null ? null : new String(array, UTF_8);
}

@Override
public String toString() {
return "AnalyticsResponseParser{" +
"requestId='" + requestId + '\'' +
", signature=" + newString(signature) +
", plans=" + newString(plans) +
", metrics=" + newString(metrics) +
", errors=" + newString(errors) +
", warnings=" + newString(warnings) +
", clientContextId='" + clientContextId + '\'' +
", status='" + status + '\'' +
'}';
return result.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.couchbase.analytics.client.java;

import com.couchbase.analytics.client.java.internal.Certificates;
import com.couchbase.analytics.client.java.internal.RawQueryMetadata;
import com.couchbase.analytics.client.java.internal.utils.BuilderPropertySetter;
import okhttp3.HttpUrl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,7 +95,6 @@ private static QueryExecutor newQueryExecutor(
credential
),
url,
credential,
options
);
}
Expand Down Expand Up @@ -267,7 +267,8 @@ public QueryResult executeQuery(String statement, Consumer<QueryOptions> options
@Override
public QueryMetadata executeStreamingQuery(String statement, Consumer<Row> rowAction, Consumer<QueryOptions> options) {
try {
return queryExecutor.executeStreamingQueryWithRetry(null, statement, rowAction, options);
RawQueryMetadata rawMetadata = queryExecutor.executeStreamingQueryWithRetry(null, statement, rowAction, options);
return new QueryMetadata(rawMetadata);

} catch (QueryException e) {
// Expected, so omit uninteresting noise from the JSON stream parser.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Copyright 2026 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.couchbase.analytics.client.java;

import com.couchbase.analytics.client.java.codec.Deserializer;
import com.couchbase.analytics.client.java.internal.InternalJacksonSerDes;
import com.couchbase.analytics.client.java.internal.RawQueryMetadata;
import com.couchbase.analytics.client.java.internal.ThreadSafe;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okio.BufferedSink;
import org.jetbrains.annotations.ApiStatus;
import org.jspecify.annotations.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;

import static com.couchbase.analytics.client.java.internal.utils.lang.CbStrings.removeStart;
import static java.util.Objects.requireNonNull;

/**
* An HTTP client intended for use by other Couchbase libraries.
* <p>
* NOT PART OF THE PUBLIC API! This class may change without notice.
*/
@ThreadSafe
@ApiStatus.Internal
public class InternalUnsupportedHttpClient {
private final Cluster cluster;
private final HttpUrl baseUrl;

public static InternalUnsupportedHttpClient from(Cluster cluster) {
HttpUrl url = cluster.queryExecutor.url;
return new InternalUnsupportedHttpClient(
cluster,
new HttpUrl.Builder()
.scheme(url.scheme())
.host(url.host())
.port(url.port())
.build()
);
}

private InternalUnsupportedHttpClient(
Cluster cluster,
HttpUrl baseUrl
) {
this.cluster = requireNonNull(cluster);

this.baseUrl = new HttpUrl.Builder()
.scheme(baseUrl.scheme())
.host(baseUrl.host())
.port(baseUrl.port())
.build();
}

/**
* Executes an arbitrary HTTP request.
*
* @throws AnalyticsTimeoutException if request times out.
* @throws CancellationException if thread is interrupted.
* @throws AnalyticsException for all other IO errors.
*/
public Response execute(
Consumer<RequestBuilder> requestCustomizer,
Duration timeout
) {
RequestBuilder builder = new RequestBuilder(baseUrl.toString());
requestCustomizer.accept(builder);

return new Response(
cluster.queryExecutor.executeRaw(
builder.wrapped.build(),
timeout
)
);
}

/**
* Executes an HTTP request for an Analytics query.
*
* @throws QueryException if response has a non-empty "errors" field.
* @throws AnalyticsTimeoutException if request times out.
* @throws CancellationException if thread is interrupted.
* @throws AnalyticsException for all other IO errors.
*/
public RawQueryMetadata executeStreaming(
Consumer<RequestBuilder> requestCustomizer,
Duration timeout,
Consumer<Row> rowAction,
@Nullable Deserializer deserializer
) {
RequestBuilder builder = new RequestBuilder(baseUrl.toString());
requestCustomizer.accept(builder);

return cluster.queryExecutor.executeStreamingQueryOnce(
builder.wrapped,
timeout,
rowAction,
deserializer == null ? InternalJacksonSerDes.INSTANCE : deserializer
);
}

public static class Response implements Closeable {
private final okhttp3.Response wrapped;

Response(okhttp3.Response wrapped) {
this.wrapped = wrapped;
}

@Override
public void close() {
wrapped.close();
}

public int httpStatusCode() {
return wrapped.code();
}

/**
* Returns an input stream over the bytes of the body,
* or null if the response does not have a body.
* <p>
* Must not be called more than once.
*/
public @Nullable InputStream bodyInputStream() {
ResponseBody body = wrapped.body();
return body == null ? null : body.byteStream();
}

/**
* Returns the response body as a string,
* or null if the response does not have a body.
*
* @throws UncheckedIOException if there was an error reading the response body.
*/
public @Nullable String bodyAsString() {
try {
ResponseBody body = wrapped.body();
return body == null ? null : body.string();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

public static class RequestBuilder {
private final String baseUrl;

RequestBuilder(String baseUrl) {
this.baseUrl = baseUrl.endsWith("/")
? baseUrl
: baseUrl + "/";
}

private final Request.Builder wrapped = new Request.Builder();

/**
* Sets the path component (and query string, if applicable).
* <p>
* Caller is responsible for ensuring the input is correctly URI-encoded.
*
* @param path pre-encoded path and query
*/
public RequestBuilder path(String path) {
wrapped.url(baseUrl + removeStart(path, "/"));
return this;
}

public RequestBuilder header(String name, String value) {
wrapped.header(name, value);
return this;
}

private static final MediaType JSON = requireNonNull(MediaType.parse("application/json"));

public RequestBuilder postJson(byte[] body) {
wrapped.post(new RequestBody() {
@Override
public long contentLength() {
return body.length;
}

@Override
public MediaType contentType() {
return JSON;
}

@Override
public void writeTo(BufferedSink bufferedSink) throws IOException {
bufferedSink.write(body);
}
});

return this;
}

public RequestBuilder delete() {
wrapped.delete();
return this;
}

public RequestBuilder get() {
wrapped.get();
return this;
}
}
}
Loading