Skip to content

Commit 2dc895b

Browse files
authored
Support inputstream for request and response bodies (#138)
## Changes This PR adds support for non-application/json requests and responses. This PR is divided into two parts: **Support setting headers per request.** The main methods of the ApiClient interface are expanded to accept one new parameter, `Map<String, String> headers`, and all callers are expected to pass a map of headers to be included in the request. As the allowed content types for requests and responses are known from the OpenAPI specification, the generated implementations must construct the request header map during code generation and pass it to the ApiClient. The default `Content-Type: application/json` header is removed, as each request should specify its own Content-Type and Accept headers. (This is merged in #135.) **Add support for streaming request and response bodies to support non-application/json requests/responses.** Today, serialization of requests is done in the ApiClient. This implies that ApiClient needs to be able to serialize a request purely based on the parameters provided to it via headers, the request body, etc. In this proposal, the serialization is coupled to the request type: an `InputStream` indicates that the client has already serialized the request, so it can be provided directly to the underlying HttpClient library, and any other type will be passed through Jackson's ObjectMapper for serialization to JSON. Internally, request and response bodies are modeled as InputStreams rather than byte arrays to support streaming requests and responses, though for non-streaming requests and responses, a string value for the body is captured in the `debugBody` field of Request and Response, respectively. One important caveat about streaming responses is that callers are required to close the streams when they are done using them. This releases the underlying HTTP connection back to the connection pool for use in subsequent requests. Otherwise, customers could encounter deadlocks waiting for connections.
For this, we recommend that users use the try-with-resources model as demonstrated in the integration test `FilesIT.java`. ## Tests - [x] The Files integration test (`FilesIT`) exercises uploading and downloading files using the streaming pathways. - [x] Existing integration tests cover the pre-existing non-streaming pathways.
1 parent fb6089f commit 2dc895b

File tree

16 files changed

+230
-67
lines changed

16 files changed

+230
-67
lines changed

.codegen/api.java.tmpl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package com.databricks.sdk.service.{{.Package.Name}};
33

44
import java.io.IOException;
5+
import java.io.InputStream;
56
import java.util.Collection;
67
import java.util.Map;
78
import java.time.Duration;

.codegen/impl.java.tmpl

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package com.databricks.sdk.service.{{.Package.Name}};
33

44
import java.io.IOException;
5+
import java.io.InputStream;
56
import java.util.Collection;
67
import java.util.Map;
78
import java.util.HashMap;
@@ -27,23 +28,33 @@ class {{.PascalName}}Impl implements {{.PascalName}}Service {
2728
String path = {{if .PathParts -}}
2829
String.format("{{range .PathParts}}{{.Prefix}}{{if or .Field .IsAccountId}}%s{{end}}{{ end }}"{{ range .PathParts }}{{if .Field}}, request.get{{.Field.PascalName}}(){{ else if .IsAccountId }}, apiClient.configuredAccountID(){{end}}{{ end }})
2930
{{- else}}"{{.Path}}"{{end}};
30-
{{ template "headers" . }}
31-
{{if .Response -}}
32-
{{- if .Response.ArrayValue -}} return apiClient.getCollection(path, null, {{template "type" .Response.ArrayValue}}.class, headers);
33-
{{- else if .Response.MapValue -}} return apiClient.getStringMap(path, request, headers);
34-
{{- else -}} return apiClient.{{.Verb}}(path{{if .Request}}, request{{end}}{{if .Response -}}, {{if .Response.ArrayValue }}Collection
35-
{{- else if .Response.MapValue }}Map
36-
{{- else}}{{template "type" .Response}}
37-
{{- end -}}{{else}}, Void{{end}}.class, headers);{{end}}
38-
{{- else -}}apiClient.{{.Verb}}(path{{if .Request}}, request{{end}}{{if .Response -}}, {{if .Response.ArrayValue }}Collection
39-
{{- else if .Response.MapValue }}Map
40-
{{- else}}{{template "type" .Response}}
41-
{{- end -}}{{else}}, Void{{end}}.class, headers);
42-
{{end}}
31+
{{ template "headers" . -}}
32+
{{ if not .Response -}}
33+
{{ template "api-call" . }}
34+
{{- else if .Response.ArrayValue -}} return apiClient.getCollection(path, null, {{template "type" .Response.ArrayValue}}.class, headers);
35+
{{- else if .Response.MapValue -}} return apiClient.getStringMap(path, {{ template "request-param" .}}, headers);
36+
{{- else if .IsResponseByteStream -}}
37+
InputStream response = {{ template "api-call" . }}
38+
return new {{ .Response.PascalName }}().set{{.ResponseBodyField.PascalName}}(response);
39+
{{- else }}return {{ template "api-call" . }}
40+
{{- end}}
4341
}
4442
{{end}}
4543
}
4644

