Skip to content

Commit 9a20a17

Browse files
authored
Merge pull request #1732 from marklogic/feature/19222-oom
MLE-19222 Eval/invoke now stream results
2 parents b13710e + 44450b7 commit 9a20a17

File tree

4 files changed

+151
-18
lines changed

4 files changed

+151
-18
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.impl;
5+
6+
import java.io.ByteArrayOutputStream;
7+
import java.io.IOException;
8+
import java.io.InputStream;
9+
10+
public interface IoUtil {
11+
12+
/**
13+
* Tossing this commonly used logic here so that it can be reused. Can be removed when we drop Java 8 support, as
14+
* Java 9+ has a "readAllBytes" method.
15+
*/
16+
static byte[] streamToBytes(InputStream stream) throws IOException {
17+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
18+
byte[] b = new byte[8192];
19+
int len = 0;
20+
while ((len = stream.read(b)) != -1) {
21+
buffer.write(b, 0, len);
22+
}
23+
buffer.flush();
24+
return buffer.toByteArray();
25+
}
26+
}

marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.marklogic.client.eval.EvalResultIterator;
2020
import com.marklogic.client.impl.okhttp.HttpUrlBuilder;
2121
import com.marklogic.client.impl.okhttp.OkHttpUtil;
22+
import com.marklogic.client.impl.okhttp.PartIterator;
2223
import com.marklogic.client.io.*;
2324
import com.marklogic.client.io.marker.*;
2425
import com.marklogic.client.query.*;
@@ -3836,7 +3837,29 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl(
38363837

38373838
Response response = sendRequestWithRetry(requestBldr, (transaction == null), doPostFunction, resendableConsumer);
38383839
checkStatus(response, response.code(), "apply", "resource", path, ResponseStatus.OK_OR_CREATED_OR_NO_CONTENT);
3839-
return makeResults(constructor, reqlog, "apply", "resource", response);
3840+
3841+
boolean shouldStreamResults = "eval".equalsIgnoreCase(path) || "invoke".equalsIgnoreCase(path);
3842+
boolean hasDataToStream = response.body().contentLength() != 0;
3843+
// If body is empty, we can use the "old" way of reading results as there's nothing to stream.
3844+
return shouldStreamResults && hasDataToStream ?
3845+
evalAndStreamResults(reqlog, response) :
3846+
makeResults(constructor, reqlog, "apply", "resource", response);
3847+
}
3848+
3849+
/**
3850+
* Added to resolve MLE-19222, where the eval/invoke response was read into memory, leading to OutOfMemoryErrors.
3851+
* The one thing we are not able to do here though is check for errors in the trailers, as trailers cannot be
3852+
* read until the entire body has been read. But we don't want to read the entire body right away.
3853+
*/
3854+
private <U extends OkHttpResultIterator> U evalAndStreamResults(RequestLogger reqlog, Response response) {
3855+
if (response == null) return null;
3856+
try {
3857+
MultipartReader reader = new MultipartReader(response.body());
3858+
PartIterator partIterator = new PartIterator(reader);
3859+
return (U) new DefaultOkHttpResultIterator(reqlog, partIterator, response);
3860+
} catch (IOException e) {
3861+
throw new MarkLogicIOException(e);
3862+
}
38403863
}
38413864

