Skip to content

Commit 6a18fa2

Browse files
committed
fix(stream): remove UnwrapStream, it doesn't work
1 parent d862077 commit 6a18fa2

File tree

6 files changed

+13
-37
lines changed

6 files changed

+13
-37
lines changed

libraries/adb-server-node-tcp/src/index.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import {
66
MaybeConsumable,
77
PushReadableStream,
88
tryClose,
9-
WrapWritableStream,
10-
WritableStream,
119
} from "@yume-chan/stream-extra";
1210
import type { ValueOrPromise } from "@yume-chan/struct";
1311

@@ -36,7 +34,7 @@ function nodeSocketToConnection(
3634
tryClose(controller);
3735
});
3836
}),
39-
writable: new WritableStream<Uint8Array>({
37+
writable: new MaybeConsumable.WritableStream<Uint8Array>({
4038
write: (chunk) => {
4139
return new Promise<void>((resolve, reject) => {
4240
socket.write(chunk, (err) => {
@@ -100,9 +98,7 @@ export class AdbServerNodeTcpConnector
10098
await handler({
10199
service: address!,
102100
readable: connection.readable,
103-
writable: new WrapWritableStream(
104-
connection.writable,
105-
).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
101+
writable: connection.writable,
106102
get closed() {
107103
return connection.closed;
108104
},

libraries/adb/src/commands/sync/push.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ async function pipeFileData(
4141
file.pipeThrough(new DistributionStream(packetSize, true))
4242
.pipeTo(
4343
new MaybeConsumable.WritableStream({
44-
write: async (chunk) => {
45-
await adbSyncWriteRequest(
44+
write(chunk) {
45+
return adbSyncWriteRequest(
4646
locked,
4747
AdbSyncRequestId.Data,
4848
chunk,

libraries/adb/src/commands/sync/socket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
3535
this.#combiner = new BufferCombiner(bufferSize);
3636
}
3737

38-
async #write(buffer: Uint8Array) {
38+
#write(buffer: Uint8Array) {
3939
// `#combiner` will reuse the buffer, so we need to use the Consumable pattern
40-
await Consumable.WritableStream.write(this.#writer, buffer);
40+
return Consumable.WritableStream.write(this.#writer, buffer);
4141
}
4242

4343
async flush() {

libraries/adb/src/server/client.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ import type {
66
AbortSignal,
77
ReadableWritablePair,
88
WritableStreamDefaultWriter,
9+
MaybeConsumable,
910
} from "@yume-chan/stream-extra";
1011
import {
1112
BufferedReadableStream,
12-
MaybeConsumable,
13-
WrapWritableStream,
1413
tryCancel,
1514
tryClose,
1615
} from "@yume-chan/stream-extra";
@@ -211,8 +210,8 @@ export class AdbServerClient {
211210
readonly wireless = new AdbServerClient.WirelessCommands(this);
212211
readonly mDns = new AdbServerClient.MDnsCommands(this);
213212

214-
constructor(connection: AdbServerClient.ServerConnector) {
215-
this.connector = connection;
213+
constructor(connector: AdbServerClient.ServerConnector) {
214+
this.connector = connector;
216215
}
217216

218217
async createConnection(
@@ -437,9 +436,7 @@ export class AdbServerClient {
437436
transportId,
438437
service,
439438
readable: socket.readable,
440-
writable: new WrapWritableStream(
441-
socket.writable,
442-
).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
439+
writable: socket.writable,
443440
get closed() {
444441
return socket.closed;
445442
},
@@ -567,7 +564,7 @@ export namespace AdbServerClient {
567564
}
568565

569566
export interface ServerConnection
570-
extends ReadableWritablePair<Uint8Array, Uint8Array>,
567+
extends ReadableWritablePair<Uint8Array, MaybeConsumable<Uint8Array>>,
571568
Closeable {
572569
get closed(): Promise<void>;
573570
}

libraries/adb/src/server/transport.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,7 @@ export class AdbServerTransport implements AdbTransport {
7676

7777
async connect(service: string): Promise<AdbSocket> {
7878
return await this.#client.createDeviceConnection(
79-
{
80-
transportId: this.transportId,
81-
},
79+
{ transportId: this.transportId },
8280
service,
8381
);
8482
}

libraries/stream-extra/src/maybe-consumable-ns.ts

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ import type {
44
QueuingStrategy,
55
WritableStreamDefaultController,
66
} from "./stream.js";
7-
import {
8-
WritableStream as NativeWritableStream,
9-
TransformStream,
10-
} from "./stream.js";
7+
import { WritableStream as NativeWritableStream } from "./stream.js";
118

129
export function getValue<T>(value: MaybeConsumable<T>): T {
1310
return value instanceof Consumable ? value.value : value;
@@ -24,18 +21,6 @@ export function tryConsume<T, R>(
2421
}
2522
}
2623

27-
export class UnwrapStream<T> extends TransformStream<MaybeConsumable<T>, T> {
28-
constructor() {
29-
super({
30-
transform(chunk, controller) {
31-
tryConsume(chunk, (chunk) => {
32-
controller.enqueue(chunk as T);
33-
});
34-
},
35-
});
36-
}
37-
}
38-
3924
export interface WritableStreamSink<in T> {
4025
start?(
4126
controller: WritableStreamDefaultController,

0 commit comments

Comments
 (0)