Skip to content

Commit 933c82d

Browse files
committed
JCO-35 Expose an internal API for the JDBC driver to use
Motivation ---------- The JDBC driver needs to make different HTTP requests than the standard Analytics SDK. Rather than expand the SDK's public API, we'll expose an internal low-level client the JDBC driver can use to execute custom HTTP requests. Modifications ------------- Add `InternalUnsupportedHttpClient` with methods for executing ad hoc HTTP requests and streaming query results. Use an interceptor to add the "Authorization" header, so all requests get authenticated regardless of code path. Extract `RawQueryMetadata` out of `AnalyticsResponseParser` to minimize the API surface area exposed to the JDBC driver. The internal methods that previously returned `QueryMetadata` now return `RawQueryMetadata` -- which has all the fields required by the JDBC driver.
1 parent 837414c commit 933c82d

File tree

8 files changed

+362
-52
lines changed

8 files changed

+362
-52
lines changed

couchbase-analytics-java-client/src/main/java/com/couchbase/analytics/client/java/AnalyticsOkHttpClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import okhttp3.HttpUrl;
2929
import okhttp3.OkHttpClient;
3030
import okhttp3.Protocol;
31+
import okhttp3.Request;
3132
import okhttp3.Response;
3233
import okhttp3.TlsVersion;
3334
import okhttp3.tls.HandshakeCertificates;
@@ -139,6 +140,20 @@ public AnalyticsOkHttpClient(ClusterOptions.Unmodifiable options, HttpUrl url, C
139140
clientBuilder.sslSocketFactory(handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager());
140141
}
141142

143+
clientBuilder.addInterceptor(chain -> {
144+
Request request = chain.request();
145+
146+
// get the value every time in case the credential is dynamic (deprecated)
147+
String authorizationHeaderValue = credential.httpAuthorizationHeaderValue();
148+
if (authorizationHeaderValue != null) {
149+
request = request.newBuilder()
150+
.header("Authorization", authorizationHeaderValue)
151+
.build();
152+
}
153+
154+
return chain.proceed(request);
155+
});
156+
142157
this.client = clientBuilder.build();
143158

144159
int maxInFlightRequests = 128;

couchbase-analytics-java-client/src/main/java/com/couchbase/analytics/client/java/AnalyticsResponseParser.java

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616

1717
package com.couchbase.analytics.client.java;
1818

19+
import com.couchbase.analytics.client.java.internal.RawQueryMetadata;
1920
import com.couchbase.jsonskiff.JsonStreamParser;
20-
import org.jspecify.annotations.Nullable;
2121

2222
import java.io.Closeable;
2323
import java.io.InputStream;
2424
import java.util.List;
2525
import java.util.function.Consumer;
2626

27-
import static java.nio.charset.StandardCharsets.UTF_8;
2827
import static java.util.Objects.requireNonNull;
2928

3029
class AnalyticsResponseParser implements Closeable {
@@ -38,26 +37,19 @@ public AnalyticsResponseParser(Consumer<byte[]> rowConsumer) {
3837
this.parser = newStreamParser();
3938
}
4039

41-
@Nullable String requestId;
42-
byte @Nullable [] signature;
43-
byte @Nullable [] plans;
44-
byte @Nullable [] metrics;
45-
byte @Nullable [] errors;
46-
byte @Nullable [] warnings;
47-
@Nullable String clientContextId;
48-
@Nullable String status;
40+
final RawQueryMetadata result = new RawQueryMetadata();
4941

5042
private JsonStreamParser newStreamParser() {
5143
return JsonStreamParser.builder()
52-
.doOnValue("/requestID", v -> requestId = v.readString())
53-
.doOnValue("/signature", v -> signature = v.bytes())
54-
.doOnValue("/plans", v -> plans = v.bytes())
55-
.doOnValue("/clientContextID", v -> clientContextId = v.readString())
44+
.doOnValue("/requestID", v -> result.requestId = v.readString())
45+
.doOnValue("/signature", v -> result.signature = v.bytes())
46+
.doOnValue("/plans", v -> result.plans = v.bytes())
47+
.doOnValue("/clientContextID", v -> result.clientContextId = v.readString())
5648
.doOnValue("/results/-", v -> rowConsumer.accept(v.bytes()))
57-
.doOnValue("/status", v -> status = v.readString())
58-
.doOnValue("/metrics", v -> metrics = v.bytes())
59-
.doOnValue("/warnings", v -> warnings = v.bytes())
60-
.doOnValue("/errors", v -> fail(errors = v.bytes()))
49+
.doOnValue("/status", v -> result.status = v.readString())
50+
.doOnValue("/metrics", v -> result.metrics = v.bytes())
51+
.doOnValue("/warnings", v -> result.warnings = v.bytes())
52+
.doOnValue("/errors", v -> fail(result.errors = v.bytes()))
6153
.build();
6254
}
6355

@@ -96,21 +88,8 @@ public void close() {
9688
parser.close();
9789
}
9890

99-
static @Nullable String newString(byte @Nullable [] array) {
100-
return array == null ? null : new String(array, UTF_8);
101-
}
102-
10391
@Override
10492
public String toString() {
105-
return "AnalyticsResponseParser{" +
106-
"requestId='" + requestId + '\'' +
107-
", signature=" + newString(signature) +
108-
", plans=" + newString(plans) +
109-
", metrics=" + newString(metrics) +
110-
", errors=" + newString(errors) +
111-
", warnings=" + newString(warnings) +
112-
", clientContextId='" + clientContextId + '\'' +
113-
", status='" + status + '\'' +
114-
'}';
93+
return result.toString();
11594
}
11695
}

