Skip to content

Commit 5891981

Browse files
authored
Merge pull request #13 from devit-tel/feature/add-batch-timeout
Feature/add batch timeout
2 parents d0c5aa6 + bfedb64 commit 5891981

File tree

6 files changed

+131
-61
lines changed

6 files changed

+131
-61
lines changed

package-lock.json

Lines changed: 17 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
"dependencies": {
2121
"@melonade/melonade-declaration": "^0.19.2",
2222
"axios": "^0.20.0",
23-
"node-rdkafka": "^2.9.1",
23+
"node-rdkafka": "^2.10.0",
24+
"promise-timeout": "^1.3.0",
2425
"ramda": "^0.26.1",
2526
"tslib": "^2.0.1"
2627
},
2728
"devDependencies": {
2829
"@types/jest": "^24.0.23",
2930
"@types/node": "^10.17.5",
31+
"@types/promise-timeout": "^1.3.0",
3032
"@types/ramda": "^0.26.33",
3133
"jest": "^26.4.2",
3234
"prettier": "^2.0.5",

src/example/syncWorker.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { State, Task } from '@melonade/melonade-declaration';
22
import { SyncWorker, TaskStates } from '..';
33

4-
const kafkaServers = process.env['MELONADE_KAFKA_SERVERS'];
5-
const namespace = process.env['MELONADE_NAMESPACE'];
6-
const processManagerUrl =
7-
process.env['MELONADE_PROCESS_MANAGER_URL'] || 'http://localhost:8081';
4+
const kafkaServers = 'localhost:29092';
5+
const namespace = 'docker-compose';
6+
const processManagerUrl = 'http://localhost:8081';
87

98
const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms));
109

