Skip to content

Commit 2209411

Browse files
feat: allow HttpClient program to stop preemptively (#248)
This PR adds gracefull handling of the deletion/stopping of the `HttpClient` program. When an unexpected packet is received, the host responds with a RST segment. Also, when the connection is closed the receiver queue is closed now. --------- Co-authored-by: Pedro Gallino <[email protected]>
1 parent a99ec5b commit 2209411

File tree

4 files changed

+213
-57
lines changed

4 files changed

+213
-57
lines changed

src/programs/http_client.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ export class HttpClient extends ProgramBase {
3535
private dstId: DeviceId;
3636
private resource: string;
3737

38+
private stopped = false;
39+
3840
protected _parseInputs(inputs: string[]): void {
3941
if (inputs.length !== 2) {
4042
console.error(
@@ -50,13 +52,14 @@ export class HttpClient extends ProgramBase {
5052
// This starts the request from the background
5153
(async () => {
5254
await this.sendHttpRequest();
53-
this.signalStop();
55+
if (!this.stopped) {
56+
this.signalStop();
57+
}
5458
})();
5559
}
5660

5761
protected _stop() {
58-
// TODO: stop request preemptively?
59-
// Nothing to do
62+
this.stopped = true;
6063
}
6164

6265
private async sendHttpRequest() {
@@ -86,6 +89,10 @@ export class HttpClient extends ProgramBase {
8689
console.warn("HttpClient failed to connect");
8790
return;
8891
}
92+
if (this.stopped) {
93+
socket.abort();
94+
return;
95+
}
8996
const wrote = await socket.write(httpRequest);
9097
if (wrote < 0) {
9198
console.error("HttpClient failed to write to socket");
@@ -100,6 +107,10 @@ export class HttpClient extends ProgramBase {
100107
const expectedLength = RESOURCE_MAP.get(this.resource)?.length || 0;
101108
let totalRead = 0;
102109
while (totalRead < expectedLength) {
110+
if (this.stopped) {
111+
socket.abort();
112+
return;
113+
}
103114
const readLength = await socket.read(buffer);
104115
if (readLength < 0) {
105116
console.error("HttpClient failed to read from socket");

src/types/network-modules/asyncQueue.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,58 @@ export class AsyncQueue<T> {
33
private queue: T[] = [];
44
// Queue to hold resolve functions for promises created by pop().
55
private resolveQueue: ((value: T) => void)[] = [];
6+
// Flag to indicate if the queue is closed.
7+
private closed = false;
68

7-
push(item: T) {
8-
if (this.resolveQueue.length > 0) {
9+
/**
10+
* Pushes an item onto the queue.
11+
* @param item The item to push.
12+
* @returns true if the item was pushed, false if the queue is closed.
13+
*/
14+
push(item: T): boolean {
15+
if (this.closed) {
16+
return false;
17+
} else if (this.resolveQueue.length > 0) {
918
const resolve = this.resolveQueue.shift();
1019
if (resolve) {
1120
resolve(item);
1221
}
1322
} else {
1423
this.queue.push(item);
1524
}
25+
return true;
1626
}
1727

18-
// TODO: add timeouts
19-
pop(): Promise<T> {
28+
/**
29+
* Pops an item from the queue.
30+
* @returns A promise that resolves to the popped item, or undefined if the queue was closed.
31+
*/
32+
pop(): Promise<T | undefined> {
2033
if (!this.isEmpty()) {
2134
return Promise.resolve(this.queue.shift());
2235
}
36+
if (this.closed) {
37+
return Promise.resolve(undefined);
38+
}
2339
return new Promise((resolve) => {
2440
this.resolveQueue.push(resolve);
2541
});
2642
}
2743

44+
close() {
45+
this.closed = true;
46+
while (this.resolveQueue.length > 0) {
47+
const resolve = this.resolveQueue.shift();
48+
if (resolve) {
49+
resolve(undefined);
50+
}
51+
}
52+
}
53+
54+
isClosed(): boolean {
55+
return this.closed;
56+
}
57+
2858
isEmpty(): boolean {
2959
return this.queue.length === 0;
3060
}

0 commit comments

Comments
 (0)