Skip to content

Commit 44450b7

Browse files
committed
MLE-19222 Eval/invoke now stream results
Finally makes use of the OkHttp MultipartReader to stream body parts instead of reading them all into memory. There's no test to be added here, as the only way to verify this is to try with a sufficient amount of data to cause an OutOfMemoryError. That will be verified manually instead. The existing regression tests will suffice to ensure that eval/invoke still work properly.
1 parent b13710e commit 44450b7

File tree

4 files changed

+151
-18
lines changed

4 files changed

+151
-18
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.impl;
5+
6+
import java.io.ByteArrayOutputStream;
7+
import java.io.IOException;
8+
import java.io.InputStream;
9+
10+
public interface IoUtil {
11+
12+
/**
13+
* Tossing this commonly used logic here so that it can be reused. Can be removed when we drop Java 8 support, as
14+
* Java 9+ has a "readAllBytes" method.
15+
*/
16+
static byte[] streamToBytes(InputStream stream) throws IOException {
17+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
18+
byte[] b = new byte[8192];
19+
int len = 0;
20+
while ((len = stream.read(b)) != -1) {
21+
buffer.write(b, 0, len);
22+
}
23+
buffer.flush();
24+
return buffer.toByteArray();
25+
}
26+
}

marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.marklogic.client.eval.EvalResultIterator;
2020
import com.marklogic.client.impl.okhttp.HttpUrlBuilder;
2121
import com.marklogic.client.impl.okhttp.OkHttpUtil;
22+
import com.marklogic.client.impl.okhttp.PartIterator;
2223
import com.marklogic.client.io.*;
2324
import com.marklogic.client.io.marker.*;
2425
import com.marklogic.client.query.*;
@@ -3836,7 +3837,29 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl(
38363837

38373838
Response response = sendRequestWithRetry(requestBldr, (transaction == null), doPostFunction, resendableConsumer);
38383839
checkStatus(response, response.code(), "apply", "resource", path, ResponseStatus.OK_OR_CREATED_OR_NO_CONTENT);
3839-
return makeResults(constructor, reqlog, "apply", "resource", response);
3840+
3841+
boolean shouldStreamResults = "eval".equalsIgnoreCase(path) || "invoke".equalsIgnoreCase(path);
3842+
boolean hasDataToStream = response.body().contentLength() != 0;
3843+
// If body is empty, we can use the "old" way of reading results as there's nothing to stream.
3844+
return shouldStreamResults && hasDataToStream ?
3845+
evalAndStreamResults(reqlog, response) :
3846+
makeResults(constructor, reqlog, "apply", "resource", response);
3847+
}
3848+
3849+
/**
3850+
* Added to resolve MLE-19222, where the eval/invoke response was read into memory, leading to OutOfMemoryErrors.
3851+
* The one thing we are not able to do here though is check for errors in the trailers, as trailers cannot be
3852+
* read until the entire body has been read. But we don't want to read the entire body right away.
3853+
*/
3854+
private <U extends OkHttpResultIterator> U evalAndStreamResults(RequestLogger reqlog, Response response) {
3855+
if (response == null) return null;
3856+
try {
3857+
MultipartReader reader = new MultipartReader(response.body());
3858+
PartIterator partIterator = new PartIterator(reader);
3859+
return (U) new DefaultOkHttpResultIterator(reqlog, partIterator, response);
3860+
} catch (IOException e) {
3861+
throw new MarkLogicIOException(e);
3862+
}
38403863
}
38413864

