Skip to content

Commit 339eb37

Browse files
committed
grpc-js: Refactor in preparation for retries
1 parent 422e8cb commit 339eb37

33 files changed

+1886
-1878
lines changed

packages/grpc-js/src/call-credentials-filter.ts

Lines changed: 0 additions & 86 deletions
This file was deleted.
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright 2022 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import { CallCredentials } from "./call-credentials";
19+
import { Status } from "./constants";
20+
import { Deadline } from "./deadline";
21+
import { Metadata } from "./metadata";
22+
import { ServerSurfaceCall } from "./server-call";
23+
24+
export interface CallStreamOptions {
25+
deadline: Deadline;
26+
flags: number;
27+
host: string;
28+
parentCall: ServerSurfaceCall | null;
29+
}
30+
31+
export type PartialCallStreamOptions = Partial<CallStreamOptions>;
32+
33+
export interface StatusObject {
34+
code: Status;
35+
details: string;
36+
metadata: Metadata;
37+
}
38+
39+
export const enum WriteFlags {
40+
BufferHint = 1,
41+
NoCompress = 2,
42+
WriteThrough = 4,
43+
}
44+
45+
export interface WriteObject {
46+
message: Buffer;
47+
flags?: number;
48+
}
49+
50+
export interface MetadataListener {
51+
(metadata: Metadata, next: (metadata: Metadata) => void): void;
52+
}
53+
54+
export interface MessageListener {
55+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
56+
(message: any, next: (message: any) => void): void;
57+
}
58+
59+
export interface StatusListener {
60+
(status: StatusObject, next: (status: StatusObject) => void): void;
61+
}
62+
63+
export interface FullListener {
64+
onReceiveMetadata: MetadataListener;
65+
onReceiveMessage: MessageListener;
66+
onReceiveStatus: StatusListener;
67+
}
68+
69+
export type Listener = Partial<FullListener>;
70+
71+
/**
72+
* An object with methods for handling the responses to a call.
73+
*/
74+
export interface InterceptingListener {
75+
onReceiveMetadata(metadata: Metadata): void;
76+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
77+
onReceiveMessage(message: any): void;
78+
onReceiveStatus(status: StatusObject): void;
79+
}
80+
81+
export function isInterceptingListener(
82+
listener: Listener | InterceptingListener
83+
): listener is InterceptingListener {
84+
return (
85+
listener.onReceiveMetadata !== undefined &&
86+
listener.onReceiveMetadata.length === 1
87+
);
88+
}
89+
90+
export class InterceptingListenerImpl implements InterceptingListener {
91+
private processingMetadata = false;
92+
private hasPendingMessage = false;
93+
private pendingMessage: any;
94+
private processingMessage = false;
95+
private pendingStatus: StatusObject | null = null;
96+
constructor(
97+
private listener: FullListener,
98+
private nextListener: InterceptingListener
99+
) {}
100+
101+
private processPendingMessage() {
102+
if (this.hasPendingMessage) {
103+
this.nextListener.onReceiveMessage(this.pendingMessage);
104+
this.pendingMessage = null;
105+
this.hasPendingMessage = false;
106+
}
107+
}
108+
109+
private processPendingStatus() {
110+
if (this.pendingStatus) {
111+
this.nextListener.onReceiveStatus(this.pendingStatus);
112+
}
113+
}
114+
115+
onReceiveMetadata(metadata: Metadata): void {
116+
this.processingMetadata = true;
117+
this.listener.onReceiveMetadata(metadata, (metadata) => {
118+
this.processingMetadata = false;
119+
this.nextListener.onReceiveMetadata(metadata);
120+
this.processPendingMessage();
121+
this.processPendingStatus();
122+
});
123+
}
124+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
125+
onReceiveMessage(message: any): void {
126+
/* If this listener processes messages asynchronously, the last message may
127+
* be reordered with respect to the status */
128+
this.processingMessage = true;
129+
this.listener.onReceiveMessage(message, (msg) => {
130+
this.processingMessage = false;
131+
if (this.processingMetadata) {
132+
this.pendingMessage = msg;
133+
this.hasPendingMessage = true;
134+
} else {
135+
this.nextListener.onReceiveMessage(msg);
136+
this.processPendingStatus();
137+
}
138+
});
139+
}
140+
onReceiveStatus(status: StatusObject): void {
141+
this.listener.onReceiveStatus(status, (processedStatus) => {
142+
if (this.processingMetadata || this.processingMessage) {
143+
this.pendingStatus = processedStatus;
144+
} else {
145+
this.nextListener.onReceiveStatus(processedStatus);
146+
}
147+
});
148+
}
149+
}
150+
151+
export interface WriteCallback {
152+
(error?: Error | null): void;
153+
}
154+
155+
export interface MessageContext {
156+
callback?: WriteCallback;
157+
flags?: number;
158+
}
159+
160+
export interface Call {
161+
cancelWithStatus(status: Status, details: string): void;
162+
getPeer(): string;
163+
start(metadata: Metadata, listener: InterceptingListener): void;
164+
sendMessageWithContext(context: MessageContext, message: Buffer): void;
165+
startRead(): void;
166+
halfClose(): void;
167+
getCallNumber(): number;
168+
setCredentials(credentials: CallCredentials): void;
169+
}

packages/grpc-js/src/call-number.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2022 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
let nextCallNumber = 0;
19+
20+
export function getNextCallNumber() {
21+
return nextCallNumber++;
22+
}

0 commit comments

Comments
 (0)