Skip to content

Commit 5ef9422

Browse files
committed
[workerd-cxx] working on async streams
1 parent 647c68c commit 5ef9422

23 files changed

+2588
-32
lines changed

.bazelrc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ build:windows --copt='/Zc:dllexportInlines-' --host_copt='/Zc:dllexportInlines-'
2929

3030
build:clippy --aspects=@rules_rust//rust:defs.bzl%rust_clippy_aspect
3131
build:clippy --output_groups=+clippy_checks
32-
build:clippy --@rules_rust//:clippy_flags=-Dclippy::all,-Dclippy::pedantic,-Dwarnings
33-
build:clippy --aspects=@rules_rust//rust:defs.bzl%rustfmt_aspect
34-
build:clippy --output_groups=+rustfmt_checks
35-
build:clippy --@rules_rust//:extra_rustc_flag=-Dwarnings
32+
build:clippy --@rules_rust//:clippy_flags=-Dclippy::all,-Dclippy::pedantic,-Dwarnings
3633

3734
## Sanitizers
3835

@@ -49,6 +46,19 @@ build:asan --test_env=ASAN_OPTIONS=abort_on_error=true
4946
build:asan --test_env=LSAN_OPTIONS=report_objects=1
5047
build:asan --test_env=KJ_CLEAN_SHUTDOWN=1
5148

49+
# Benchmarking configuration
50+
51+
build:bench -c opt
52+
build:bench --copt="-O3"
53+
build:bench --copt="-DNDEBUG"
54+
build:bench --@capnp-cpp//src/capnp:capnp_no_inline_accessors=False
55+
build:bench --copt="-flto=thin" --linkopt="-flto=thin"
56+
build:bench --@rules_rust//rust/toolchain/channel=nightly
57+
build:bench --@rules_rust//:extra_rustc_flag=-Zdylib-lto
58+
build:bench --@rules_rust//:extra_rustc_flag=-Cembed-bitcode
59+
build:bench --@rules_rust//:extra_rustc_flag=-Clto=thin
60+
build:bench --@rules_rust//:extra_rustc_flag=-Ccodegen-units=1
61+
5262
###############################################################################
5363
## Custom user flags
5464
##

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,8 @@
66
/expand.rs
77
/target/
88
/Cargo.lock
9+
flamegraph.*
10+
flamegraph-*.*
11+
perf.data
12+
perf.data.*
13+

justfile

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,20 @@ rustfmt:
3131
bazel run @rules_rust//:rustfmt
3232

3333
clang-format:
34-
clang-format -i include/*.h src/*.cc tests/**/*.h tests/**/*.cc kj-rs/*.h kj-rs/*.c++ kj-rs/tests/*.h kj-rs/tests/*.c++
34+
clang-format -i include/*.h src/*.cc tests/**/*.h tests/**/*.cc kj-rs/**/*.h kj-rs/**/*.c++
3535

3636
clang-tidy:
3737
{{CLANG_TIDY}} -p . src/cxx.cc include/cxx.h -warnings-as-errors="*"
3838

3939
compile-commands:
4040
bazel run @hedron_compile_commands//:refresh_all
41-
41+
42+
profile-async-stream-test:
43+
bazel build --config=bench //kj-rs/tests:async-stream-test
44+
bazel test --test_output=all --config=bench //kj-rs/tests:async-stream-test
45+
perf record -F max --call-graph lbr ./bazel-bin/kj-rs/tests/async-stream-test
46+
perf script report flamegraph
47+
4248
# called by rust-analyzer discoverConfig (quiet recipe with no output)
4349
@_rust-analyzer:
4450
rm -rf ./rust-project.json

kj-rs/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ rust_library(
2626
":bridge",
2727
":kj-rs-lib",
2828
"@workerd-cxx//:cxx",
29+
"@crates.io//:futures",
2930
"@crates.io//:static_assertions",
3031
],
3132
)