38423865
@Override
@@ -4587,6 +4610,12 @@ static abstract class OkHttpResultIterator<T extends OkHttpResult> {
45874610
private long totalSize = -1;
45884611
private Closeable closeable;
45894612

4613+
OkHttpResultIterator(RequestLogger reqlog, Iterator<BodyPart> partIterator, Closeable closeable) {
4614+
this.reqlog = reqlog;
4615+
this.partQueue = partIterator;
4616+
this.closeable = closeable;
4617+
}
4618+
45904619
OkHttpResultIterator(RequestLogger reqlog, List<BodyPart> partList, Closeable closeable) {
45914620
this.reqlog = reqlog;
45924621
if (partList != null && partList.size() > 0) {
@@ -4685,14 +4714,15 @@ OkHttpServiceResult constructNext(RequestLogger logger, BodyPart part) {
46854714
}
46864715
}
46874716

4688-
static class DefaultOkHttpResultIterator
4689-
extends OkHttpResultIterator<OkHttpResult>
4690-
implements Iterator<OkHttpResult> {
4691-
DefaultOkHttpResultIterator(RequestLogger reqlog,
4692-
List<BodyPart> partList, Closeable closeable) {
4717+
static class DefaultOkHttpResultIterator extends OkHttpResultIterator<OkHttpResult> implements Iterator<OkHttpResult> {
4718+
DefaultOkHttpResultIterator(RequestLogger reqlog, List<BodyPart> partList, Closeable closeable) {
46934719
super(reqlog, partList, closeable);
46944720
}
46954721

4722+
DefaultOkHttpResultIterator(RequestLogger reqlog, Iterator<BodyPart> partIterator, Closeable closeable) {
4723+
super(reqlog, partIterator, closeable);
4724+
}
4725+
46964726
OkHttpResult constructNext(RequestLogger logger, BodyPart part) {
46974727
return new OkHttpResult(logger, part);
46984728
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.impl.okhttp;
5+
6+
import com.marklogic.client.MarkLogicIOException;
7+
import com.marklogic.client.impl.IoUtil;
8+
import jakarta.activation.DataHandler;
9+
import jakarta.mail.BodyPart;
10+
import jakarta.mail.MessagingException;
11+
import jakarta.mail.internet.MimeBodyPart;
12+
import jakarta.mail.util.ByteArrayDataSource;
13+
import okhttp3.Headers;
14+
import okhttp3.MultipartReader;
15+
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.util.Iterator;
19+
20+
/**
21+
* Adapts the iterator over the OkHttp MultipartReader to conform to the Iterator that is required by
22+
* OkHttpEvalResultIterator. By converting each MultipartReader.Part into a jakarta.mail.BodyPart, we can reuse
23+
* all the existing plumbing that depends on jakarta.mail.BodyPart.
24+
* <p>
25+
* Added to resolve MLE-19222, where eval/invoke results are not being streamed but rather were all being read into
26+
* memory, leading to OutOfMemoryErrors.
27+
*/
28+
public class PartIterator implements Iterator<BodyPart> {
29+
30+
private final MultipartReader reader;
31+
private BodyPart nextBodyPart;
32+
33+
public PartIterator(MultipartReader reader) {
34+
this.reader = reader;
35+
readNextPart();
36+
}
37+
38+
@Override
39+
public boolean hasNext() {
40+
return nextBodyPart != null;
41+
}
42+
43+
@Override
44+
public BodyPart next() {
45+
BodyPart partToReturn = nextBodyPart;
46+
readNextPart();
47+
return partToReturn;
48+
}
49+
50+
private void readNextPart() {
51+
try {
52+
// See http://okhttp.foofun.cn/4.x/okhttp/okhttp3/-multipart-reader/ for more info on the OkHttp
53+
// MultipartReader. This was actually requested many moons ago by one of the original Java Client
54+
// developers - https://github.com/square/okhttp/issues/3394.
55+
MultipartReader.Part nextPart = reader.nextPart();
56+
this.nextBodyPart = nextPart != null ? convertPartToBodyPart(nextPart) : null;
57+
} catch (Exception e) {
58+
throw new MarkLogicIOException(e);
59+
}
60+
}
61+
62+
private static BodyPart convertPartToBodyPart(MultipartReader.Part part) throws IOException, MessagingException {
63+
MimeBodyPart bodyPart = new MimeBodyPart();
64+
65+
try {
66+
try (InputStream inputStream = part.body().inputStream()) {
67+
byte[] bytes = IoUtil.streamToBytes(inputStream);
68+
bodyPart.setDataHandler(new DataHandler(new ByteArrayDataSource(bytes, part.headers().get("Content-Type"))));
69+
}
70+
71+
// part.headers.toMultimap() is lowercasing header names, which causes later issues.
72+
Headers headers = part.headers();
73+
for (String headerName : headers.names()) {
74+
for (String headerValue : headers.values(headerName)) {
75+
bodyPart.addHeader(headerName, headerValue);
76+
}
77+
}
78+
return bodyPart;
79+
} finally {
80+
// Looking at the OkHttp source code, this does not appear necessary, as closing the InputStream above should
81+
// achieve the same effect. But there is no downside to doing this, as it may be required by a future version
82+
// of OkHttp.
83+
part.close();
84+
}
85+
}
86+
}

marklogic-client-api/src/main/java/com/marklogic/client/io/InputStreamHandle.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.io.InputStream;
1111
import java.nio.charset.StandardCharsets;
1212

13+
import com.marklogic.client.impl.IoUtil;
1314
import com.marklogic.client.io.marker.*;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
@@ -45,8 +46,6 @@ public class InputStreamHandle
4546
private byte[] contentBytes;
4647
private InputStream content;
4748

48-
final static private int BUFFER_SIZE = 8192;
49-
5049
/**
5150
* Creates a factory to create an InputStreamHandle instance for an input stream.
5251
* @return the factory
@@ -186,17 +185,9 @@ public InputStream bytesToContent(byte[] buffer) {
186185
public byte[] contentToBytes(InputStream content) {
187186
try {
188187
if (content == null) return null;
189-
190-
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
191-
192-
byte[] b = new byte[BUFFER_SIZE];
193-
int len = 0;
194-
while ((len = content.read(b)) != -1) {
195-
buffer.write(b, 0, len);
196-
}
188+
byte[] bytes = IoUtil.streamToBytes(content);
197189
content.close();
198-
199-
return buffer.toByteArray();
190+
return bytes;
200191
} catch (IOException e) {
201192
throw new MarkLogicIOException(e);
202193
}

0 commit comments

Comments
 (0)