38423865
@Override
@@ -4587,6 +4610,12 @@ static abstract class OkHttpResultIterator<T extends OkHttpResult> {
45874610
private long totalSize = -1;
45884611
private Closeable closeable;
45894612

4613+
OkHttpResultIterator(RequestLogger reqlog, Iterator<BodyPart> partIterator, Closeable closeable) {
4614+
this.reqlog = reqlog;
4615+
this.partQueue = partIterator;
4616+
this.closeable = closeable;
4617+
}
4618+
45904619
OkHttpResultIterator(RequestLogger reqlog, List<BodyPart> partList, Closeable closeable) {
45914620
this.reqlog = reqlog;
45924621
if (partList != null && partList.size() > 0) {
@@ -4685,14 +4714,15 @@ OkHttpServiceResult constructNext(RequestLogger logger, BodyPart part) {
46854714
}
46864715
}
46874716

4688-
static class DefaultOkHttpResultIterator
4689-
extends OkHttpResultIterator<OkHttpResult>
4690-
implements Iterator<OkHttpResult> {
4691-
DefaultOkHttpResultIterator(RequestLogger reqlog,
4692-
List<BodyPart> partList, Closeable closeable) {
4717+
static class DefaultOkHttpResultIterator extends OkHttpResultIterator<OkHttpResult> implements Iterator<OkHttpResult> {
4718+
DefaultOkHttpResultIterator(RequestLogger reqlog, List<BodyPart> partList, Closeable closeable) {
46934719
super(reqlog, partList, closeable);
46944720
}
46954721

4722+
DefaultOkHttpResultIterator(RequestLogger reqlog, Iterator<BodyPart> partIterator, Closeable closeable) {
4723+
super(reqlog, partIterator, closeable);
4724+
}
4725+
46964726
OkHttpResult constructNext(RequestLogger logger, BodyPart part) {
46974727
return new OkHttpResult(logger, part);
46984728
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.impl.okhttp;
5+
6+
import com.marklogic.client.MarkLogicIOException;
7+
import com.marklogic.client.impl.IoUtil;
8+
import jakarta.activation.DataHandler;
9+
import jakarta.mail.BodyPart;
10+
import jakarta.mail.MessagingException;
11+
import jakarta.mail.internet.MimeBodyPart;
12+
import jakarta.mail.util.ByteArrayDataSource;
13+
import okhttp3.Headers;
14+
import okhttp3.MultipartReader;
15+
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.util.Iterator;
19+
20+
/**
21+
* Adapts the iterator over the OkHttp MultipartReader to conform to the Iterator that is required by
22+
* OkHttpEvalResultIterator. By converting each MultipartReader.Part into a jakarta.mail.BodyPart, we can reuse
23+
* all the existing plumbing that depends on jakarta.mail.BodyPart.
24+
* <p>
25+
* Added to resolve MLE-19222, where eval/invoke results are not being streamed but rather were all being read into
26+
* memory, leading to OutOfMemoryErrors.
27+
*/
28+
public class PartIterator implements Iterator<BodyPart> {
29+
30+
private final MultipartReader reader;
31+
private BodyPart nextBodyPart;
32+
33+
public PartIterator(MultipartReader reader) {
34+
this.reader = reader;
35+
readNextPart();
36+
}
37+
38+
@Override
39+
public boolean hasNext() {
40+
return nextBodyPart != null;
41+
}
42+
43+
@Override
44+
public BodyPart next() {
45+
BodyPart partToReturn = nextBodyPart;
46+
readNextPart();
47+
return partToReturn;
48+
}
49+
50+
private void readNextPart() {
51+
try {
52+
// See http://okhttp.foofun.cn/4.x/okhttp/okhttp3/-multipart-reader/ for more info on the OkHttp
53+
// MultipartReader. This was actually requested many moons ago by one of the original Java Client
54+
// developers - https://github.com/square/okhttp/issues/3394.
55+
MultipartReader.Part nextPart = reader.nextPart();
56+
this.nextBodyPart = nextPart != null ? convertPartToBodyPart(nextPart) : null;
57+
} catch (Exception e) {
58+
throw new MarkLogicIOException(e);
59+
}
60+
}
61+
62+
private static BodyPart convertPartToBodyPart(MultipartReader.Part part) throws IOException, MessagingException {
63+
MimeBodyPart bodyPart = new MimeBodyPart();
64+
65+
try {
66+
try (InputStream inputStream = part.body().inputStream()) {
67+
byte[] bytes = IoUtil.streamToBytes(inputStream);
68+
bodyPart.setDataHandler(new DataHandler(new ByteArrayDataSource(bytes, part.headers().get("Content-Type"))));
69+
}
70+
71+
// part.headers.toMultimap() is lowercasing header names, which causes later issues.
72+
Headers headers = part.headers();
73+
for (String headerName : headers.names()) {
74+
for (String headerValue : headers.values(headerName)) {
75+
bodyPart.addHeader(headerName, headerValue);
76+
}
77+
}
78+
return bodyPart;
79+
} finally {
80+
// Looking at the OkHttp source code, this does not appear necessary, as closing the InputStream above should
81+
// achieve the same effect. But there is no downside to doing this, as it may be required by a future version
82+
// of OkHttp.
83+
part.close();
84+
}
85+
}
86+
}

marklogic-client-api/src/main/java/com/marklogic/client/io/InputStreamHandle.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.io.InputStream;
1111
import java.nio.charset.StandardCharsets;
1212

13+
import com.marklogic.client.impl.IoUtil;
1314
import com.marklogic.client.io.marker.*;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
@@ -45,8 +46,6 @@ public class InputStreamHandle
4546
private byte[] contentBytes;
4647
private InputStream content;
4748

48-
final static private int BUFFER_SIZE = 8192;
49-
5049
/**
5150
* Creates a factory to create an InputStreamHandle instance for an input stream.
5251
* @return the factory
@@ -186,17 +185,9 @@ public InputStream bytesToContent(byte[] buffer) {
186185
public byte[] contentToBytes(InputStream content) {
187186
try {
188187
if (content == null) return null;
189-
190-
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
191-
192-
byte[] b = new byte[BUFFER_SIZE];
193-
int len = 0;
194-
while ((len = content.read(b)) != -1) {
195-
buffer.write(b, 0, len);
196-
}
188+
byte[] bytes = IoUtil.streamToBytes(content);
197189
content.close();
198-
199-
return buffer.toByteArray();
190+
return bytes;
200191
} catch (IOException e) {
201192
throw new MarkLogicIOException(e);
202193
}

0 commit comments

Comments
 (0)