Skip to content

Commit cd6eb1a

Browse files
committed
Update compression.c++ to use state-machine.h
More of a proof-of-concept application to show the utility in practice. Signed-off-by: James M Snell <[email protected]>
1 parent 9a0c8a9 commit cd6eb1a

File tree

2 files changed

+63
-57
lines changed

2 files changed

+63
-57
lines changed

src/workerd/api/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ wd_cc_library(
248248
visibility = ["//visibility:public"],
249249
deps = [
250250
"//src/workerd/io",
251+
"//src/workerd/util:state-machine",
251252
"@nbytes",
252253
],
253254
)

src/workerd/api/streams/compression.c++

Lines changed: 62 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <workerd/api/system-streams.h>
1010
#include <workerd/io/features.h>
1111
#include <workerd/util/ring-buffer.h>
12+
#include <workerd/util/state-machine.h>
1213

1314
namespace workerd::api {
1415
CompressionAllocator::CompressionAllocator(
@@ -244,49 +245,33 @@ class CompressionStreamImpl: public kj::Refcounted,
244245
explicit CompressionStreamImpl(kj::String format,
245246
Context::ContextFlags flags,
246247
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
247-
: context(mode, format, flags, kj::mv(externalMemoryTarget)) {}
248+
: context(mode, format, flags, kj::mv(externalMemoryTarget)) {
249+
state.template transitionTo<Open>();
250+
}
248251

249252
// WritableStreamSink implementation ---------------------------------------------------
250253

251254
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
252-
KJ_SWITCH_ONEOF(state) {
253-
KJ_CASE_ONEOF(ended, Ended) {
254-
JSG_FAIL_REQUIRE(Error, "Write after close");
255-
}
256-
KJ_CASE_ONEOF(exception, kj::Exception) {
257-
kj::throwFatalException(kj::cp(exception));
258-
}
259-
KJ_CASE_ONEOF(open, Open) {
260-
context.setInput(buffer.begin(), buffer.size());
261-
writeInternal(Z_NO_FLUSH);
262-
co_return;
263-
}
264-
}
265-
KJ_UNREACHABLE;
255+
requireActive("Write after close");
256+
context.setInput(buffer.begin(), buffer.size());
257+
writeInternal(Z_NO_FLUSH);
258+
co_return;
266259
}
267260

268261
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) override {
269-
// We check for Ended, Exception here so that we catch
270-
// these even if pieces is empty.
271-
KJ_SWITCH_ONEOF(state) {
272-
KJ_CASE_ONEOF(ended, Ended) {
273-
JSG_FAIL_REQUIRE(Error, "Write after close");
274-
}
275-
KJ_CASE_ONEOF(exception, kj::Exception) {
276-
kj::throwFatalException(kj::cp(exception));
277-
}
278-
KJ_CASE_ONEOF(open, Open) {
279-
for (auto piece: pieces) {
280-
co_await write(piece);
281-
}
282-
co_return;
283-
}
262+
// We check state here so that we catch errors even if pieces is empty.
263+
requireActive("Write after close");
264+
for (auto piece: pieces) {
265+
co_await write(piece);
284266
}
285-
KJ_UNREACHABLE;
267+
co_return;
286268
}
287269

288270
kj::Promise<void> end() override {
289-
state = Ended();
271+
// Use transitionFromTo to ensure we're in Open state before ending.
272+
// This provides a clearer error if end() is called twice.
273+
auto result = state.template transitionFromTo<Open, Ended>();
274+
KJ_REQUIRE(result != kj::none, "Stream already ended or errored");
290275
writeInternal(Z_FINISH);
291276
co_return;
292277
}
@@ -303,27 +288,30 @@ class CompressionStreamImpl: public kj::Refcounted,
303288

304289
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
305290
KJ_ASSERT(minBytes <= maxBytes);
306-
KJ_SWITCH_ONEOF(state) {
307-
KJ_CASE_ONEOF(ended, Ended) {
308-
// There might still be data in the output buffer remaining to read.
309-
if (output.empty()) {
310-
co_return static_cast<size_t>(0);
311-
}
312-
co_return co_await tryReadInternal(
313-
kj::arrayPtr(reinterpret_cast<kj::byte*>(buffer), maxBytes), minBytes);
314-
}
315-
KJ_CASE_ONEOF(exception, kj::Exception) {
316-
kj::throwFatalException(kj::cp(exception));
317-
}
318-
KJ_CASE_ONEOF(open, Open) {
319-
co_return co_await tryReadInternal(
320-
kj::arrayPtr(reinterpret_cast<kj::byte*>(buffer), maxBytes), minBytes);
321-
}
291+
// Re-throw any stored exception
292+
KJ_IF_SOME(exception, state.tryGetError()) {
293+
kj::throwFatalException(kj::cp(exception));
322294
}
323-
KJ_UNREACHABLE;
295+
// If stream has ended normally and no buffered data, return EOF
296+
if (state.isTerminal() && output.empty()) {
297+
co_return static_cast<size_t>(0);
298+
}
299+
// Active or terminal with data remaining
300+
co_return co_await tryReadInternal(
301+
kj::arrayPtr(reinterpret_cast<kj::byte*>(buffer), maxBytes), minBytes);
324302
}
325303

326304
private:
305+
// Helper to check that the stream is still active (Open state).
306+
// Throws an appropriate error if the stream has ended or errored.
307+
void requireActive(kj::StringPtr errorMessage) {
308+
KJ_IF_SOME(exception, state.tryGetError()) {
309+
kj::throwFatalException(kj::cp(exception));
310+
}
311+
// isActive() returns true only if in Open state (the ActiveState)
312+
JSG_REQUIRE(state.isActive(), Error, errorMessage);
313+
}
314+
327315
struct PendingRead {
328316
kj::ArrayPtr<kj::byte> buffer;
329317
size_t minBytes = 1;
@@ -343,7 +331,9 @@ class CompressionStreamImpl: public kj::Refcounted,
343331
}
344332

345333
canceler.cancel(kj::cp(reason));
346-
state = kj::mv(reason);
334+
// Use forceTransitionTo because cancelInternal may be called when already
335+
// in an error state (e.g., from writeInternal error handling).
336+
state.template forceTransitionTo<kj::Exception>(kj::mv(reason));
347337
}
348338

349339
kj::Promise<size_t> tryReadInternal(kj::ArrayPtr<kj::byte> dest, size_t minBytes) {
@@ -357,9 +347,9 @@ class CompressionStreamImpl: public kj::Refcounted,
357347
// If the output currently contains >= minBytes, then we'll fulfill
358348
// the read immediately, removing as many bytes as possible from the
359349
// output queue.
360-
// If we reached the end, resolve the read immediately as well, since no
361-
// new data is expected.
362-
if (output.size() >= minBytes || state.template is<Ended>()) {
350+
// If we reached the end (terminal state), resolve the read immediately
351+
// as well, since no new data is expected.
352+
if (output.size() >= minBytes || state.isTerminal()) {
363353
co_return copyIntoBuffer(dest);
364354
}
365355

@@ -385,7 +375,8 @@ class CompressionStreamImpl: public kj::Refcounted,
385375
void writeInternal(int flush) {
386376
// TODO(later): This does not yet implement any backpressure. A caller can keep calling
387377
// write without reading, which will continue to fill the internal buffer.
388-
KJ_ASSERT(flush == Z_FINISH || state.template is<Open>());
378+
// Either we're finishing (state is Ended) or we must still be active (Open)
379+
KJ_ASSERT(flush == Z_FINISH || state.isActive());
389380
Context::Result result;
390381

391382
while (true) {
@@ -477,10 +468,24 @@ class CompressionStreamImpl: public kj::Refcounted,
477468
}
478469
}
479470

480-
struct Ended {};
481-
struct Open {};
471+
struct Ended {
472+
static constexpr kj::StringPtr NAME = "ended"_kj;
473+
};
474+
struct Open {
475+
static constexpr kj::StringPtr NAME = "open"_kj;
476+
};
482477

483-
kj::OneOf<Open, Ended, kj::Exception> state = Open();
478+
// State machine for tracking compression stream lifecycle:
479+
// Open -> Ended (normal close via end())
480+
// Open -> kj::Exception (error via abortWrite())
481+
// Both Ended and kj::Exception are terminal states.
482+
ComposableStateMachine<TerminalStates<Ended, kj::Exception>,
483+
ErrorState<kj::Exception>,
484+
ActiveState<Open>,
485+
Open,
486+
Ended,
487+
kj::Exception>
488+
state;
484489
Context context;
485490

486491
kj::Canceler canceler;

0 commit comments

Comments
 (0)