Skip to content

Commit 8812e8c

Browse files
committed
🐛 [dweb/helper,dweb/core] 修复一些内存管理问题
🐛 修复 Producer.Consumer 的 close 没有正确移除自身的问题 🐛 修复 IpcBodySender 没有正确销毁 usableByIpcConsumer 的问题
1 parent 8675dab commit 8812e8c

File tree

5 files changed

+57
-15
lines changed

5 files changed

+57
-15
lines changed

next/kmp/core/src/commonMain/kotlin/org/dweb_browser/core/ipc/helper/IpcFork.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import org.dweb_browser.core.help.types.CommonAppManifest
1313
class IpcFork(
1414
val pid: Int,
1515
val autoStart: Boolean,
16-
val startReason: String?,
16+
val startReason: String? = null,
1717
val locale: CommonAppManifest,
1818
val remote: CommonAppManifest,
1919
) : IpcRawMessage, IpcMessage {

next/kmp/core/src/commonTest/kotlin/ReadableStreamTest.kt

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
package info.bagen.dwebbrowser
22

33
import io.ktor.utils.io.cancel
4+
import io.ktor.utils.io.copyAndClose
5+
import kotlinx.coroutines.CoroutineStart
46
import kotlinx.coroutines.DelicateCoroutinesApi
57
import kotlinx.coroutines.GlobalScope
68
import kotlinx.coroutines.delay
79
import kotlinx.coroutines.launch
810
import org.dweb_browser.core.ipc.helper.ReadableStream
11+
import org.dweb_browser.helper.ByteReadChannelDelegate
912
import org.dweb_browser.helper.SafeInt
1013
import org.dweb_browser.helper.canReadContent
14+
import org.dweb_browser.helper.commonConsumeEachArrayRange
1115
import org.dweb_browser.helper.consumeEachArrayRange
16+
import org.dweb_browser.helper.createByteChannel
1217
import org.dweb_browser.helper.readAvailableByteArray
1318
import org.dweb_browser.test.runCommonTest
1419
import kotlin.test.Test
20+
import kotlin.test.assertContentEquals
1521
import kotlin.test.assertEquals
1622
import kotlin.test.assertTrue
1723

@@ -20,7 +26,7 @@ class ReadableStreamTestTest {
2026
@Test
2127
fun readableStreamAvailableTest() = runCommonTest {
2228
println("start")
23-
val stream = ReadableStream(this,onStart = { controller ->
29+
val stream = ReadableStream(this, onStart = { controller ->
2430
GlobalScope.launch {
2531
var i = 5
2632
while (i-- > 0) {
@@ -55,10 +61,10 @@ class ReadableStreamTestTest {
5561

5662
@Test
5763
fun testCancel() = runCommonTest {
58-
val readableStream = ReadableStream(this,onStart = { controller ->
64+
val readableStream = ReadableStream(this, onStart = { controller ->
5965
GlobalScope.launch {
6066
var i = 0
61-
while (true) {
67+
while (true) {
6268
controller.enqueue(byteArrayOf(i++.toByte()))
6369
delay(400)
6470
}
@@ -82,4 +88,36 @@ class ReadableStreamTestTest {
8288
println("readableStream closed")
8389
assertTrue { true }
8490
}
91+
92+
val byteArray = byteArrayOf(elements = (1..640000).distinct().map { it.toByte() }.toByteArray())
93+
94+
@Test
95+
fun testClose() = runCommonTest(1000) { time ->
96+
println("test-$time")
97+
98+
val stream = ReadableStream(this, onStart = { controller ->
99+
launch(start = CoroutineStart.UNDISPATCHED) {
100+
controller.enqueue(byteArray)
101+
}
102+
launch {
103+
controller.closeWrite()
104+
}
105+
})
106+
val sink = createByteChannel()
107+
launch {
108+
when (val source = stream.stream.getReader("xix")) {
109+
is ByteReadChannelDelegate -> {
110+
source.sourceByteReadChannel
111+
}
112+
else -> source
113+
}.copyAndClose(sink)
114+
}
115+
116+
var res = byteArrayOf()
117+
sink.commonConsumeEachArrayRange { byteArray, last ->
118+
res += byteArray
119+
}
120+
121+
assertContentEquals(res, byteArray)
122+
}
85123
}

toolkit/dweb-core/src/ipc/endpoint/CommonEndpoint.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export abstract class CommonEndpoint extends IpcEndpoint {
7474
case ENDPOINT_MESSAGE_TYPE.IPC: {
7575
const producer = this.getIpcMessageProducer(endpointMessage.pid);
7676
const ipc = await producer.ipcPo.promise;
77-
producer.producer.trySend($normalizeIpcMessage(endpointMessage.ipcMessage, ipc));
77+
void producer.producer.trySend($normalizeIpcMessage(endpointMessage.ipcMessage, ipc));
7878
break;
7979
}
8080
case ENDPOINT_MESSAGE_TYPE.LIFECYCLE: {

toolkit/dweb-core/src/ipc/ipc-message/stream/IpcBodySender.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,8 @@ export class IpcBodySender extends IpcBody {
296296
if (usableIpcBodyMapper === undefined) {
297297
const mapper = new UsableIpcBodyMapper();
298298
IpcUsableIpcBodyMap.set(ipc, mapper);
299-
ipc.onStream("usableByIpc").collect((event) => {
299+
const usableByIpcConsumer = ipc.onStream("usableByIpc");
300+
usableByIpcConsumer.collect((event) => {
300301
const message = event.data;
301302
switch (message.type) {
302303
case IPC_MESSAGE_TYPE.STREAM_PULLING:
@@ -313,7 +314,10 @@ export class IpcBodySender extends IpcBody {
313314
}
314315
event.consume();
315316
});
316-
mapper.onDestroy(() => IpcUsableIpcBodyMap.delete(ipc));
317+
mapper.onDestroy(() => {
318+
usableByIpcConsumer.close();
319+
IpcUsableIpcBodyMap.delete(ipc);
320+
});
317321
usableIpcBodyMapper = mapper;
318322
}
319323
if (usableIpcBodyMapper.add(streamId, ipcBody)) {

toolkit/dweb-helper/src/Producer.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ export class Producer<T> {
8080
/**过滤触发 */
8181
consumeMapNotNull<R>(mapNotNull: (data: T) => R | undefined): R | undefined {
8282
const result = mapNotNull(this.data);
83-
if (result !== null) {
83+
if (result !== null && result !== undefined) {
8484
this.consume();
85-
return result as R;
85+
return result;
8686
}
8787
}
8888
consumeFilter<R extends T = T>(filter: (data: T) => boolean) {
@@ -148,14 +148,14 @@ export class Producer<T> {
148148
/**无保证发送 */
149149
sendBeacon(value: T) {
150150
const event = this.event(value);
151-
this.doEmit(event);
151+
return this.doEmit(event);
152152
}
153153

154154
trySend(value: T) {
155155
if (this.isClosedForSend) {
156-
this.sendBeacon(value);
156+
return this.sendBeacon(value);
157157
} else {
158-
this.#doSend(value);
158+
return this.#doSend(value);
159159
}
160160
}
161161

@@ -197,9 +197,7 @@ export class Producer<T> {
197197
//#region Consumer
198198
/**消费者 */
199199
static #Consumer = class Consumer<T> {
200-
constructor(readonly name: string, readonly producer: Producer<T>) {
201-
producer.consumers.add(this);
202-
}
200+
constructor(readonly name: string, readonly producer: Producer<T>) {}
203201

204202
toString() {
205203
return `Consumer<[${this.producer.name}]${this.name}>`;
@@ -279,6 +277,8 @@ export class Producer<T> {
279277
#destroySignal = createSignal();
280278
onDestroy = this.#destroySignal.listen;
281279
close(cause?: string) {
280+
this.input.close();
281+
this.producer.consumers.delete(this);
282282
this.#errorCatcher.resolve(cause);
283283
}
284284
};

0 commit comments

Comments
 (0)