Skip to content

Commit 4e8918d

Browse files
committed
test(graalvm): webstream tests and polyglot proxies
Signed-off-by: Dario Valdespino <dvaldespino00@gmail.com>
1 parent f08e5f2 commit 4e8918d

27 files changed

+994
-393
lines changed

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/ReadableStreamBase.kt

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

1515
import org.graalvm.polyglot.Value
16+
import org.graalvm.polyglot.proxy.ProxyExecutable
1617
import java.util.*
1718
import java.util.concurrent.atomic.AtomicInteger
1819
import java.util.concurrent.atomic.AtomicReference
1920
import java.util.concurrent.locks.ReentrantLock
2021
import elide.runtime.exec.GuestExecutor
22+
import elide.runtime.gvm.internals.intrinsics.js.abort.AbortSignal
23+
import elide.runtime.interop.ReadOnlyProxyObject
2124
import elide.runtime.intrinsics.js.*
2225
import elide.runtime.intrinsics.js.ReadableStream.ReadResult
2326
import elide.runtime.intrinsics.js.err.TypeError
@@ -30,7 +33,7 @@ import elide.vm.annotations.Polyglot
3033
* methods used by other components. This contract allows readers and controllers to be decoupled from the internal
3134
* state of the stream itself.
3235
*/
33-
internal abstract class ReadableStreamBase : ReadableStream {
36+
internal abstract class ReadableStreamBase : ReadableStream, ReadOnlyProxyObject {
3437
/** Executor used to schedule stream operations. */
3538
protected abstract val executor: GuestExecutor
3639

@@ -106,7 +109,7 @@ internal abstract class ReadableStreamBase : ReadableStream {
106109

107110
source.pull(controller).then(
108111
onFulfilled = {
109-
if (sourceState.compareAndSet(SOURCE_PULL_AGAIN, SOURCE_PULLING)) maybePull()
112+
if (sourceState.compareAndSet(SOURCE_PULL_AGAIN, SOURCE_READY)) maybePull()
110113
else sourceState.compareAndSet(SOURCE_PULLING, SOURCE_READY)
111114
},
112115
onCatch = ::error,
@@ -151,7 +154,7 @@ internal abstract class ReadableStreamBase : ReadableStream {
151154
if (destination.locked) throw TypeError.create("Destination is locked, cannot pipe")
152155

153156
// unpack options
154-
val signal = options?.getMember("signal")?.takeIf { it.isHostObject }?.asHostObject<AbortSignal>()
157+
val signal = options?.getMember("signal")?.takeIf { it.isProxyObject }?.asProxyObject<AbortSignal>()
155158
val preventClose = options?.getMember("preventClose")?.takeIf { it.isBoolean }?.asBoolean() ?: false
156159
val preventAbort = options?.getMember("preventAbort")?.takeIf { it.isBoolean }?.asBoolean() ?: false
157160
val preventCancel = options?.getMember("preventCancel")?.takeIf { it.isBoolean }?.asBoolean() ?: false
@@ -187,6 +190,31 @@ internal abstract class ReadableStreamBase : ReadableStream {
187190
}
188191
}
189192

193+
override fun getMemberKeys(): Array<String> = MEMBERS
194+
override fun getMember(key: String?): Any? = when (key) {
195+
MEMBER_LOCKED -> locked
196+
MEMBER_CANCEL -> ProxyExecutable { cancel(it.firstOrNull()) }
197+
MEMBER_GET_READER -> ProxyExecutable { getReader(it.firstOrNull()) }
198+
MEMBER_TEE -> ProxyExecutable { tee() }
199+
MEMBER_PIPE_TO -> ProxyExecutable { args ->
200+
val destination = args.firstOrNull() ?: throw TypeError.create("A pipe destination must be specified")
201+
val destinationStream = runCatching { destination.asProxyObject<WritableStream>() }
202+
.getOrElse { throw TypeError.create("The specified destination stream is not a writable stream", it) }
203+
204+
pipeTo(destinationStream, args.getOrNull(1))
205+
}
206+
207+
MEMBER_PIPE_THROUGH -> ProxyExecutable { args ->
208+
val transform = args.firstOrNull() ?: throw TypeError.create("A transform stream must be specified")
209+
val transformStream = runCatching { transform.asProxyObject<TransformStream>() }
210+
.getOrElse { throw TypeError.create("The specified destination stream is not a transform stream", it) }
211+
212+
pipeThrough(transformStream, args.getOrNull(1))
213+
}
214+
215+
else -> null
216+
}
217+
190218
internal companion object {
191219
// stream state
192220
internal const val READABLE_STREAM_READABLE = 0
@@ -200,6 +228,22 @@ internal abstract class ReadableStreamBase : ReadableStream {
200228
internal const val SOURCE_PULL_AGAIN = 3
201229
internal const val SOURCE_CLOSING = 4
202230
internal const val SOURCE_CLOSED = 5
231+
232+
private const val MEMBER_LOCKED = "locked"
233+
private const val MEMBER_CANCEL = "cancel"
234+
private const val MEMBER_GET_READER = "getReader"
235+
private const val MEMBER_TEE = "tee"
236+
private const val MEMBER_PIPE_TO = "pipeTo"
237+
private const val MEMBER_PIPE_THROUGH = "pipeThrough"
238+
239+
private val MEMBERS = arrayOf(
240+
MEMBER_LOCKED,
241+
MEMBER_CANCEL,
242+
MEMBER_GET_READER,
243+
MEMBER_TEE,
244+
MEMBER_PIPE_TO,
245+
MEMBER_PIPE_THROUGH,
246+
)
203247
}
204248
}
205249

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/ReadableStreamIntrinsic.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

15+
import elide.annotations.Singleton
1516
import elide.runtime.core.DelicateElideApi
1617
import elide.runtime.gvm.api.Intrinsic
1718
import elide.runtime.gvm.internals.intrinsics.js.AbstractJsIntrinsic
@@ -24,7 +25,7 @@ import elide.runtime.intrinsics.js.stream.ReadableStreamBYOBReader
2425
import elide.runtime.intrinsics.js.stream.ReadableStreamDefaultReader
2526

2627
/** Implementation of readable streams (via the Web Streams standard). */
27-
@Intrinsic(global = "ReadableStream") internal class ReadableStreamIntrinsic : AbstractJsIntrinsic() {
28+
@Intrinsic(global = "ReadableStream") @Singleton internal class ReadableStreamIntrinsic : AbstractJsIntrinsic() {
2829
@OptIn(DelicateElideApi::class)
2930
override fun install(bindings: GuestIntrinsic.MutableIntrinsicBindings) {
3031
bindings[READABLE_STREAM_SYMBOL.asPublicJsSymbol()] = ReadableStream

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/TransformDefaultStream.kt

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicReference
1818
import elide.runtime.exec.GuestExecutor
1919
import elide.runtime.gvm.internals.intrinsics.js.webstreams.ReadableStreamBase.Companion.READABLE_STREAM_ERRORED
2020
import elide.runtime.gvm.internals.intrinsics.js.webstreams.WritableDefaultStream.Companion.WRITABLE_STREAM_ERRORED
21+
import elide.runtime.interop.ReadOnlyProxyObject
2122
import elide.runtime.intrinsics.js.CompletableJsPromise
2223
import elide.runtime.intrinsics.js.JsPromise
2324
import elide.runtime.intrinsics.js.TransformStream
@@ -30,8 +31,7 @@ internal class TransformDefaultStream(
3031
executor: GuestExecutor,
3132
writableStrategy: QueueingStrategy = QueueingStrategy.DefaultReadStrategy,
3233
readableStrategy: QueueingStrategy = QueueingStrategy.DefaultReadStrategy,
33-
) : TransformStream {
34-
34+
) : TransformStream, ReadOnlyProxyObject {
3535
@JvmInline private value class DelegatingSource(private val stream: TransformDefaultStream) : ReadableStreamSource {
3636
override fun pull(controller: ReadableStreamController): JsPromise<Unit> {
3737
check(stream.backpressure.get())
@@ -161,13 +161,13 @@ internal class TransformDefaultStream(
161161

162162
private val controller = TransformStreamDefaultControllerToken(this)
163163

164-
@get:Polyglot override val readable: ReadableDefaultStream = ReadableDefaultStream(
165-
source = DelegatingSource(this),
166-
strategy = readableStrategy,
167-
executor
164+
@Polyglot override val readable: ReadableDefaultStream = ReadableDefaultStream(
165+
source = DelegatingSource(this),
166+
strategy = readableStrategy,
167+
executor,
168168
)
169169

170-
@get:Polyglot override val writable: WritableDefaultStream = WritableDefaultStream(
170+
@Polyglot override val writable: WritableDefaultStream = WritableDefaultStream(
171171
sink = DelegatingSink(this),
172172
strategy = writableStrategy,
173173
)
@@ -204,4 +204,17 @@ internal class TransformDefaultStream(
204204

205205
return promise
206206
}
207+
208+
override fun getMemberKeys(): Array<String> = MEMBERS
209+
override fun getMember(key: String?): Any? = when (key) {
210+
MEMBER_READABLE -> readable
211+
MEMBER_WRITABLE -> writable
212+
else -> null
213+
}
214+
215+
private companion object {
216+
private const val MEMBER_READABLE = "readable"
217+
private const val MEMBER_WRITABLE = "writable"
218+
private val MEMBERS = arrayOf(MEMBER_READABLE, MEMBER_WRITABLE)
219+
}
207220
}

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/TransformStreamIntrinsic.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

15+
import elide.annotations.Singleton
1516
import elide.runtime.core.DelicateElideApi
1617
import elide.runtime.gvm.api.Intrinsic
1718
import elide.runtime.gvm.internals.intrinsics.js.AbstractJsIntrinsic
@@ -21,7 +22,7 @@ import elide.runtime.intrinsics.js.TransformStream
2122
import elide.runtime.intrinsics.js.WritableStream
2223

2324
/** Implementation of transform streams (via the Web Streams standard). */
24-
@Intrinsic(global = "TransformStream") internal class TransformStreamIntrinsic : AbstractJsIntrinsic() {
25+
@Intrinsic(global = "TransformStream") @Singleton internal class TransformStreamIntrinsic : AbstractJsIntrinsic() {
2526
@OptIn(DelicateElideApi::class)
2627
override fun install(bindings: GuestIntrinsic.MutableIntrinsicBindings) {
2728
bindings[TRANSFORM_STREAM_SYMBOL.asPublicJsSymbol()] = TransformStream

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/WritableDefaultStream.kt

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

1515
import com.google.common.util.concurrent.AtomicDouble
1616
import org.graalvm.polyglot.Value
17+
import org.graalvm.polyglot.proxy.ProxyExecutable
1718
import java.util.concurrent.ConcurrentLinkedQueue
1819
import java.util.concurrent.atomic.AtomicBoolean
1920
import java.util.concurrent.atomic.AtomicInteger
2021
import java.util.concurrent.atomic.AtomicReference
2122
import elide.runtime.gvm.internals.intrinsics.js.abort.AbortController
2223
import elide.runtime.gvm.internals.intrinsics.js.webstreams.WritableDefaultStream.QueueElement.Chunk
2324
import elide.runtime.gvm.internals.intrinsics.js.webstreams.WritableDefaultStream.QueueElement.CloseToken
25+
import elide.runtime.interop.ReadOnlyProxyObject
2426
import elide.runtime.intrinsics.js.AbortSignal
2527
import elide.runtime.intrinsics.js.CompletableJsPromise
2628
import elide.runtime.intrinsics.js.JsPromise
@@ -41,16 +43,31 @@ internal class WritableDefaultStream(
4143
private val sink: WritableStreamSink,
4244
/** Queueing strategy used to control backpressure. */
4345
private val strategy: QueueingStrategy = QueueingStrategy.DefaultWriteStrategy,
44-
) : WritableStream {
46+
) : WritableStream, ReadOnlyProxyObject {
4547
/** Inline wrapper for the stream providing the controller API by delegation. */
46-
@JvmInline private value class ControllerToken(val stream: WritableDefaultStream) : WritableStreamDefaultController {
48+
@JvmInline private value class ControllerToken(
49+
val stream: WritableDefaultStream
50+
) : WritableStreamDefaultController, ReadOnlyProxyObject {
4751
override val signal: AbortSignal
4852
get() = stream.abortController.signal
4953

5054
override fun error(e: Any?) {
5155
if (stream.state.get() != WRITABLE_STREAM_WRITABLE) return
5256
stream.startErroring(e)
5357
}
58+
59+
override fun getMemberKeys(): Array<String> = MEMBERS
60+
override fun getMember(key: String?): Any? = when (key) {
61+
MEMBER_SIGNAL -> signal
62+
MEMBER_ERROR -> ProxyExecutable { error(it.firstOrNull()) }
63+
else -> null
64+
}
65+
66+
private companion object {
67+
private const val MEMBER_SIGNAL = "signal"
68+
private const val MEMBER_ERROR = "error"
69+
private val MEMBERS = arrayOf(MEMBER_SIGNAL, MEMBER_ERROR)
70+
}
5471
}
5572

5673
/**
@@ -528,10 +545,25 @@ internal class WritableDefaultStream(
528545
else closeStream()
529546
}
530547

548+
override fun getMemberKeys(): Array<String> = MEMBERS
549+
override fun getMember(key: String?): Any? = when (key) {
550+
MEMBER_CLOSE -> ProxyExecutable { close() }
551+
MEMBER_ABORT -> ProxyExecutable { abort(it.firstOrNull()) }
552+
MEMBER_GET_WRITER -> ProxyExecutable { getWriter() }
553+
MEMBER_LOCKED -> locked
554+
else -> null
555+
}
556+
531557
internal companion object {
532558
internal const val WRITABLE_STREAM_WRITABLE: Int = 0
533559
internal const val WRITABLE_STREAM_CLOSED: Int = 1
534560
internal const val WRITABLE_STREAM_ERRORING: Int = 2
535561
internal const val WRITABLE_STREAM_ERRORED: Int = 3
562+
563+
private const val MEMBER_CLOSE = "close"
564+
private const val MEMBER_ABORT = "abort"
565+
private const val MEMBER_GET_WRITER = "getWriter"
566+
private const val MEMBER_LOCKED = "locked"
567+
private val MEMBERS = arrayOf(MEMBER_CLOSE, MEMBER_ABORT, MEMBER_GET_WRITER, MEMBER_LOCKED)
536568
}
537569
}

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/WritableStreamDefaultWriterToken.kt

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

1515
import org.graalvm.polyglot.Value
16+
import org.graalvm.polyglot.proxy.ProxyExecutable
1617
import java.util.concurrent.atomic.AtomicBoolean
1718
import java.util.concurrent.atomic.AtomicReference
1819
import elide.runtime.gvm.internals.intrinsics.js.webstreams.WritableDefaultStream.Companion.WRITABLE_STREAM_CLOSED
1920
import elide.runtime.gvm.internals.intrinsics.js.webstreams.WritableDefaultStream.Companion.WRITABLE_STREAM_ERRORED
21+
import elide.runtime.interop.ReadOnlyProxyObject
2022
import elide.runtime.intrinsics.js.CompletableJsPromise
2123
import elide.runtime.intrinsics.js.JsPromise
2224
import elide.runtime.intrinsics.js.err.TypeError
@@ -34,10 +36,10 @@ internal class WritableStreamDefaultWriterToken(
3436
private val detached = AtomicBoolean()
3537

3638
/** A promise used to communicate changes in backpressure to data producers. */
37-
private val mutableReadyPromise = AtomicReference<CompletableJsPromise<Unit>>()
39+
private val mutableReadyPromise = AtomicReference(JsPromise<Unit>().also { it.resolve(Unit) })
3840

3941
/** A promise used to signal the closing of the stream to data producers. */
40-
private val mutableClosePromise = AtomicReference<CompletableJsPromise<Unit>>()
42+
private val mutableClosePromise = AtomicReference(JsPromise<Unit>())
4143

4244
@get:Polyglot override val ready: CompletableJsPromise<Unit> get() = mutableReadyPromise.get()
4345
@get:Polyglot override val closed: CompletableJsPromise<Unit> get() = mutableClosePromise.get()
@@ -56,8 +58,8 @@ internal class WritableStreamDefaultWriterToken(
5658
}
5759

5860
/**
59-
* Ensures that the writer's [ready] promise is rejected, by replacing it with a new rejected promise if it the
60-
* current already.instance is already settled, or rejecting it if it is pending.
61+
* Ensures that the writer's [ready] promise is rejected, by replacing it with a new rejected promise if the current
62+
* ready promise is already settled, or rejecting it if it is pending.
6163
*/
6264
internal fun ensureReadyPromiseRejected(reason: Any?) {
6365
val readyPromise = ready
@@ -66,8 +68,8 @@ internal class WritableStreamDefaultWriterToken(
6668
}
6769

6870
/**
69-
* Ensures that the writer's [closed] promise is rejected, by replacing it with a new rejected promise if it the
70-
* current already.instance is already settled, or rejecting it if it is pending.
71+
* Ensures that the writer's [closed] promise is rejected, by replacing it with a new rejected promise if the current
72+
* promise is already settled, or rejecting it if it is pending.
7173
*/
7274
internal fun ensureClosedPromiseRejected(reason: Any?) {
7375
val closePromise = ready
@@ -106,4 +108,36 @@ internal class WritableStreamDefaultWriterToken(
106108
stream.closeQueuedOrInflight -> JsPromise.rejected(TypeError.create("Stream is already closing"))
107109
else -> stream.closeStream()
108110
}
111+
112+
override fun getMemberKeys(): Array<String> = MEMBERS
113+
override fun getMember(key: String?): Any? = when (key) {
114+
MEMBER_READY -> ready
115+
MEMBER_CLOSED -> closed
116+
MEMBER_DESIRED_SIZE -> desiredSize
117+
MEMBER_CLOSE -> ProxyExecutable { close() }
118+
MEMBER_ABORT -> ProxyExecutable { abort(it.firstOrNull()) }
119+
MEMBER_WRITE -> ProxyExecutable { write(it.firstOrNull() ?: throw TypeError.create("Chunk must not be null")) }
120+
MEMBER_RELEASE_LOCK -> ProxyExecutable { releaseLock() }
121+
else -> null
122+
}
123+
124+
private companion object {
125+
private const val MEMBER_WRITE = "write"
126+
private const val MEMBER_RELEASE_LOCK = "releaseLock"
127+
private const val MEMBER_ABORT = "abort"
128+
private const val MEMBER_CLOSE = "close"
129+
private const val MEMBER_READY = "ready"
130+
private const val MEMBER_CLOSED = "closed"
131+
private const val MEMBER_DESIRED_SIZE = "desiredSize"
132+
133+
private val MEMBERS = arrayOf(
134+
MEMBER_READY,
135+
MEMBER_CLOSED,
136+
MEMBER_DESIRED_SIZE,
137+
MEMBER_CLOSE,
138+
MEMBER_ABORT,
139+
MEMBER_WRITE,
140+
MEMBER_RELEASE_LOCK,
141+
)
142+
}
109143
}

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/WritableStreamIntrinsic.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

15+
import elide.annotations.Singleton
1516
import elide.runtime.core.DelicateElideApi
1617
import elide.runtime.gvm.api.Intrinsic
1718
import elide.runtime.gvm.internals.intrinsics.js.AbstractJsIntrinsic
@@ -21,7 +22,7 @@ import elide.runtime.intrinsics.js.WritableStream
2122
import elide.runtime.intrinsics.js.stream.WritableStreamDefaultWriter
2223

2324
/** Implementation of writable streams (via the Web Streams standard). */
24-
@Intrinsic(global = "WritableStream") internal class WritableStreamIntrinsic : AbstractJsIntrinsic() {
25+
@Intrinsic(global = "WritableStream") @Singleton internal class WritableStreamIntrinsic : AbstractJsIntrinsic() {
2526
@OptIn(DelicateElideApi::class)
2627
override fun install(bindings: GuestIntrinsic.MutableIntrinsicBindings) {
2728
bindings[WRITABLE_STREAM_SYMBOL.asPublicJsSymbol()] = WritableStream

packages/graalvm/src/main/kotlin/elide/runtime/intrinsics/js/JsPromise.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public interface JsPromise<out T> : ProxyObject {
144144
unwrapFulfilled: (Value?) -> T,
145145
unwrapRejected: (Value?) -> Any? = { it },
146146
): JsPromise<T>? {
147-
return if (value.isHostObject) runCatching { value.asHostObject<JsPromise<T>>() }.getOrNull()
147+
return if (value.isProxyObject) runCatching { value.asProxyObject<JsPromise<T>>() }.getOrNull()
148148
else if (!value.canInvokeMember(THEN_SYMBOL)) null
149149
else object : JsPromise<T> {
150150
override val isDone: Boolean get() = false

packages/graalvm/src/main/kotlin/elide/runtime/intrinsics/js/ReadableStream.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ import org.graalvm.polyglot.HostAccess
1616
import org.graalvm.polyglot.Value
1717
import org.graalvm.polyglot.proxy.ProxyInstantiable
1818
import org.graalvm.polyglot.proxy.ProxyIterable
19+
import org.graalvm.polyglot.proxy.ProxyObject
1920
import java.io.InputStream
2021
import java.io.Reader
2122
import java.nio.ByteBuffer
2223
import elide.annotations.API
2324
import elide.runtime.exec.GuestExecution
2425
import elide.runtime.gvm.internals.intrinsics.js.webstreams.*
26+
import elide.runtime.interop.ReadOnlyProxyObject
2527
import elide.runtime.intrinsics.js.ReadableStream.ReaderMode.BYOB
2628
import elide.runtime.intrinsics.js.ReadableStream.ReaderMode.Default
2729
import elide.runtime.intrinsics.js.stream.*
@@ -34,7 +36,7 @@ import elide.vm.annotations.Polyglot
3436
* Readable streams implement streams of arbitrary data which can be consumed by an interested developer, and form part
3537
* of the wider Web Streams API.
3638
*/
37-
@API @HostAccess.Implementable public interface ReadableStream : Stream, ProxyIterable {
39+
@API @HostAccess.Implementable public interface ReadableStream : Stream, ProxyIterable, ProxyObject {
3840
/**
3941
* Encapsulates the result of a read; [done] indicates whether the [value] is the final value that will be available
4042
* from the source.

0 commit comments

Comments
 (0)