Skip to content

Commit 71833a0

Browse files
committed
feat: updated client caller interfaces
- Related #501
1 parent 1d88f5d commit 71833a0

File tree

4 files changed

+68
-207
lines changed

4 files changed

+68
-207
lines changed

src/RPC/RPCClient.ts

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ import type {
33
JsonRpcRequestMessage,
44
StreamPairCreateCallback,
55
ClientManifest,
6-
MapRawCallers,
76
} from './types';
87
import type { JSONValue } from 'types';
9-
import type { ReadableWritablePair, WritableStream } from 'stream/web';
8+
import type {
9+
ReadableWritablePair,
10+
WritableStream,
11+
ReadableStream,
12+
} from 'stream/web';
1013
import type {
1114
JsonRpcRequest,
1215
JsonRpcResponse,
@@ -78,27 +81,9 @@ class RPCClient<M extends ClientManifest> {
7881
case 'CLIENT':
7982
return () => this.clientStreamCaller(method);
8083
case 'DUPLEX':
81-
return (f) => this.duplexStreamCaller(method, f);
84+
return () => this.duplexStreamCaller(method);
8285
case 'RAW':
83-
default:
84-
return;
85-
}
86-
},
87-
},
88-
);
89-
protected rawMethodsProxy = new Proxy(
90-
{},
91-
{
92-
get: (_, method) => {
93-
if (typeof method === 'symbol') throw never();
94-
switch (this.callerTypes[method]) {
95-
case 'DUPLEX':
96-
return () => this.rawDuplexStreamCaller(method);
97-
case 'RAW':
98-
return (params) => this.rawStreamCaller(method, params);
99-
case 'SERVER':
100-
case 'CLIENT':
101-
case 'UNARY':
86+
return (header) => this.rawStreamCaller(method, header);
10287
default:
10388
return;
10489
}
@@ -138,17 +123,12 @@ class RPCClient<M extends ClientManifest> {
138123
return this.methodsProxy as MapCallers<M>;
139124
}
140125

141-
@ready(new rpcErrors.ErrorRpcDestroyed())
142-
public get rawMethods(): MapRawCallers<M> {
143-
return this.rawMethodsProxy as MapRawCallers<M>;
144-
}
145-
146126
@ready(new rpcErrors.ErrorRpcDestroyed())
147127
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
148128
method: string,
149129
parameters: I,
150130
): Promise<O> {
151-
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
131+
const callerInterface = await this.duplexStreamCaller<I, O>(method);
152132
const reader = callerInterface.readable.getReader();
153133
const writer = callerInterface.writable.getWriter();
154134
await writer.write(parameters);
@@ -165,18 +145,13 @@ class RPCClient<M extends ClientManifest> {
165145
public async serverStreamCaller<I extends JSONValue, O extends JSONValue>(
166146
method: string,
167147
parameters: I,
168-
): Promise<AsyncIterable<O>> {
169-
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
148+
): Promise<ReadableStream<O>> {
149+
const callerInterface = await this.duplexStreamCaller<I, O>(method);
170150
const writer = callerInterface.writable.getWriter();
171151
await writer.write(parameters);
172152
await writer.close();
173153

174-
const outputGen = async function* () {
175-
for await (const value of callerInterface.readable) {
176-
yield value;
177-
}
178-
};
179-
return outputGen();
154+
return callerInterface.readable;
180155
}
181156

182157
@ready(new rpcErrors.ErrorRpcDestroyed())
@@ -186,7 +161,7 @@ class RPCClient<M extends ClientManifest> {
186161
output: Promise<O>;
187162
writable: WritableStream<I>;
188163
}> {
189-
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
164+
const callerInterface = await this.duplexStreamCaller<I, O>(method);
190165
const reader = callerInterface.readable.getReader();
191166
const output = reader.read().then(({ value, done }) => {
192167
if (done) {
@@ -203,27 +178,6 @@ class RPCClient<M extends ClientManifest> {
203178
@ready(new rpcErrors.ErrorRpcDestroyed())
204179
public async duplexStreamCaller<I extends JSONValue, O extends JSONValue>(
205180
method: string,
206-
f: (output: AsyncIterable<O>) => AsyncIterable<I>,
207-
): Promise<void> {
208-
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
209-
const outputGenerator = async function* () {
210-
for await (const value of callerInterface.readable) {
211-
yield value;
212-
}
213-
};
214-
const writer = callerInterface.writable.getWriter();
215-
try {
216-
for await (const value of f(outputGenerator())) {
217-
await writer.write(value);
218-
}
219-
} finally {
220-
await writer.close();
221-
}
222-
}
223-
224-
@ready(new rpcErrors.ErrorRpcDestroyed())
225-
public async rawDuplexStreamCaller<I extends JSONValue, O extends JSONValue>(
226-
method: string,
227181
): Promise<ReadableWritablePair<O, I>> {
228182
const outputMessageTransformStream = clientOutputTransformStream<O>();
229183
const inputMessageTransformStream = clientInputTransformStream<I>(method);
@@ -249,14 +203,14 @@ class RPCClient<M extends ClientManifest> {
249203
@ready(new rpcErrors.ErrorRpcDestroyed())
250204
public async rawStreamCaller(
251205
method: string,
252-
params: JSONValue,
206+
headerParams: JSONValue,
253207
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
254208
const streamPair = await this.streamPairCreateCallback();
255209
const tempWriter = streamPair.writable.getWriter();
256210
const header: JsonRpcRequestMessage = {
257211
jsonrpc: '2.0',
258212
method,
259-
params,
213+
params: headerParams,
260214
id: null,
261215
};
262216
await tempWriter.write(Buffer.from(JSON.stringify(header)));

src/RPC/types.ts

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ type UnaryCallerImplementation<
156156
type ServerCallerImplementation<
157157
I extends JSONValue = JSONValue,
158158
O extends JSONValue = JSONValue,
159-
> = (parameters: I) => Promise<AsyncIterable<O>>;
159+
> = (parameters: I) => Promise<ReadableStream<O>>;
160160

161161
type ClientCallerImplementation<
162162
I extends JSONValue = JSONValue,
@@ -166,17 +166,10 @@ type ClientCallerImplementation<
166166
type DuplexCallerImplementation<
167167
I extends JSONValue = JSONValue,
168168
O extends JSONValue = JSONValue,
169-
> = (f: (output: AsyncIterable<O>) => AsyncIterable<I>) => Promise<void>;
170-
171-
// Raw callers
172-
173-
type RawDuplexCallerImplementation<
174-
I extends JSONValue = JSONValue,
175-
O extends JSONValue = JSONValue,
176169
> = () => Promise<ReadableWritablePair<O, I>>;
177170

178171
type RawCallerImplementation = (
179-
params: JSONValue,
172+
headerParams: JSONValue,
180173
) => Promise<ReadableWritablePair<Uint8Array, Uint8Array>>;
181174

182175
type ConvertDuplexCaller<T> = T extends DuplexCaller<infer I, infer O>
@@ -203,14 +196,6 @@ type ConvertCaller<T extends Caller> = T extends DuplexCaller
203196
? ConvertClientCaller<T>
204197
: T extends UnaryCaller
205198
? ConvertUnaryCaller<T>
206-
: never;
207-
208-
type ConvertRawDuplexStreamHandler<T> = T extends DuplexCaller<infer I, infer O>
209-
? RawDuplexCallerImplementation<I, O>
210-
: never;
211-
212-
type ConvertRawCaller<T> = T extends DuplexCaller
213-
? ConvertRawDuplexStreamHandler<T>
214199
: T extends RawCaller
215200
? RawCallerImplementation
216201
: never;
@@ -224,10 +209,6 @@ type MapCallers<T extends ClientManifest> = {
224209
[K in keyof T]: ConvertCaller<T[K]>;
225210
};
226211

227-
type MapRawCallers<T extends ClientManifest> = {
228-
[K in keyof T]: ConvertRawCaller<T[K]>;
229-
};
230-
231212
export type {
232213
JsonRpcRequestMessage,
233214
JsonRpcRequestNotification,
@@ -250,5 +231,4 @@ export type {
250231
ClientManifest,
251232
HandlerType,
252233
MapCallers,
253-
MapRawCallers,
254234
};

tests/RPC/RPC.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ describe('RPC', () => {
6363
logger,
6464
});
6565

66-
const callerInterface = await rpcClient.rawMethods.testMethod({
66+
const callerInterface = await rpcClient.methods.testMethod({
6767
hello: 'world',
6868
});
6969
const writer = callerInterface.writable.getWriter();
@@ -116,7 +116,7 @@ describe('RPC', () => {
116116
logger,
117117
});
118118

119-
const callerInterface = await rpcClient.rawMethods.testMethod();
119+
const callerInterface = await rpcClient.methods.testMethod();
120120
const writer = callerInterface.writable.getWriter();
121121
const reader = callerInterface.readable.getReader();
122122
for (const value of values) {

0 commit comments

Comments
 (0)