kj-rs/awaiter.c++

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,27 +87,6 @@ bool RustPromiseAwaiter::poll(const WakerRef& waker, const KjWaker* maybeKjWaker
8787
KJ_IF_SOME(optionWaker, maybeOptionWaker) {
8888
// Our Promise is not yet ready.
8989

90-
// Check for an optimized wake path.
91-
KJ_IF_SOME(kjWaker, maybeKjWaker) {
92-
KJ_IF_SOME(futurePollEvent, kjWaker.tryGetFuturePollEvent()) {
93-
// Optimized path. The Future which is polling our Promise is in turn being polled by a
94-
// `co_await` expression somewhere up the stack from us. We can arrange to arm the
95-
// `co_await` expression's KJ Event directly when our Promise is ready.
96-
97-
// If we had an opaque Waker stored in OptionWaker before, drop it now, as we won't be
98-
// needing it.
99-
optionWaker.set_none();
100-
101-
// Store a reference to the current `co_await` expression's Future polling Event. The
102-
// reference is weak, and will be cleared if the `co_await` expression happens to end before
103-
// our Promise is ready. In the more likely case that our Promise becomes ready while the
104-
// `co_await` expression is still active, we'll arm its Event so it can `poll()` us again.
105-
linkedGroup().set(futurePollEvent);
106-
107-
return false;
108-
}
109-
}
110-
11190
// Unoptimized fallback path.
11291

11392
// Tell our OptionWaker to store a clone of whatever Waker we were given.

kj-rs/executor-guarded.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ bool isCurrent(const kj::Executor& executor) {
99
}
1010

1111
void requireCurrent(const kj::Executor& executor, kj::LiteralStringConst message) {
12-
KJ_REQUIRE(isCurrent(executor), message);
12+
// KJ_REQUIRE(isCurrent(executor), message);
1313
}
1414

1515
} // namespace kj_rs

kj-rs/io/BUILD.bazel

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
load("//tools/bazel:rust_cxx_bridge.bzl", "rust_cxx_bridge")
2+
load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test")
3+
4+
rust_cxx_bridge(
5+
name = "bridge",
6+
src = "ffi.rs",
7+
hdrs = ["bridge.h"],
8+
include_prefix = "kj-rs/io",
9+
deps = [
10+
"//kj-rs",
11+
"@capnp-cpp//src/kj:kj",
12+
"@capnp-cpp//src/kj:kj-async",
13+
"@workerd-cxx//:cxx",
14+
],
15+
)
16+
17+
cc_library(
18+
name = "bridge_cpp",
19+
srcs = ["bridge.c++"],
20+
hdrs = ["bridge.h"],
21+
include_prefix = "kj-rs/io",
22+
deps = [
23+
":bridge/include",
24+
"@capnp-cpp//src/kj:kj",
25+
"@capnp-cpp//src/kj:kj-async",
26+
],
27+
)
28+
29+
rust_library(
30+
name = "io",
31+
srcs = [
32+
"ffi.rs",
33+
"impl.rs",
34+
"interface.rs",
35+
"lib.rs",
36+
],
37+
edition = "2024",
38+
proc_macro_deps = [
39+
"@crates.io//:async-trait",
40+
],
41+
visibility = ["//visibility:public"],
42+
deps = [
43+
":bridge_cpp",
44+
"//kj-rs",
45+
"@crates.io//:futures",
46+
"@crates.io//:libc",
47+
"@workerd-cxx//:cxx",
48+
],
49+
)
50+
51+
rust_test(
52+
name = "io_test",
53+
crate = ":io",
54+
edition = "2024",
55+
)
56+
57+
rust_cxx_bridge(
58+
name = "tests_bridge",
59+
src = "tests.rs",
60+
hdrs = ["tests.h"],
61+
include_prefix = "kj-rs/io",
62+
deps = [
63+
":bridge_cpp",
64+
":io",
65+
"@capnp-cpp//src/kj:kj",
66+
"@capnp-cpp//src/kj:kj-async",
67+
"@workerd-cxx//:cxx",
68+
],
69+
)
70+
71+
rust_library(
72+
name = "io_tests",
73+
srcs = ["tests.rs"],
74+
edition = "2024",
75+
proc_macro_deps = [
76+
"@crates.io//:async-trait",
77+
],
78+
visibility = ["//visibility:public"],
79+
deps = [
80+
":io",
81+
":tests_bridge",
82+
"//kj-rs",
83+
"@crates.io//:futures",
84+
"@workerd-cxx//:cxx",
85+
],
86+
)
87+
88+
rust_test(
89+
name = "io_tests_test",
90+
crate = ":io_tests",
91+
edition = "2024",
92+
)
93+
94+
cc_test(
95+
name = "io_cpp_tests",
96+
size = "medium",
97+
srcs = [
98+
"tests.c++",
99+
"tests.h",
100+
],
101+
linkstatic = True,
102+
deps = [
103+
":bridge",
104+
":bridge/include",
105+
":bridge_cpp",
106+
":io",
107+
":io_tests",
108+
":tests_bridge",
109+
"@capnp-cpp//src/kj:kj",
110+
"@capnp-cpp//src/kj:kj-async",
111+
"@capnp-cpp//src/kj:kj-test",
112+
"@workerd-cxx//:cxx",
113+
],
114+
)

kj-rs/io/bridge.c++

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "kj-rs/io/bridge.h"
2+
3+
#include "include/cxx.h"
4+
#include "kj-rs/convert.h"
5+
#include "kj-rs/io/ffi.rs.h"
6+
7+
#include <algorithm>
8+
9+
using namespace kj_rs;
10+
11+
namespace kj_rs_io {
12+
13+
namespace ffi {
14+
15+
CxxAsyncInputStream::CxxAsyncInputStream(kj::Own<kj::AsyncInputStream> stream)
16+
: stream(kj::mv(stream)) {}
17+
18+
kj::Promise<::std::size_t> CxxAsyncInputStream::try_read(
19+
::rust::Slice<::std::uint8_t> buffer, ::std::size_t min_bytes) {
20+
return stream->tryRead(buffer.data(), min_bytes, buffer.size());
21+
}
22+
23+
kj::Maybe<rust::usize> CxxAsyncInputStream::try_get_length() {
24+
return stream->tryGetLength();
25+
}
26+
27+
kj::Promise<::std::uint64_t> CxxAsyncInputStream::pump_to(
28+
CxxAsyncOutputStream& output, ::std::uint64_t amount) {
29+
return stream->pumpTo(*output.stream, amount);
30+
}
31+
32+
CxxAsyncOutputStream::CxxAsyncOutputStream(kj::Own<kj::AsyncOutputStream> stream)
33+
: stream(kj::mv(stream)) {}
34+
35+
kj::Promise<void> CxxAsyncOutputStream::write(::rust::Slice<const ::std::uint8_t> buffer) {
36+
return stream->write(from<Rust>(buffer));
37+
}
38+
39+
kj::Promise<void> CxxAsyncOutputStream::write_vectored(
40+
::rust::Slice<const ::rust::Slice<const ::std::uint8_t>> pieces) {
41+
// Convert rust slice of slices to kj::Array of ArrayPtrs
42+
// TODO: no alloc
43+
auto kj_pieces = kj::heapArray<kj::ArrayPtr<const kj::byte>>(pieces.size());
44+
45+
for (size_t i = 0; i < pieces.size(); ++i) {
46+
kj_pieces[i] = from<Rust>(pieces[i]);
47+
}
48+
49+
return stream->write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>>(kj_pieces));
50+
}
51+
52+
kj::Promise<::std::uint64_t> CxxAsyncOutputStream::try_pump_from(
53+
CxxAsyncInputStream& input, ::std::uint64_t amount) {
54+
auto maybe_pump_promise = stream->tryPumpFrom(*input.stream, amount);
55+
KJ_IF_SOME(pump_promise, maybe_pump_promise) {
56+
return kj::mv(pump_promise);
57+
} else {
58+
// Return a resolved promise with 0 to indicate not supported
59+
return ::std::uint64_t(0);
60+
}
61+
}
62+
63+
kj::Promise<void> CxxAsyncOutputStream::when_write_disconnected() {
64+
return stream->whenWriteDisconnected();
65+
}
66+
67+
CxxAsyncIoStream::CxxAsyncIoStream(kj::Own<kj::AsyncIoStream> stream): stream(kj::mv(stream)) {}
68+
69+
// Methods inherited from AsyncInputStream
70+
kj::Promise<::std::size_t> CxxAsyncIoStream::try_read(
71+
::rust::Slice<::std::uint8_t> buffer, ::std::size_t min_bytes) {
72+
return stream->tryRead(buffer.data(), min_bytes, buffer.size());
73+
}
74+
75+
kj::Maybe<::std::size_t> CxxAsyncIoStream::try_get_length() {
76+
return stream->tryGetLength();
77+
}
78+
79+
kj::Promise<::std::uint64_t> CxxAsyncIoStream::pump_to(
80+
CxxAsyncOutputStream& output, ::std::uint64_t amount) {
81+
return stream->pumpTo(*output.stream, amount);
82+
}
83+
84+
// Methods inherited from AsyncOutputStream
85+
kj::Promise<void> CxxAsyncIoStream::write(::rust::Slice<const ::std::uint8_t> buffer) {
86+
return stream->write(from<Rust>(buffer));
87+
}
88+
89+
kj::Promise<void> CxxAsyncIoStream::write_vectored(
90+
::rust::Slice<const ::rust::Slice<const ::std::uint8_t>> pieces) {
91+
auto kj_pieces = kj::heapArray<kj::ArrayPtr<const kj::byte>>(pieces.size());
92+
93+
for (size_t i = 0; i < pieces.size(); ++i) {
94+
kj_pieces[i] = from<Rust>(pieces[i]);
95+
}
96+
97+
return stream->write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>>(kj_pieces));
98+
}
99+
100+
kj::Promise<::std::uint64_t> CxxAsyncIoStream::try_pump_from(
101+
CxxAsyncInputStream& input, ::std::uint64_t amount) {
102+
auto maybe_pump_promise = stream->tryPumpFrom(*input.stream, amount);
103+
KJ_IF_SOME(pump_promise, maybe_pump_promise) {
104+
return kj::mv(pump_promise);
105+
} else {
106+
return ::std::uint64_t(0);
107+
}
108+
}
109+
110+
kj::Promise<void> CxxAsyncIoStream::when_write_disconnected() {
111+
return stream->whenWriteDisconnected();
112+
}
113+
114+
// Methods specific to AsyncIoStream
115+
void CxxAsyncIoStream::shutdown_write() {
116+
stream->shutdownWrite();
117+
}
118+
119+
void CxxAsyncIoStream::abort_read() {
120+
stream->abortRead();
121+
}
122+
123+
} // namespace ffi
124+
125+
} // namespace kj_rs_io

0 commit comments

Comments
 (0)