Skip to content

Commit 8c325b9

Browse files
authored
pkgs/ok_http: Stream response bodies. (#1233)
1 parent e2e2170 commit 8c325b9

File tree

5 files changed

+447
-26
lines changed

5 files changed

+447
-26
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
package com.example.ok_http
6+
7+
import java.io.IOException
8+
import java.io.InputStream
9+
import java.util.concurrent.ExecutorService
10+
import java.util.concurrent.Executors
11+
import java.util.concurrent.Future
12+
13+
14+
/**
15+
* Callback interface utilized by the [AsyncInputStreamReader].
16+
*/
17+
interface DataCallback {
18+
fun onDataRead(data: ByteArray)
19+
fun onFinished()
20+
fun onError(e: IOException)
21+
}
22+
23+
/**
24+
* Provides functions to read data from an InputStream asynchronously.
25+
*/
26+
class AsyncInputStreamReader {
27+
private val executorService: ExecutorService = Executors.newSingleThreadExecutor()
28+
29+
/**
30+
* Reads data from an InputStream asynchronously using an executor service.
31+
*
32+
* @param inputStream The InputStream to read from
33+
* @param callback The DataCallback to call when data is read, finished, or an error occurs
34+
*
35+
* @return Future<*>
36+
*/
37+
fun readAsync(inputStream: InputStream, callback: DataCallback): Future<*> {
38+
return executorService.submit {
39+
try {
40+
val buffer = ByteArray(4096)
41+
var bytesRead: Int
42+
while (inputStream.read(buffer).also { bytesRead = it } != -1) {
43+
val byteArray = buffer.copyOfRange(0, bytesRead)
44+
callback.onDataRead(byteArray)
45+
}
46+
47+
} catch (e: IOException) {
48+
callback.onError(e)
49+
} finally {
50+
try {
51+
inputStream.close()
52+
} catch (e: IOException) {
53+
callback.onError(e)
54+
}
55+
callback.onFinished()
56+
}
57+
}
58+
}
59+
60+
fun shutdown() {
61+
executorService.shutdown()
62+
}
63+
}

pkgs/ok_http/example/integration_test/client_test.dart

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,13 @@ void main() async {
1515

1616
Future<void> testConformance() async {
1717
group('ok_http client', () {
18-
testRequestBody(OkHttpClient());
19-
testResponseBody(OkHttpClient(), canStreamResponseBody: false);
20-
testRequestHeaders(OkHttpClient());
21-
testRequestMethods(OkHttpClient(), preservesMethodCase: true);
22-
testResponseHeaders(OkHttpClient(), supportsFoldedHeaders: false);
23-
testResponseStatusLine(OkHttpClient());
24-
testCompressedResponseBody(OkHttpClient());
25-
testRedirect(OkHttpClient());
26-
testServerErrors(OkHttpClient());
27-
testClose(OkHttpClient.new);
28-
testIsolate(OkHttpClient.new);
29-
testRequestCookies(OkHttpClient(), canSendCookieHeaders: true);
30-
testResponseCookies(OkHttpClient(), canReceiveSetCookieHeaders: true);
18+
testAll(
19+
OkHttpClient.new,
20+
canStreamRequestBody: false,
21+
preservesMethodCase: true,
22+
supportsFoldedHeaders: false,
23+
canSendCookieHeaders: true,
24+
canReceiveSetCookieHeaders: true,
25+
);
3126
});
3227
}

pkgs/ok_http/jnigen.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ classes:
2828
- "okhttp3.Dispatcher"
2929
- "okhttp3.Cache"
3030
- "com.example.ok_http.RedirectInterceptor"
31+
- "com.example.ok_http.AsyncInputStreamReader"
32+
- "com.example.ok_http.DataCallback"
3133

3234
# Exclude the deprecated methods listed below
3335
# They cause syntax errors during the `dart format` step of JNIGen.

0 commit comments

Comments
 (0)