Skip to content

Commit 986a3d5

Browse files
committed
Add EndableAsyncOutputStream with end() method
1 parent 2ec31fe commit 986a3d5

File tree

3 files changed

+65
-0
lines changed

3 files changed

+65
-0
lines changed

src/workerd/api/streams/writable-sink-test.c++

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <workerd/jsg/jsg-test.h>
44
#include <workerd/tests/test-fixture.h>
55
#include <workerd/util/own-util.h>
6+
#include <workerd/util/stream-utils.h>
67

78
#include <kj/async-io.h>
89
#include <kj/compat/gzip.h>
@@ -132,6 +133,32 @@ class MemoryAsyncOutputStream final: public kj::AsyncOutputStream {
132133
kj::Vector<kj::byte> data;
133134
};
134135

136+
struct MockEndable final: public EndableAsyncOutputStream {
137+
bool isEnded = false;
138+
kj::Vector<kj::byte> data;
139+
140+
kj::Promise<void> write(kj::ArrayPtr<const kj::byte> buffer) override {
141+
data.addAll(buffer);
142+
co_return;
143+
}
144+
145+
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) override {
146+
for (auto piece: pieces) {
147+
data.addAll(piece);
148+
}
149+
co_return;
150+
}
151+
152+
kj::Promise<void> whenWriteDisconnected() override {
153+
return kj::NEVER_DONE;
154+
}
155+
156+
kj::Promise<void> end() override {
157+
isEnded = true;
158+
co_return;
159+
}
160+
};
161+
135162
// ======================================================================================
136163
// Core WritableStreamSink Interface Tests
137164

@@ -517,5 +544,23 @@ KJ_TEST("IoContext aware wrapper") {
517544
KJ_ASSERT(inner.data == "some data"_kjb);
518545
}
519546

547+
// ======================================================================================
548+
// EndableAsyncOutputStream Tests
549+
550+
KJ_TEST("EndableAsyncOutputStream") {
551+
TestFixture fixture;
552+
MockEndable inner;
553+
auto fakeOwn = kj::Own<MockEndable>(&inner, kj::NullDisposer::instance);
554+
auto sink = newWritableStreamSink(kj::mv(fakeOwn));
555+
556+
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
557+
co_await sink->write("some data"_kjb);
558+
co_await sink->end();
559+
});
560+
561+
KJ_ASSERT(inner.data == "some data"_kjb);
562+
KJ_ASSERT(inner.isEnded);
563+
}
564+
520565
} // namespace
521566
} // namespace workerd::api::streams

src/workerd/api/streams/writable-sink.c++

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <workerd/io/io-context.h>
44
#include <workerd/util/stream-utils.h>
55

6+
#include <capnp/compat/byte-stream.h>
67
#include <kj/async-io.h>
78
#include <kj/compat/brotli.h>
89
#include <kj/compat/gzip.h>
@@ -139,6 +140,11 @@ class WritableStreamSinkImpl: public WritableStreamSink {
139140
virtual kj::Promise<void> flush(kj::AsyncOutputStream& output) {
140141
// When using the default implementation, we assume IDENTITY encoding.
141142
KJ_ASSERT(encoding == rpc::StreamEncoding::IDENTITY);
143+
if (auto endable = dynamic_cast<EndableAsyncOutputStream*>(&output)) {
144+
co_await endable->end();
145+
} else if (auto endable = dynamic_cast<capnp::ExplicitEndOutputStream*>(&output)) {
146+
co_await endable->end();
147+
}
142148
// By default there's nothing to flush.
143149
co_return;
144150
}
@@ -199,6 +205,10 @@ class EncodedAsyncOutputStream final: public WritableStreamSinkImpl {
199205
co_await gzip->end();
200206
} else if (auto br = dynamic_cast<kj::BrotliAsyncOutputStream*>(&output)) {
201207
co_await br->end();
208+
} else if (auto endable = dynamic_cast<EndableAsyncOutputStream*>(&output)) {
209+
co_await endable->end();
210+
} else if (auto endable = dynamic_cast<capnp::ExplicitEndOutputStream*>(&output)) {
211+
co_await endable->end();
202212
}
203213
// By default there's nothing to flush.
204214
}

src/workerd/util/stream-utils.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ class NeuterableIoStream: public kj::AsyncIoStream {
3333
virtual void neuter(kj::Exception ex) = 0;
3434
};
3535

36+
// Until kj::AsyncOutputStream has an end() method of its own... We
37+
// provide this subclass that adds it.
38+
class EndableAsyncOutputStream: public kj::AsyncOutputStream {
39+
public:
40+
// By default, end() is a no-op. Subclasses may override.
41+
virtual kj::Promise<void> end() {
42+
co_return;
43+
}
44+
};
45+
3646
kj::Own<NeuterableInputStream> newNeuterableInputStream(kj::AsyncInputStream&);
3747
kj::Own<NeuterableIoStream> newNeuterableIoStream(kj::AsyncIoStream&);
3848

0 commit comments

Comments
 (0)