Skip to content

Commit 0203252

Browse files
authored
retry workflows grpc stream (microsoft#66)
Signed-off-by: Fabian Martinez <[email protected]>
1 parent ad93993 commit 0203252

File tree

1 file changed

+76
-37
lines changed

1 file changed

+76
-37
lines changed

src/worker/task-hub-grpc-worker.ts

Lines changed: 76 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export class TaskHubGrpcWorker {
2424
private _tls?: boolean;
2525
private _grpcChannelOptions?: grpc.ChannelOptions;
2626
private _isRunning: boolean;
27+
private _stopWorker: boolean;
2728
private _stub: stubs.TaskHubSidecarServiceClient | null;
2829

2930
constructor(hostAddress?: string, options?: grpc.ChannelOptions, useTLS?: boolean) {
@@ -33,6 +34,7 @@ export class TaskHubGrpcWorker {
3334
this._grpcChannelOptions = options;
3435
this._responseStream = null;
3536
this._isRunning = false;
37+
this._stopWorker = false;
3638
this._stub = null;
3739
}
3840

@@ -99,52 +101,83 @@ export class TaskHubGrpcWorker {
99101
* Therefore, we open the stream and simply listen through the eventemitter behind the scenes
100102
*/
101103
async start(): Promise<void> {
102-
const client = new GrpcClient(this._hostAddress, this._grpcChannelOptions, this._tls);
103-
104104
if (this._isRunning) {
105105
throw new Error("The worker is already running.");
106106
}
107107

108-
// send a "Hello" message to the sidecar to ensure that it's listening
109-
const prom = promisify(client.stub.hello.bind(client.stub));
110-
await prom(new Empty());
111-
112-
// Stream work items from the sidecar
113-
const stubGetWorkItemsReq = new pb.GetWorkItemsRequest();
108+
const client = new GrpcClient(this._hostAddress, this._grpcChannelOptions, this._tls);
114109
this._stub = client.stub;
115-
this._responseStream = client.stub.getWorkItems(stubGetWorkItemsReq);
116-
117-
console.log(`Successfully connected to ${this._hostAddress}. Waiting for work items...`);
118-
119-
// Wait for a work item to be received
120-
this._responseStream.on("data", (workItem: pb.WorkItem) => {
121-
if (workItem.hasOrchestratorrequest()) {
122-
console.log(
123-
`Received "Orchestrator Request" work item with instance id '${workItem
124-
?.getOrchestratorrequest()
125-
?.getInstanceid()}'`,
126-
);
127-
this._executeOrchestrator(workItem.getOrchestratorrequest() as any, client.stub);
128-
} else if (workItem.hasActivityrequest()) {
129-
console.log(`Received "Activity Request" work item`);
130-
this._executeActivity(workItem.getActivityrequest() as any, client.stub);
131-
} else {
132-
console.log(`Received unknown work item`);
133-
}
134-
});
135-
136-
// Wait for the stream to end or error
137-
this._responseStream.on("end", () => {
138-
console.log("Stream ended");
139-
});
140110

141-
this._responseStream.on("error", (err: Error) => {
142-
console.log("Stream error", err);
143-
});
111+
// do not await so it runs in the background
112+
this.internalRunWorker(client);
144113

145114
this._isRunning = true;
146115
}
147116

117+
async internalRunWorker(client: GrpcClient, isRetry: boolean = false): Promise<void> {
118+
try {
119+
// send a "Hello" message to the sidecar to ensure that it's listening
120+
const prom = promisify(client.stub.hello.bind(client.stub));
121+
await prom(new Empty());
122+
123+
// Stream work items from the sidecar
124+
const stream = client.stub.getWorkItems(new pb.GetWorkItemsRequest());
125+
this._responseStream = stream;
126+
127+
console.log(`Successfully connected to ${this._hostAddress}. Waiting for work items...`);
128+
129+
// Wait for a work item to be received
130+
stream.on("data", (workItem: pb.WorkItem) => {
131+
if (workItem.hasOrchestratorrequest()) {
132+
console.log(
133+
`Received "Orchestrator Request" work item with instance id '${workItem
134+
?.getOrchestratorrequest()
135+
?.getInstanceid()}'`,
136+
);
137+
this._executeOrchestrator(workItem.getOrchestratorrequest() as any, client.stub);
138+
} else if (workItem.hasActivityrequest()) {
139+
console.log(`Received "Activity Request" work item`);
140+
this._executeActivity(workItem.getActivityrequest() as any, client.stub);
141+
} else {
142+
console.log(`Received unknown work item`);
143+
}
144+
});
145+
146+
// Wait for the stream to end or error
147+
stream.on("end", async () => {
148+
stream.cancel();
149+
stream.destroy();
150+
if (this._stopWorker) {
151+
console.log("Stream ended");
152+
return;
153+
}
154+
console.log("Stream abruptly closed, will retry the connection...");
155+
// TODO consider exponential backoff
156+
await sleep(5000);
157+
// do not await
158+
this.internalRunWorker(client, true);
159+
});
160+
161+
stream.on("error", (err: Error) => {
162+
console.log("Stream error", err);
163+
});
164+
} catch (err) {
165+
if (this._stopWorker) {
166+
// ignoring the error because the worker has been stopped
167+
return;
168+
}
169+
console.log(`Error on grpc stream: ${err}`);
170+
if (!isRetry) {
171+
throw err;
172+
}
173+
console.log("Connection will be retried...");
174+
// TODO consider exponential backoff
175+
await sleep(5000);
176+
this.internalRunWorker(client, true);
177+
return;
178+
}
179+
}
180+
148181
/**
149182
* Stop the worker and wait for any pending work items to complete
150183
*/
@@ -153,6 +186,8 @@ export class TaskHubGrpcWorker {
153186
throw new Error("The worker is not running.");
154187
}
155188

189+
this._stopWorker = true;
190+
156191
this._responseStream?.cancel();
157192
this._responseStream?.destroy();
158193

@@ -162,7 +197,7 @@ export class TaskHubGrpcWorker {
162197

163198
// Wait a bit to let the async operations finish
164199
// https://github.com/grpc/grpc-node/issues/1563#issuecomment-829483711
165-
await new Promise((resolve) => setTimeout(resolve, 1000));
200+
await sleep(1000);
166201
}
167202

168203
/**
@@ -265,3 +300,7 @@ export class TaskHubGrpcWorker {
265300
}
266301
}
267302
}
303+
304+
function sleep(ms: number): Promise<void> {
305+
return new Promise((resolve) => setTimeout(resolve, ms));
306+
}

0 commit comments

Comments
 (0)