couchbase-analytics-java-client/src/main/java/com/couchbase/analytics/client/java/Cluster.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.couchbase.analytics.client.java;
1818

1919
import com.couchbase.analytics.client.java.internal.Certificates;
20+
import com.couchbase.analytics.client.java.internal.RawQueryMetadata;
2021
import com.couchbase.analytics.client.java.internal.utils.BuilderPropertySetter;
2122
import okhttp3.HttpUrl;
2223
import org.slf4j.Logger;
@@ -94,7 +95,6 @@ private static QueryExecutor newQueryExecutor(
9495
credential
9596
),
9697
url,
97-
credential,
9898
options
9999
);
100100
}
@@ -267,7 +267,8 @@ public QueryResult executeQuery(String statement, Consumer<QueryOptions> options
267267
@Override
268268
public QueryMetadata executeStreamingQuery(String statement, Consumer<Row> rowAction, Consumer<QueryOptions> options) {
269269
try {
270-
return queryExecutor.executeStreamingQueryWithRetry(null, statement, rowAction, options);
270+
RawQueryMetadata rawMetadata = queryExecutor.executeStreamingQueryWithRetry(null, statement, rowAction, options);
271+
return new QueryMetadata(rawMetadata);
271272

272273
} catch (QueryException e) {
273274
// Expected, so omit uninteresting noise from the JSON stream parser.
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Copyright 2026 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.couchbase.analytics.client.java;
18+
19+
import com.couchbase.analytics.client.java.codec.Deserializer;
20+
import com.couchbase.analytics.client.java.internal.InternalJacksonSerDes;
21+
import com.couchbase.analytics.client.java.internal.RawQueryMetadata;
22+
import com.couchbase.analytics.client.java.internal.ThreadSafe;
23+
import okhttp3.HttpUrl;
24+
import okhttp3.MediaType;
25+
import okhttp3.Request;
26+
import okhttp3.RequestBody;
27+
import okhttp3.ResponseBody;
28+
import okio.BufferedSink;
29+
import org.jetbrains.annotations.ApiStatus;
30+
import org.jspecify.annotations.Nullable;
31+
32+
import java.io.Closeable;
33+
import java.io.IOException;
34+
import java.io.InputStream;
35+
import java.io.UncheckedIOException;
36+
import java.time.Duration;
37+
import java.util.concurrent.CancellationException;
38+
import java.util.function.Consumer;
39+
40+
import static com.couchbase.analytics.client.java.internal.utils.lang.CbStrings.removeStart;
41+
import static java.util.Objects.requireNonNull;
42+
43+
/**
44+
* An HTTP client intended for use by other Couchbase libraries.
45+
* <p>
46+
* NOT PART OF THE PUBLIC API! This class may change without notice.
47+
*/
48+
@ThreadSafe
49+
@ApiStatus.Internal
50+
public class InternalUnsupportedHttpClient {
51+
private final Cluster cluster;
52+
private final HttpUrl baseUrl;
53+
54+
public static InternalUnsupportedHttpClient from(Cluster cluster) {
55+
HttpUrl url = cluster.queryExecutor.url;
56+
return new InternalUnsupportedHttpClient(
57+
cluster,
58+
new HttpUrl.Builder()
59+
.scheme(url.scheme())
60+
.host(url.host())
61+
.port(url.port())
62+
.build()
63+
);
64+
}
65+
66+
private InternalUnsupportedHttpClient(
67+
Cluster cluster,
68+
HttpUrl baseUrl
69+
) {
70+
this.cluster = requireNonNull(cluster);
71+
72+
this.baseUrl = new HttpUrl.Builder()
73+
.scheme(baseUrl.scheme())
74+
.host(baseUrl.host())
75+
.port(baseUrl.port())
76+
.build();
77+
}
78+
79+
/**
80+
* Executes an arbitrary HTTP request.
81+
*
82+
* @throws AnalyticsTimeoutException if request times out.
83+
* @throws CancellationException if thread is interrupted.
84+
* @throws AnalyticsException for all other IO errors.
85+
*/
86+
public Response execute(
87+
Consumer<RequestBuilder> requestCustomizer,
88+
Duration timeout
89+
) {
90+
RequestBuilder builder = new RequestBuilder(baseUrl.toString());
91+
requestCustomizer.accept(builder);
92+
93+
return new Response(
94+
cluster.queryExecutor.executeRaw(
95+
builder.wrapped.build(),
96+
timeout
97+
)
98+
);
99+
}
100+
101+
/**
102+
* Executes an HTTP request for an Analytics query.
103+
*
104+
* @throws QueryException if response has a non-empty "errors" field.
105+
* @throws AnalyticsTimeoutException if request times out.
106+
* @throws CancellationException if thread is interrupted.
107+
* @throws AnalyticsException for all other IO errors.
108+
*/
109+
public RawQueryMetadata executeStreaming(
110+
Consumer<RequestBuilder> requestCustomizer,
111+
Duration timeout,
112+
Consumer<Row> rowAction,
113+
@Nullable Deserializer deserializer
114+
) {
115+
RequestBuilder builder = new RequestBuilder(baseUrl.toString());
116+
requestCustomizer.accept(builder);
117+
118+
return cluster.queryExecutor.executeStreamingQueryOnce(
119+
builder.wrapped,
120+
timeout,
121+
rowAction,
122+
deserializer == null ? InternalJacksonSerDes.INSTANCE : deserializer
123+
);
124+
}
125+
126+
public static class Response implements Closeable {
127+
private final okhttp3.Response wrapped;
128+
129+
Response(okhttp3.Response wrapped) {
130+
this.wrapped = wrapped;
131+
}
132+
133+
@Override
134+
public void close() {
135+
wrapped.close();
136+
}
137+
138+
public int httpStatusCode() {
139+
return wrapped.code();
140+
}
141+
142+
/**
143+
* Returns an input stream over the bytes of the body,
144+
* or null if the response does not have a body.
145+
* <p>
146+
* Must not be called more than once.
147+
*/
148+
public @Nullable InputStream bodyInputStream() {
149+
ResponseBody body = wrapped.body();
150+
return body == null ? null : body.byteStream();
151+
}
152+
153+
/**
154+
* Returns the response body as a string,
155+
* or null if the response does not have a body.
156+
*
157+
* @throws UncheckedIOException if there was an error reading the response body.
158+
*/
159+
public @Nullable String bodyAsString() {
160+
try {
161+
ResponseBody body = wrapped.body();
162+
return body == null ? null : body.string();
163+
} catch (IOException e) {
164+
throw new UncheckedIOException(e);
165+
}
166+
}
167+
}
168+
169+
public static class RequestBuilder {
170+
private final String baseUrl;
171+
172+
RequestBuilder(String baseUrl) {
173+
this.baseUrl = baseUrl.endsWith("/")
174+
? baseUrl
175+
: baseUrl + "/";
176+
}
177+
178+
private final Request.Builder wrapped = new Request.Builder();
179+
180+
/**
181+
* Sets the path component (and query string, if applicable).
182+
* <p>
183+
* Caller is responsible for ensuring the input is correctly URI-encoded.
184+
*
185+
* @param path pre-encoded path and query
186+
*/
187+
public RequestBuilder path(String path) {
188+
wrapped.url(baseUrl + removeStart(path, "/"));
189+
return this;
190+
}
191+
192+
public RequestBuilder header(String name, String value) {
193+
wrapped.header(name, value);
194+
return this;
195+
}
196+
197+
private static final MediaType JSON = requireNonNull(MediaType.parse("application/json"));
198+
199+
public RequestBuilder postJson(byte[] body) {
200+
wrapped.post(new RequestBody() {
201+
@Override
202+
public long contentLength() {
203+
return body.length;
204+
}
205+
206+
@Override
207+
public MediaType contentType() {
208+
return JSON;
209+
}
210+
211+
@Override
212+
public void writeTo(BufferedSink bufferedSink) throws IOException {
213+
bufferedSink.write(body);
214+
}
215+
});
216+
217+
return this;
218+
}
219+
220+
public RequestBuilder delete() {
221+
wrapped.delete();
222+
return this;
223+
}
224+
225+
public RequestBuilder get() {
226+
wrapped.get();
227+
return this;
228+
}
229+
}
230+
}

0 commit comments

Comments
 (0)