Skip to content

Commit 604a83d

Browse files
Fix Websocket Cancellation Handling (#917)
* Continue reading websocket to handle closing * added websocket transport to cancel propagation test (#904) (cherry picked from commit 36be481) * Added cancellation test Co-authored-by: Wiktor Jurkiewicz <[email protected]>
1 parent 5cc8ddf commit 604a83d

File tree

3 files changed

+197
-83
lines changed

3 files changed

+197
-83
lines changed

go/grpcweb/websocket_wrapper.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,16 @@ func (w *webSocketWrappedReader) Read(p []byte) (int, error) {
176176

177177
// If the frame consists of only a single byte of value 1 then this indicates the client has finished sending
178178
if len(framePayload) == 1 && framePayload[0] == 1 {
179+
go func() {
180+
for {
181+
messageType, _, err := w.wsConn.Read(w.context)
182+
if err == io.EOF || messageType == 0 {
183+
// The client has closed the connection. Indicate to the response writer that it should close
184+
w.cancel()
185+
return
186+
}
187+
}
188+
}()
179189
return 0, io.EOF
180190
}
181191

integration_test/ts/src/cancellation.spec.ts

Lines changed: 82 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
} from "../_proto/improbable/grpcweb/test/test_pb";
1515
import {TestService, TestUtilService} from "../_proto/improbable/grpcweb/test/test_pb_service";
1616
import {DEBUG, continueStream} from "./util";
17-
import { runWithHttp1AndHttp2 } from "./testRpcCombinations";
17+
import { runWithHttp1AndHttp2, runWithSupportedTransports } from "./testRpcCombinations";
1818

1919
describe("Cancellation", () => {
2020
runWithHttp1AndHttp2(({testHostUrl}) => {
@@ -52,91 +52,94 @@ describe("Cancellation", () => {
5252
assert.equal(transportCancelFuncInvoked, true, "transport's cancel func must be invoked");
5353
});
5454

55-
it("should handle aborting a streaming response mid-stream with propagation of the disconnection to the server", (done) => {
56-
let onMessageId = 0;
55+
runWithSupportedTransports((transport) => {
56+
it("should handle aborting a streaming response mid-stream with propagation of the disconnection to the server", (done) => {
57+
let onMessageId = 0;
5758

58-
const streamIdentifier = `rpc-${Math.random()}`;
59+
const streamIdentifier = `rpc-${Math.random()}`;
5960

60-
const ping = new PingRequest();
61-
ping.setValue("hello world");
62-
ping.setResponseCount(100); // Request more messages than the client will accept before cancelling
63-
ping.setStreamIdentifier(streamIdentifier);
64-
65-
let reqObj: grpc.Request;
66-
67-
// Checks are performed every 1s = 15s total wait
68-
const maxAbortChecks = 15;
69-
70-
const numMessagesBeforeAbort = 5;
71-
72-
const doAbort = () => {
73-
DEBUG && debug("doAbort");
74-
reqObj.close();
75-
76-
// To ensure that the transport is successfully closing the connection, poll the server every 1s until
77-
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
78-
// but can take several seconds in others.
79-
function checkAbort(attempt: number) {
80-
DEBUG && debug("checkAbort", attempt);
81-
continueStream(testHostUrl, streamIdentifier, (status) => {
82-
DEBUG && debug("checkAbort.continueStream.status", status);
83-
84-
const checkStreamClosedRequest = new CheckStreamClosedRequest();
85-
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
86-
grpc.unary(TestUtilService.CheckStreamClosed, {
87-
debug: DEBUG,
88-
request: checkStreamClosedRequest,
89-
host: testHostUrl,
90-
onEnd: ({message}) => {
91-
const closed = ( message as CheckStreamClosedResponse ).getClosed();
92-
DEBUG && debug("closed", closed);
93-
if (closed) {
94-
done();
95-
} else {
96-
if (attempt >= maxAbortChecks) {
97-
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
61+
const ping = new PingRequest();
62+
ping.setValue("hello world");
63+
ping.setResponseCount(100); // Request more messages than the client will accept before cancelling
64+
ping.setStreamIdentifier(streamIdentifier);
65+
66+
let reqObj: grpc.Request;
67+
68+
// Checks are performed every 1s = 15s total wait
69+
const maxAbortChecks = 15;
70+
71+
const numMessagesBeforeAbort = 5;
72+
73+
const doAbort = () => {
74+
DEBUG && debug("doAbort");
75+
reqObj.close();
76+
77+
// To ensure that the transport is successfully closing the connection, poll the server every 1s until
78+
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
79+
// but can take several seconds in others.
80+
function checkAbort(attempt: number) {
81+
DEBUG && debug("checkAbort", attempt);
82+
continueStream(testHostUrl, streamIdentifier, (status) => {
83+
DEBUG && debug("checkAbort.continueStream.status", status);
84+
85+
const checkStreamClosedRequest = new CheckStreamClosedRequest();
86+
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
87+
grpc.unary(TestUtilService.CheckStreamClosed, {
88+
debug: DEBUG,
89+
request: checkStreamClosedRequest,
90+
host: testHostUrl,
91+
onEnd: ({message}) => {
92+
const closed = ( message as CheckStreamClosedResponse ).getClosed();
93+
DEBUG && debug("closed", closed);
94+
if (closed) {
9895
done();
9996
} else {
100-
setTimeout(() => {
101-
checkAbort(attempt + 1);
102-
}, 1000);
97+
if (attempt >= maxAbortChecks) {
98+
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
99+
done();
100+
} else {
101+
setTimeout(() => {
102+
checkAbort(attempt + 1);
103+
}, 1000);
104+
}
103105
}
104-
}
105-
},
106-
})
107-
});
108-
}
106+
},
107+
})
108+
});
109+
}
109110

110-
checkAbort(0);
111-
};
111+
checkAbort(0);
112+
};
112113

113-
reqObj = grpc.invoke(TestService.PingList, {
114-
debug: DEBUG,
115-
request: ping,
116-
host: testHostUrl,
117-
onHeaders: (headers: grpc.Metadata) => {
118-
DEBUG && debug("headers", headers);
119-
},
120-
onMessage: (message: PingResponse) => {
121-
assert.ok(message instanceof PingResponse);
122-
DEBUG && debug("onMessage.message.getCounter()", message.getCounter());
123-
assert.strictEqual(message.getCounter(), onMessageId++);
124-
if (message.getCounter() === numMessagesBeforeAbort) {
125-
// Abort after receiving numMessagesBeforeAbort messages
126-
doAbort();
127-
} else if (message.getCounter() < numMessagesBeforeAbort) {
128-
// Only request the next message if not yet aborted
129-
continueStream(testHostUrl, streamIdentifier, (status) => {
130-
DEBUG && debug("onMessage.continueStream.status", status);
131-
});
114+
reqObj = grpc.invoke(TestService.PingList, {
115+
debug: DEBUG,
116+
request: ping,
117+
host: testHostUrl,
118+
transport: transport,
119+
onHeaders: (headers: grpc.Metadata) => {
120+
DEBUG && debug("headers", headers);
121+
},
122+
onMessage: (message: PingResponse) => {
123+
assert.ok(message instanceof PingResponse);
124+
DEBUG && debug("onMessage.message.getCounter()", message.getCounter());
125+
assert.strictEqual(message.getCounter(), onMessageId++);
126+
if (message.getCounter() === numMessagesBeforeAbort) {
127+
// Abort after receiving numMessagesBeforeAbort messages
128+
doAbort();
129+
} else if (message.getCounter() < numMessagesBeforeAbort) {
130+
// Only request the next message if not yet aborted
131+
continueStream(testHostUrl, streamIdentifier, (status) => {
132+
DEBUG && debug("onMessage.continueStream.status", status);
133+
});
134+
}
135+
},
136+
onEnd: (status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
137+
DEBUG && debug("status", status, "statusMessage", statusMessage, "trailers", trailers);
138+
// onEnd shouldn't be called if abort is called prior to the response ending
139+
assert.fail();
132140
}
133-
},
134-
onEnd: (status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
135-
DEBUG && debug("status", status, "statusMessage", statusMessage, "trailers", trailers);
136-
// onEnd shouldn't be called if abort is called prior to the response ending
137-
assert.fail();
138-
}
139-
});
140-
}, 20000);
141+
});
142+
}, 20000);
143+
})
141144
});
142145
});

integration_test/ts/src/client.websocket.spec.ts

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ import { grpc } from "@improbable-eng/grpc-web";
44
import { debug } from "../../../client/grpc-web/src/debug";
55
import { assert } from "chai";
66
// Generated Test Classes
7-
import { PingRequest, PingResponse } from "../_proto/improbable/grpcweb/test/test_pb";
8-
import { TestService } from "../_proto/improbable/grpcweb/test/test_pb_service";
9-
import { DEBUG, DISABLE_WEBSOCKET_TESTS } from "./util";
7+
import {
8+
CheckStreamClosedRequest,
9+
CheckStreamClosedResponse,
10+
PingRequest,
11+
PingResponse
12+
} from "../_proto/improbable/grpcweb/test/test_pb";
13+
import { TestService, TestUtilService } from "../_proto/improbable/grpcweb/test/test_pb_service";
14+
import { continueStream, DEBUG, DISABLE_WEBSOCKET_TESTS } from "./util";
1015
import { headerTrailerCombos, runWithHttp1AndHttp2 } from "./testRpcCombinations";
1116

1217
if (DISABLE_WEBSOCKET_TESTS) {
@@ -73,7 +78,7 @@ if (DISABLE_WEBSOCKET_TESTS) {
7378

7479
describe("bidirectional (websockets)", () => {
7580
headerTrailerCombos((withHeaders, withTrailers) => {
76-
it("should make a bidirectional request that is terminated by the client", (done) => {
81+
it("should make a bidirectional request that is ended by the client finishing sending", (done) => {
7782
let didGetOnHeaders = false;
7883
let counter = 1;
7984
let lastMessage = `helloworld:${counter}`;
@@ -129,6 +134,102 @@ if (DISABLE_WEBSOCKET_TESTS) {
129134
});
130135
});
131136

137+
headerTrailerCombos((withHeaders, withTrailers) => {
138+
it("should make a bidirectional request that is aborted by the client with propagation of the disconnection to the server", (done) => {
139+
let didGetOnHeaders = false;
140+
let counter = 1;
141+
const streamIdentifier = `rpc-${Math.random()}`;
142+
let lastMessage = `helloworld:${counter}`;
143+
const ping = new PingRequest();
144+
ping.setStreamIdentifier(streamIdentifier);
145+
ping.setSendHeaders(withHeaders);
146+
ping.setSendTrailers(withTrailers);
147+
ping.setValue(lastMessage);
148+
149+
const client = grpc.client(TestService.PingPongBidi, {
150+
debug: DEBUG,
151+
host: testHostUrl,
152+
transport: grpc.WebsocketTransport(),
153+
});
154+
155+
// Checks are performed every 1s = 15s total wait
156+
const maxAbortChecks = 15;
157+
158+
const doAbort = () => {
159+
DEBUG && debug("doAbort");
160+
client.close();
161+
162+
// To ensure that the transport is successfully closing the connection, poll the server every 1s until
163+
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
164+
// but can take several seconds in others.
165+
function checkAbort(attempt: number) {
166+
DEBUG && debug("checkAbort", attempt);
167+
continueStream(testHostUrl, streamIdentifier, (status) => {
168+
DEBUG && debug("checkAbort.continueStream.status", status);
169+
170+
const checkStreamClosedRequest = new CheckStreamClosedRequest();
171+
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
172+
grpc.unary(TestUtilService.CheckStreamClosed, {
173+
debug: DEBUG,
174+
request: checkStreamClosedRequest,
175+
host: testHostUrl,
176+
onEnd: ({message}) => {
177+
const closed = ( message as CheckStreamClosedResponse ).getClosed();
178+
DEBUG && debug("closed", closed);
179+
if (closed) {
180+
done();
181+
} else {
182+
if (attempt >= maxAbortChecks) {
183+
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
184+
done();
185+
} else {
186+
setTimeout(() => {
187+
checkAbort(attempt + 1);
188+
}, 1000);
189+
}
190+
}
191+
},
192+
})
193+
});
194+
}
195+
196+
checkAbort(0);
197+
};
198+
199+
client.onHeaders((headers: grpc.Metadata) => {
200+
DEBUG && debug("headers", headers);
201+
didGetOnHeaders = true;
202+
if (withHeaders) {
203+
assert.deepEqual(headers.get("HeaderTestKey1"), ["ServerValue1"]);
204+
assert.deepEqual(headers.get("HeaderTestKey2"), ["ServerValue2"]);
205+
}
206+
});
207+
client.onMessage((message: PingResponse) => {
208+
assert.ok(message instanceof PingResponse);
209+
assert.deepEqual(message.getValue(), lastMessage);
210+
211+
if (counter === 10) {
212+
doAbort();
213+
} else {
214+
counter++;
215+
lastMessage = `helloworld:${counter}`;
216+
const ping = new PingRequest();
217+
ping.setValue(lastMessage);
218+
client.send(ping);
219+
}
220+
});
221+
client.onEnd((status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
222+
DEBUG && debug("status", status, "statusMessage", statusMessage);
223+
// onEnd shouldn't be called if abort is called prior to the response ending
224+
assert.fail();
225+
});
226+
client.start();
227+
228+
// send initial message
229+
client.send(ping);
230+
});
231+
});
232+
132233
headerTrailerCombos((withHeaders, withTrailers) => {
133234
it("should make a bidirectional request that is terminated by the server", (done) => {
134235
let didGetOnHeaders = false;

0 commit comments

Comments
 (0)