45+
{{ define "api-call" }}
46+
apiClient.{{.Verb}}(path
47+
{{- if .Request}}, {{ template "request-param" .}}{{end}}
48+
, {{ if not .Response -}}Void
49+
{{- else if .IsResponseByteStream}}InputStream
50+
{{- else}}{{template "type" .Response}}{{- end -}}.class
51+
, headers);
52+
{{- end }}
53+
54+
{{ define "request-param" -}}
55+
request{{ if .RequestBodyField }}.get{{.RequestBodyField.PascalName}}(){{end}}
56+
{{- end }}
57+
4758
{{ define "headers" -}}
4859
Map<String, String> headers = new HashMap<>();
4960
{{- range $key, $value := .FixedRequestHeaders }}

.codegen/interface.java.tmpl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.databricks.sdk.service.{{.Package.Name}};
33

44
import java.util.Collection;
55
import java.util.Map;
6+
import java.io.InputStream;
67

78
import com.databricks.sdk.support.Generated;
89

@@ -33,6 +34,7 @@ public interface {{.PascalName}}Service {
3334
{{- else if .IsInt}}long
3435
{{- else if .ArrayValue }}Collection<{{template "type" .ArrayValue}}>
3536
{{- else if .MapValue }}Map<String,{{template "type" .MapValue}}>
37+
{{- else if .IsByteStream}}InputStream
3638
{{- else if .IsObject }}{{.PascalName}}
3739
{{- else if .IsExternal }}com.databricks.sdk.service.{{.Package.Name}}.{{.PascalName}}
3840
{{- else if .Enum }}{{.PascalName}}

.codegen/model.java.tmpl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package com.databricks.sdk.service.{{.Package.Name}};
44

55
import java.util.Map;
6+
import java.io.InputStream;
67

78
import com.fasterxml.jackson.annotation.JsonProperty;
89
import java.util.Collection;
@@ -87,6 +88,7 @@ public enum {{.PascalName}}{
8788
{{- else if .IsInt}}Long
8889
{{- else if .ArrayValue }}Collection<{{template "type" .ArrayValue}}>
8990
{{- else if .MapValue }}Map<String,{{template "type" .MapValue}}>
91+
{{- else if .IsByteStream}}InputStream
9092
{{- else if .IsObject }}{{.PascalName}}
9193
{{- else if .IsExternal }}com.databricks.sdk.service.{{.Package.Name}}.{{.PascalName}}
9294
{{- else if .Enum }}{{.PascalName}}

databricks-sdk-java/src/main/java/com/databricks/sdk/core/ApiClient.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
import com.fasterxml.jackson.databind.JavaType;
1313
import com.fasterxml.jackson.databind.ObjectMapper;
1414
import com.fasterxml.jackson.databind.SerializationFeature;
15+
import java.io.ByteArrayInputStream;
1516
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.nio.charset.StandardCharsets;
1619
import java.util.*;
1720
import org.slf4j.Logger;
1821
import org.slf4j.LoggerFactory;
@@ -182,9 +185,14 @@ private <I> Request prepareBaseRequest(String method, String path, I in)
182185
throws JsonProcessingException {
183186
if (in == null || !hasBody(method)) {
184187
return new Request(method, path);
188+
} else if (InputStream.class.isAssignableFrom(in.getClass())) {
189+
InputStream body = (InputStream) in;
190+
String debugBody = "<InputStream>";
191+
return new Request(method, path, body, debugBody);
185192
} else {
186-
String body = serialize(in);
187-
return new Request(method, path, body);
193+
String debugBody = serialize(in);
194+
InputStream body = new ByteArrayInputStream(debugBody.getBytes(StandardCharsets.UTF_8));
195+
return new Request(method, path, body, debugBody);
188196
}
189197
}
190198

@@ -295,7 +303,7 @@ private String makeLogRecord(Request in, Response out) {
295303
in.getHeaders()
296304
.forEach((header, value) -> sb.append(String.format("\n * %s: %s", header, value)));
297305
}
298-
String requestBody = in.getBody();
306+
String requestBody = in.getDebugBody();
299307
if (requestBody != null && !requestBody.isEmpty()) {
300308
for (String line : bodyLogger.redactedDump(requestBody).split("\n")) {
301309
sb.append("\n> ");
@@ -304,18 +312,24 @@ private String makeLogRecord(Request in, Response out) {
304312
}
305313
sb.append("\n< ");
306314
sb.append(out.toString());
307-
for (String line : bodyLogger.redactedDump(out.getBody()).split("\n")) {
315+
for (String line : bodyLogger.redactedDump(out.getDebugBody()).split("\n")) {
308316
sb.append("\n< ");
309317
sb.append(line);
310318
}
311319
return sb.toString();
312320
}
313321

314-
public <T> T deserialize(String body, Class<T> target) throws IOException {
322+
public <T> T deserialize(InputStream body, Class<T> target) throws IOException {
323+
if (target == InputStream.class) {
324+
return (T) body;
325+
}
315326
return mapper.readValue(body, target);
316327
}
317328

318-
public <T> T deserialize(String body, JavaType target) throws IOException {
329+
public <T> T deserialize(InputStream body, JavaType target) throws IOException {
330+
if (target == mapper.constructType(InputStream.class)) {
331+
return (T) body;
332+
}
319333
return mapper.readValue(body, target);
320334
}
321335

databricks-sdk-java/src/main/java/com/databricks/sdk/core/BodyLogger.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ public String redactedDump(String body) {
4848
Object result = recursiveMarshal(rootNode, maxBytes);
4949
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(result);
5050
} catch (JsonProcessingException e) {
51-
// Unable to unmarshal means the body isn't JSON (or something else)
52-
return String.format("[unable to marshal: %s]", e.getMessage());
51+
// Unable to unmarshal means the body isn't JSON
52+
return onlyNBytes(body, maxBytes);
5353
}
5454
}
5555

databricks-sdk-java/src/main/java/com/databricks/sdk/core/commons/CommonsHttpClient.java

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package com.databricks.sdk.core.commons;
22

3+
import static org.apache.http.entity.ContentType.APPLICATION_JSON;
4+
5+
import com.databricks.sdk.core.DatabricksException;
36
import com.databricks.sdk.core.http.HttpClient;
47
import com.databricks.sdk.core.http.Request;
58
import com.databricks.sdk.core.http.Response;
9+
import com.databricks.sdk.core.utils.CustomCloseInputStream;
610
import java.io.IOException;
711
import java.io.InputStream;
8-
import java.io.UnsupportedEncodingException;
9-
import java.nio.charset.Charset;
12+
import java.nio.charset.StandardCharsets;
1013
import java.util.Arrays;
1114
import java.util.List;
1215
import java.util.Map;
@@ -17,7 +20,7 @@
1720
import org.apache.http.StatusLine;
1821
import org.apache.http.client.config.RequestConfig;
1922
import org.apache.http.client.methods.*;
20-
import org.apache.http.entity.StringEntity;
23+
import org.apache.http.entity.InputStreamEntity;
2124
import org.apache.http.impl.client.CloseableHttpClient;
2225
import org.apache.http.impl.client.HttpClientBuilder;
2326
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -53,22 +56,56 @@ private CloseableHttpClient makeClosableHttpClient() {
5356
public Response execute(Request in) throws IOException {
5457
HttpUriRequest request = transformRequest(in);
5558
in.getHeaders().forEach(request::setHeader);
56-
try (CloseableHttpResponse response = hc.execute(request)) {
57-
HttpEntity entity = response.getEntity();
58-
StatusLine statusLine = response.getStatusLine();
59-
Map<String, List<String>> hs =
60-
Arrays.stream(response.getAllHeaders())
61-
.collect(
62-
Collectors.groupingBy(
63-
NameValuePair::getName,
64-
Collectors.mapping(NameValuePair::getValue, Collectors.toList())));
65-
String body = null;
66-
if (entity != null) {
67-
try (InputStream inputStream = entity.getContent()) {
68-
body = IOUtils.toString(inputStream, Charset.defaultCharset());
69-
}
70-
}
59+
CloseableHttpResponse response = hc.execute(request);
60+
return computeResponse(in, response);
61+
}
62+
63+
private Response computeResponse(Request in, CloseableHttpResponse response) throws IOException {
64+
HttpEntity entity = response.getEntity();
65+
StatusLine statusLine = response.getStatusLine();
66+
Map<String, List<String>> hs =
67+
Arrays.stream(response.getAllHeaders())
68+
.collect(
69+
Collectors.groupingBy(
70+
NameValuePair::getName,
71+
Collectors.mapping(NameValuePair::getValue, Collectors.toList())));
72+
if (entity == null) {
73+
response.close();
74+
return new Response(in, statusLine.getStatusCode(), statusLine.getReasonPhrase(), hs);
75+
}
76+
77+
// The Databricks SDK is currently designed to treat all non-application/json responses as
78+
// InputStreams, leaving the caller to decide how to read and parse the response. The caller
79+
// is responsible for closing the InputStream to release the HTTP Connection.
80+
//
81+
// The client only streams responses when the caller has explicitly requested a non-JSON
82+
// response and the server has responded with a non-JSON Content-Type. The Databricks API
83+
// error response is either JSON or HTML and is safe to read fully into memory.
84+
boolean streamResponse =
85+
in.getHeaders().containsKey("Accept")
86+
&& !APPLICATION_JSON.getMimeType().equals(in.getHeaders().get("Accept"))
87+
&& hs.containsKey("Content-Type")
88+
&& !APPLICATION_JSON.getMimeType().equals(hs.get("Content-Type").get(0));
89+
if (streamResponse) {
90+
CustomCloseInputStream inputStream =
91+
new CustomCloseInputStream(
92+
entity.getContent(),
93+
() -> {
94+
try {
95+
response.close();
96+
} catch (Exception e) {
97+
throw new DatabricksException("Unable to close connection", e);
98+
}
99+
});
100+
return new Response(
101+
in, statusLine.getStatusCode(), statusLine.getReasonPhrase(), hs, inputStream);
102+
}
103+
104+
try (InputStream inputStream = entity.getContent()) {
105+
String body = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
71106
return new Response(in, statusLine.getStatusCode(), statusLine.getReasonPhrase(), hs, body);
107+
} finally {
108+
response.close();
72109
}
73110
}
74111

@@ -89,12 +126,8 @@ private HttpUriRequest transformRequest(Request in) {
89126
}
90127
}
91128

92-
private HttpRequestBase withEntity(HttpEntityEnclosingRequestBase request, String body) {
93-
try {
94-
request.setEntity(new StringEntity(body));
95-
return request;
96-
} catch (UnsupportedEncodingException e) {
97-
throw new IllegalArgumentException(e);
98-
}
129+
private HttpRequestBase withEntity(HttpEntityEnclosingRequestBase request, InputStream body) {
130+
request.setEntity(new InputStreamEntity(body));
131+
return request;
99132
}
100133
}

databricks-sdk-java/src/main/java/com/databricks/sdk/core/error/ApiErrors.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package com.databricks.sdk.core.error;
22

33
import com.databricks.sdk.core.DatabricksError;
4+
import com.databricks.sdk.core.DatabricksException;
45
import com.databricks.sdk.core.http.Response;
56
import com.fasterxml.jackson.databind.ObjectMapper;
6-
import java.io.IOException;
7+
import java.io.*;
8+
import java.nio.charset.StandardCharsets;
79
import java.util.regex.Matcher;
810
import java.util.regex.Pattern;
11+
import org.apache.commons.io.IOUtils;
912

1013
/** Helper methods for inspecting the response and errors thrown during API requests. */
1114
public class ApiErrors {
@@ -55,13 +58,20 @@ private static DatabricksError readErrorFromResponse(Response response) {
5558
*/
5659
private static ApiErrorBody parseApiError(Response response) {
5760
try {
58-
return MAPPER.readValue(response.getBody(), ApiErrorBody.class);
61+
// Read the body now, so we can try to parse as JSON and then fallback to old error handling
62+
// logic.
63+
String body = IOUtils.toString(response.getBody(), StandardCharsets.UTF_8);
64+
try {
65+
return MAPPER.readValue(body, ApiErrorBody.class);
66+
} catch (IOException e) {
67+
return parseUnknownError(response, body, e);
68+
}
5969
} catch (IOException e) {
60-
return parseUnknownError(response, e);
70+
throw new DatabricksException("Unable to read response body", e);
6171
}
6272
}
6373

64-
private static ApiErrorBody parseUnknownError(Response response, IOException err) {
74+
private static ApiErrorBody parseUnknownError(Response response, String body, IOException err) {
6575
ApiErrorBody errorBody = new ApiErrorBody();
6676
String[] statusParts = response.getStatus().split(" ", 2);
6777
if (statusParts.length < 2) {
@@ -71,14 +81,13 @@ private static ApiErrorBody parseUnknownError(Response response, IOException err
7181
errorBody.setErrorCode(errorCode.replaceAll(" ", "_").toUpperCase());
7282
}
7383

74-
Matcher messageMatcher = HTML_ERROR_REGEX.matcher(response.getBody());
84+
Matcher messageMatcher = HTML_ERROR_REGEX.matcher(body);
7585
if (messageMatcher.find()) {
7686
errorBody.setMessage(messageMatcher.group(1).replaceAll("^[ .]+|[ .]+$", ""));
7787
} else {
7888
errorBody.setMessage(
7989
String.format(
80-
"Response from server (%s) %s: %s",
81-
response.getStatus(), response.getBody(), err.getMessage()));
90+
"Response from server (%s) %s: %s", response.getStatus(), body, err.getMessage()));
8291
}
8392
return errorBody;
8493
}

0 commit comments

Comments
 (0)