10+
// tslint:disable-next-line: no-for-in
1111
for (const forkID in new Array(1).fill(null)) {
1212
for (const workerId of [1, 2, 3]) {
1313
const worker = new SyncWorker(

src/example/worker.ts

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,25 @@
11
import { State, Task } from '@melonade/melonade-declaration';
22
import { Worker } from '..';
33

4-
const kafkaServers = process.env['MELONADE_KAFKA_SERVERS'];
5-
const namespace = process.env['MELONADE_NAMESPACE'];
4+
const kafkaServers = 'localhost:29092';
5+
const namespace = 'docker-compose';
66

7+
const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms));
8+
9+
// tslint:disable-next-line: no-for-in
710
for (const forkID in new Array(1).fill(null)) {
8-
for (const workerId of [1, 2, 3]) {
11+
for (const workerId of [4]) {
912
const worker = new Worker(
1013
// task name
1114
`t${workerId}`,
1215

1316
// process task
14-
(task, _logger, _isTimeOut, updateTask) => {
15-
setTimeout(() => {
16-
updateTask(task, {
17-
status: State.TaskStates.Completed,
18-
});
19-
20-
console.log(`Async Completed ${task.taskName}`);
21-
}, 5000);
22-
23-
console.log(`Processing ${task.taskName}`);
17+
async (task, _logger, _isTimeOut) => {
18+
console.log(`Processing ${task.taskName}: ${task.taskId}`);
19+
await sleep(5 * 1000);
20+
console.log(`Processed ${task.taskName}: ${task.taskId}`);
2421
return {
25-
status: State.TaskStates.Inprogress,
22+
status: State.TaskStates.Completed,
2623
};
2724
},
2825

src/syncWorker.ts

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
LibrdKafkaError,
88
Message,
99
} from 'node-rdkafka';
10+
import { timeout, TimeoutError } from 'promise-timeout';
11+
import * as R from 'ramda';
1012
import { jsonTryParse } from './utils/common';
1113
import {
1214
isTaskTimeout,
@@ -25,6 +27,7 @@ export interface ISyncWorkerConfig {
2527
autoStart?: boolean;
2628
latencyCompensationMs?: number;
2729
trackingRunningTasks?: boolean;
30+
batchTimeoutMs?: number;
2831
}
2932

3033
export interface ISyncUpdateTask {
@@ -39,6 +42,7 @@ const DEFAULT_WORKER_CONFIG = {
3942
autoStart: true,
4043
latencyCompensationMs: 50,
4144
trackingRunningTasks: false,
45+
batchTimeoutMs: 10 * 60 * 1000, // 10 mins
4246
} as ISyncWorkerConfig;
4347

4448
// Maybe use kafka streamAPI
@@ -57,7 +61,7 @@ export class SyncWorker extends EventEmitter {
5761
isTimeout: boolean,
5862
) => void | Promise<void>;
5963
private runningTasks: {
60-
[taskId: string]: Task.ITask;
64+
[taskId: string]: Task.ITask | string;
6165
} = {};
6266
private tasksName: string | string[];
6367

@@ -98,6 +102,10 @@ export class SyncWorker extends EventEmitter {
98102
'bootstrap.servers': workerConfig.kafkaServers,
99103
'group.id': `melonade-${this.workerConfig.namespace}-client-${tn}`,
100104
'enable.auto.commit': false,
105+
'max.poll.interval.ms': Math.max(
106+
300000,
107+
this.workerConfig.batchTimeoutMs * 5,
108+
),
101109
...kafkaConfig,
102110
},
103111
{ 'auto.offset.reset': 'latest' },
@@ -126,13 +134,14 @@ export class SyncWorker extends EventEmitter {
126134
this.consumer.connect();
127135

128136
process.once('SIGTERM', () => {
137+
console.log(`${this.tasksName}: unsubscribed`);
129138
this.consumer.unsubscribe();
130139
});
131140
}
132141

133142
get health(): {
134143
consumer: 'connected' | 'disconnected';
135-
tasks: { [taskId: string]: Task.ITask };
144+
tasks: { [taskId: string]: Task.ITask | string };
136145
} {
137146
return {
138147
consumer: this.consumer.isConnected() ? 'connected' : 'disconnected',
@@ -143,12 +152,13 @@ export class SyncWorker extends EventEmitter {
143152
consume = (
144153
messageNumber: number = this.workerConfig.maximumPollingTasks,
145154
): Promise<Task.ITask[]> => {
146-
return new Promise((resolve: Function, reject: Function) => {
155+
return new Promise((resolve: Function) => {
147156
this.consumer.consume(
148157
messageNumber,
149158
(error: LibrdKafkaError, messages: Message[]) => {
150159
if (error) {
151-
setTimeout(() => reject(error), 1000);
160+
console.log(`${this.tasksName}: consume error`, error);
161+
setTimeout(() => resolve([]), 1000);
152162
} else {
153163
resolve(
154164
messages.map((message: Kafka.kafkaConsumerMessage) =>
@@ -181,7 +191,12 @@ export class SyncWorker extends EventEmitter {
181191
};
182192

183193
commit = () => {
184-
return this.consumer.commit();
194+
try {
195+
// @ts-ignore
196+
this.consumer.commitSync();
197+
} catch (error) {
198+
console.log(`${this.tasksName}: commit error`, error);
199+
}
185200
};
186201

187202
private dispatchTask = async (task: Task.ITask, isTimeout: boolean) => {
@@ -207,31 +222,45 @@ export class SyncWorker extends EventEmitter {
207222

208223
if (this.workerConfig.trackingRunningTasks) {
209224
this.runningTasks[task.taskId] = task;
225+
} else {
226+
this.runningTasks[task.taskId] = task.taskId;
210227
}
211228

212229
try {
213230
await this.dispatchTask(task, isTimeout);
214231
} catch (error) {
215232
console.warn(this.tasksName, error);
216233
} finally {
217-
if (this.workerConfig.trackingRunningTasks) {
218-
delete this.runningTasks[task.taskId];
219-
}
234+
delete this.runningTasks[task.taskId];
220235
}
221236
};
222237

223238
private poll = async () => {
224239
// https://github.com/nodejs/node/issues/6673
225240
while (this.isSubscribed) {
226-
try {
227-
const tasks = await this.consume();
228-
if (tasks.length > 0) {
229-
await Promise.all(tasks.map(this.processTask));
241+
const tasks = await this.consume();
242+
if (tasks.length > 0) {
243+
try {
244+
if (this.workerConfig.batchTimeoutMs > 0) {
245+
await timeout(
246+
Promise.all(tasks.map(this.processTask)),
247+
this.workerConfig.batchTimeoutMs,
248+
);
249+
} else {
250+
await Promise.all(tasks.map(this.processTask));
251+
}
252+
} catch (error) {
253+
if (error instanceof TimeoutError) {
254+
console.log(
255+
`${this.tasksName}: batch timeout`,
256+
R.keys(this.runningTasks),
257+
);
258+
} else {
259+
console.log(this.tasksName, 'process error', error);
260+
}
261+
} finally {
230262
this.commit();
231263
}
232-
} catch (err) {
233-
// In case of consume error
234-
console.log(this.tasksName, err);
235264
}
236265
}
237266

0 commit comments

Comments
 (0)