Skip to content

Commit 1610d30

Browse files
committed
feat: 🎸 upate rpc codec implementation
1 parent ead50de commit 1610d30

File tree

6 files changed

+330
-338
lines changed

6 files changed

+330
-338
lines changed

src/rpc/README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@ const framedMessage = rmEncoder.encodeRecord(rpcMessage);
3030
const rmDecoder = new RmRecordDecoder();
3131
const rpcDecoder = new RpcMessageDecoder();
3232

33-
// First extract the record from the byte stream
33+
// First extract the record from the byte stream (returns Reader)
3434
rmDecoder.push(tcpData);
3535
const record = rmDecoder.readRecord();
3636

37-
// Then decode the RPC message
37+
// Then decode the RPC message from the Reader
3838
if (record) {
39-
rpcDecoder.push(record);
40-
const message = rpcDecoder.readMessage();
39+
const message = rpcDecoder.decodeMessage(record);
4140
}
4241
```

src/rpc/RpcMessageDecoder.ts

Lines changed: 68 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import {Reader} from '@jsonjoy.com/buffers/lib/Reader';
22
import {RpcMsgType, RpcReplyStat, RpcAcceptStat, RpcRejectStat, RPC_VERSION} from './constants';
33
import {RpcDecodingError} from './errors';
4-
import {RpcOpaqueAuth, RpcCallBody, RpcAcceptedReply, RpcRejectedReply, RpcMessage, RpcMismatchInfo} from './messages';
4+
import {
5+
RpcOpaqueAuth,
6+
RpcCallMessage,
7+
RpcAcceptedReplyMessage,
8+
RpcRejectedReplyMessage,
9+
RpcMessage,
10+
RpcMismatchInfo,
11+
} from './messages';
512

613
export class RpcMessageDecoder {
714
public decodeMessage(reader: Reader): RpcMessage | undefined {
@@ -15,43 +22,88 @@ export class RpcMessageDecoder {
1522
const msgType = reader.u32();
1623
let message: RpcMessage | undefined;
1724
if (msgType === RpcMsgType.CALL) {
18-
const callBody = this.readCallBody(reader);
19-
if (!callBody) {
25+
if (reader.size() < 20) {
2026
reader.x = startPos;
2127
return undefined;
2228
}
23-
const params = reader.size() > 0 ? reader.buf(reader.size()) : undefined;
24-
callBody.params = params;
25-
message = new RpcMessage(xid, callBody);
29+
const rpcvers = reader.u32();
30+
if (rpcvers !== RPC_VERSION) {
31+
throw new RpcDecodingError(`Unsupported RPC version: ${rpcvers}`);
32+
}
33+
const prog = reader.u32();
34+
const vers = reader.u32();
35+
const proc = reader.u32();
36+
const cred = this.readOpaqueAuth(reader);
37+
if (!cred) {
38+
reader.x = startPos;
39+
return undefined;
40+
}
41+
const verf = this.readOpaqueAuth(reader);
42+
if (!verf) {
43+
reader.x = startPos;
44+
return undefined;
45+
}
46+
const params = reader.size() > 0 ? reader.cut(reader.size()) : undefined;
47+
message = new RpcCallMessage(xid, rpcvers, prog, vers, proc, cred, verf, params);
2648
} else if (msgType === RpcMsgType.REPLY) {
2749
if (reader.size() < 4) {
2850
reader.x = startPos;
2951
return undefined;
3052
}
3153
const replyStat = reader.u32();
3254
if (replyStat === RpcReplyStat.MSG_ACCEPTED) {
33-
const reply = this.readAcceptedReply(reader);
34-
if (!reply) {
55+
const verf = this.readOpaqueAuth(reader);
56+
if (!verf) {
3557
reader.x = startPos;
3658
return undefined;
3759
}
38-
const results = reader.size() > 0 ? reader.buf(reader.size()) : undefined;
39-
reply.results = results;
40-
message = new RpcMessage(xid, reply);
60+
if (reader.size() < 4) {
61+
reader.x = startPos;
62+
return undefined;
63+
}
64+
const acceptStat = reader.u32();
65+
let mismatchInfo: RpcMismatchInfo | undefined;
66+
if (acceptStat === RpcAcceptStat.PROG_MISMATCH) {
67+
if (reader.size() < 8) {
68+
reader.x = startPos;
69+
return undefined;
70+
}
71+
const low = reader.u32();
72+
const high = reader.u32();
73+
mismatchInfo = new RpcMismatchInfo(low, high);
74+
}
75+
const results = reader.size() > 0 ? reader.cut(reader.size()) : undefined;
76+
message = new RpcAcceptedReplyMessage(xid, verf, acceptStat, mismatchInfo, results);
4177
} else if (replyStat === RpcReplyStat.MSG_DENIED) {
42-
const reply = this.readRejectedReply(reader);
43-
if (!reply) {
78+
if (reader.size() < 4) {
4479
reader.x = startPos;
4580
return undefined;
4681
}
47-
message = new RpcMessage(xid, reply);
82+
const rejectStat = reader.u32();
83+
let mismatchInfo: RpcMismatchInfo | undefined;
84+
let authStat: number | undefined;
85+
if (rejectStat === RpcRejectStat.RPC_MISMATCH) {
86+
if (reader.size() < 8) {
87+
reader.x = startPos;
88+
return undefined;
89+
}
90+
const low = reader.u32();
91+
const high = reader.u32();
92+
mismatchInfo = new RpcMismatchInfo(low, high);
93+
} else if (rejectStat === RpcRejectStat.AUTH_ERROR) {
94+
if (reader.size() < 4) {
95+
reader.x = startPos;
96+
return undefined;
97+
}
98+
authStat = reader.u32();
99+
}
100+
message = new RpcRejectedReplyMessage(xid, rejectStat, mismatchInfo, authStat);
48101
} else {
49102
throw new RpcDecodingError('Invalid reply_stat');
50103
}
51104
} else {
52105
throw new RpcDecodingError('Invalid msg_type');
53106
}
54-
reader.consume();
55107
return message;
56108
} catch (err) {
57109
if (err instanceof RangeError) {
@@ -62,55 +114,7 @@ export class RpcMessageDecoder {
62114
}
63115
}
64116

65-
private readCallBody(reader: StreamingReader): RpcCallBody | undefined {
66-
if (reader.size() < 20) return undefined;
67-
const rpcvers = reader.u32();
68-
if (rpcvers !== RPC_VERSION) {
69-
throw new RpcDecodingError(`Unsupported RPC version: ${rpcvers}`);
70-
}
71-
const prog = reader.u32();
72-
const vers = reader.u32();
73-
const proc = reader.u32();
74-
const cred = this.readOpaqueAuth(reader);
75-
if (!cred) return undefined;
76-
const verf = this.readOpaqueAuth(reader);
77-
if (!verf) return undefined;
78-
return new RpcCallBody(rpcvers, prog, vers, proc, cred, verf);
79-
}
80-
81-
private readAcceptedReply(reader: StreamingReader): RpcAcceptedReply | undefined {
82-
const verf = this.readOpaqueAuth(reader);
83-
if (!verf) return undefined;
84-
if (reader.size() < 4) return undefined;
85-
const acceptStat = reader.u32();
86-
let mismatchInfo: RpcMismatchInfo | undefined;
87-
if (acceptStat === RpcAcceptStat.PROG_MISMATCH) {
88-
if (reader.size() < 8) return undefined;
89-
const low = reader.u32();
90-
const high = reader.u32();
91-
mismatchInfo = new RpcMismatchInfo(low, high);
92-
}
93-
return new RpcAcceptedReply(verf, acceptStat, mismatchInfo, undefined);
94-
}
95-
96-
private readRejectedReply(reader: StreamingReader): RpcRejectedReply | undefined {
97-
if (reader.size() < 4) return undefined;
98-
const rejectStat = reader.u32();
99-
let mismatchInfo: RpcMismatchInfo | undefined;
100-
let authStat: number | undefined;
101-
if (rejectStat === RpcRejectStat.RPC_MISMATCH) {
102-
if (reader.size() < 8) return undefined;
103-
const low = reader.u32();
104-
const high = reader.u32();
105-
mismatchInfo = new RpcMismatchInfo(low, high);
106-
} else if (rejectStat === RpcRejectStat.AUTH_ERROR) {
107-
if (reader.size() < 4) return undefined;
108-
authStat = reader.u32();
109-
}
110-
return new RpcRejectedReply(rejectStat, mismatchInfo, authStat);
111-
}
112-
113-
private readOpaqueAuth(reader: StreamingReader): RpcOpaqueAuth | undefined {
117+
private readOpaqueAuth(reader: Reader): RpcOpaqueAuth | undefined {
114118
if (reader.size() < 8) return undefined;
115119
const flavor = reader.u32();
116120
const length = reader.u32();

src/rpc/RpcMessageEncoder.ts

Lines changed: 40 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
import {Writer} from '@jsonjoy.com/util/lib/buffers/Writer';
22
import type {IWriter, IWriterGrowable} from '@jsonjoy.com/util/lib/buffers';
3+
import type {Reader} from '@jsonjoy.com/buffers/lib/Reader';
34
import {RpcMsgType, RpcReplyStat, RPC_VERSION} from './constants';
45
import {RpcEncodingError} from './errors';
5-
import {RpcOpaqueAuth, RpcCallBody, RpcAcceptedReply, RpcRejectedReply, RpcMessage} from './messages';
6+
import {
7+
RpcOpaqueAuth,
8+
RpcCallMessage,
9+
RpcAcceptedReplyMessage,
10+
RpcRejectedReplyMessage,
11+
RpcMessage,
12+
} from './messages';
613

714
export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriterGrowable> {
815
constructor(public readonly writer: W = new Writer() as any) {}
@@ -14,7 +21,7 @@ export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & I
1421
proc: number,
1522
cred: RpcOpaqueAuth,
1623
verf: RpcOpaqueAuth,
17-
params?: Uint8Array,
24+
params?: Reader | Uint8Array,
1825
): Uint8Array {
1926
this.writeCall(xid, prog, vers, proc, cred, verf, params);
2027
return this.writer.flush();
@@ -25,7 +32,7 @@ export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & I
2532
verf: RpcOpaqueAuth,
2633
acceptStat: number,
2734
mismatchInfo?: {low: number; high: number},
28-
results?: Uint8Array,
35+
results?: Reader | Uint8Array,
2936
): Uint8Array {
3037
this.writeAcceptedReply(xid, verf, acceptStat, mismatchInfo, results);
3138
return this.writer.flush();
@@ -47,23 +54,12 @@ export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & I
4754
}
4855

4956
public writeMessage(msg: RpcMessage): void {
50-
const writer = this.writer;
51-
writer.u32(msg.xid);
52-
const body = msg.body;
53-
if (body instanceof RpcCallBody) {
54-
writer.u32(RpcMsgType.CALL);
55-
this.writeCallBody(body);
56-
if (body.params && body.params.length > 0) {
57-
writer.buf(body.params, body.params.length);
58-
}
59-
} else if (body instanceof RpcAcceptedReply) {
60-
writer.u32(RpcMsgType.REPLY);
61-
writer.u32(RpcReplyStat.MSG_ACCEPTED);
62-
this.writeAcceptedReplyBody(body.verf, body.stat, body.mismatchInfo, body.results);
63-
} else if (body instanceof RpcRejectedReply) {
64-
writer.u32(RpcMsgType.REPLY);
65-
writer.u32(RpcReplyStat.MSG_DENIED);
66-
this.writeRejectedReplyBody(body.stat, body.mismatchInfo, body.authStat);
57+
if (msg instanceof RpcCallMessage) {
58+
this.writeCall(msg.xid, msg.prog, msg.vers, msg.proc, msg.cred, msg.verf, msg.params);
59+
} else if (msg instanceof RpcAcceptedReplyMessage) {
60+
this.writeAcceptedReply(msg.xid, msg.verf, msg.stat, msg.mismatchInfo, msg.results);
61+
} else if (msg instanceof RpcRejectedReplyMessage) {
62+
this.writeRejectedReply(msg.xid, msg.stat, msg.mismatchInfo, msg.authStat);
6763
}
6864
}
6965

@@ -74,7 +70,7 @@ export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & I
7470
proc: number,
7571
cred: RpcOpaqueAuth,
7672
verf: RpcOpaqueAuth,
77-
params?: Uint8Array,
73+
params?: Reader | Uint8Array,
7874
): void {
7975
const writer = this.writer;
8076
writer.u32(xid);
@@ -85,50 +81,48 @@ export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & I
8581
writer.u32(proc);
8682
this.writeOpaqueAuth(cred);
8783
this.writeOpaqueAuth(verf);
88-
if (params && params.length > 0) {
89-
writer.buf(params, params.length);
84+
if (params) {
85+
if (params instanceof Uint8Array) {
86+
if (params.length > 0) {
87+
writer.buf(params, params.length);
88+
}
89+
} else {
90+
const size = params.size();
91+
if (size > 0) {
92+
writer.buf(params.uint8, size);
93+
}
94+
}
9095
}
9196
}
9297

93-
private writeCallBody(body: RpcCallBody): void {
94-
const writer = this.writer;
95-
writer.u32(body.rpcvers);
96-
writer.u32(body.prog);
97-
writer.u32(body.vers);
98-
writer.u32(body.proc);
99-
this.writeOpaqueAuth(body.cred);
100-
this.writeOpaqueAuth(body.verf);
101-
}
102-
10398
public writeAcceptedReply(
10499
xid: number,
105100
verf: RpcOpaqueAuth,
106101
acceptStat: number,
107102
mismatchInfo?: {low: number; high: number},
108-
results?: Uint8Array,
103+
results?: Reader | Uint8Array,
109104
): void {
110105
const writer = this.writer;
111106
writer.u32(xid);
112107
writer.u32(RpcMsgType.REPLY);
113108
writer.u32(RpcReplyStat.MSG_ACCEPTED);
114-
this.writeAcceptedReplyBody(verf, acceptStat, mismatchInfo, results);
115-
}
116-
117-
private writeAcceptedReplyBody(
118-
verf: RpcOpaqueAuth,
119-
acceptStat: number,
120-
mismatchInfo?: {low: number; high: number},
121-
results?: Uint8Array,
122-
): void {
123-
const writer = this.writer;
124109
this.writeOpaqueAuth(verf);
125110
writer.u32(acceptStat);
126111
if (mismatchInfo) {
127112
writer.u32(mismatchInfo.low);
128113
writer.u32(mismatchInfo.high);
129114
}
130-
if (results && results.length > 0) {
131-
writer.buf(results, results.length);
115+
if (results) {
116+
if (results instanceof Uint8Array) {
117+
if (results.length > 0) {
118+
writer.buf(results, results.length);
119+
}
120+
} else {
121+
const size = results.size();
122+
if (size > 0) {
123+
writer.buf(results.uint8, size);
124+
}
125+
}
132126
}
133127
}
134128

@@ -142,15 +136,6 @@ export class RpcMessageEncoder<W extends IWriter & IWriterGrowable = IWriter & I
142136
writer.u32(xid);
143137
writer.u32(RpcMsgType.REPLY);
144138
writer.u32(RpcReplyStat.MSG_DENIED);
145-
this.writeRejectedReplyBody(rejectStat, mismatchInfo, authStat);
146-
}
147-
148-
private writeRejectedReplyBody(
149-
rejectStat: number,
150-
mismatchInfo?: {low: number; high: number},
151-
authStat?: number,
152-
): void {
153-
const writer = this.writer;
154139
writer.u32(rejectStat);
155140
if (mismatchInfo) {
156141
writer.u32(mismatchInfo.low);

0 commit comments

Comments
 (0)