Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 6efbe11

Browse files
committed
chore: Update heartbeat cron job to run every second for more accurate updates
1 parent e1afea2 commit 6efbe11

File tree

2 files changed

+52
-49
lines changed

2 files changed

+52
-49
lines changed

apps/master/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,4 @@ await Promise.all([
5353

5454
new CronJob("* * * * *", updateScience).start();
5555
new CronJob("* * * * *", calculatePresenceUsage).start();
56-
new CronJob("* * * * *", updateHeartbeats).start();
56+
new CronJob("*/1 * * * * *", updateHeartbeats).start();
Lines changed: 51 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,67 @@
11
import pLimit from "p-limit";
22
import { mainLog, mongo, redis } from "../index.js";
33

4-
const limit = pLimit(Number.parseInt(process.env.LIMIT || "5"));
4+
const BATCH_LIMIT = pLimit(Number.parseInt(process.env.BATCH_LIMIT || "5"));
55
const BATCH_SIZE = Number.parseInt(process.env.BATCH_SIZE || "1000");
66

7+
const limit = pLimit(1);
78
export default async function () {
8-
const log = mainLog.extend("updateHeartbeats");
9-
let count = 0;
9+
return limit(async () => {
10+
const log = mainLog.extend("updateHeartbeats");
11+
let count = 0;
1012

11-
const processBatch = async (cursor: string) => {
12-
const result = await redis.hscan(
13-
"pmd-api.heartbeatUpdates",
14-
cursor,
15-
"COUNT",
16-
BATCH_SIZE
17-
);
18-
const newCursor = result[0];
13+
const processBatch = async (cursor: string) => {
14+
const result = await redis.hscan(
15+
"pmd-api.heartbeatUpdates",
16+
cursor,
17+
"COUNT",
18+
BATCH_SIZE
19+
);
20+
const newCursor = result[0];
1921

20-
let batchToDelete = [];
21-
let data = [];
22-
for (let i = 0; i < result[1].length; i += 2) {
23-
const identifier = result[1][i];
24-
data.push(JSON.parse(result[1][i + 1]));
22+
let batchToDelete = [];
23+
let data = [];
24+
for (let i = 0; i < result[1].length; i += 2) {
25+
const identifier = result[1][i];
26+
data.push(JSON.parse(result[1][i + 1]));
2527

26-
batchToDelete.push(identifier);
27-
count++;
28-
}
28+
batchToDelete.push(identifier);
29+
count++;
30+
}
2931

30-
if (batchToDelete.length) {
31-
await redis.hdel("pmd-api.heartbeatUpdates", ...batchToDelete);
32+
if (batchToDelete.length) {
33+
await redis.hdel("pmd-api.heartbeatUpdates", ...batchToDelete);
3234

33-
const res = await mongo
34-
.db("PreMiD")
35-
.collection("heartbeats")
36-
.bulkWrite(
37-
data.map(d => ({
38-
updateOne: {
39-
filter: { identifier: d.identifier },
40-
update: {
41-
$set: { ...d, updated: new Date(d.updated) }
42-
},
43-
upsert: true
44-
}
45-
}))
35+
const res = await mongo
36+
.db("PreMiD")
37+
.collection("heartbeats")
38+
.bulkWrite(
39+
data.map(d => ({
40+
updateOne: {
41+
filter: { identifier: d.identifier },
42+
update: {
43+
$set: { ...d, updated: new Date(d.updated) }
44+
},
45+
upsert: true
46+
}
47+
}))
48+
);
49+
log(
50+
"Batch %s: Inserted %s entries, Updated %s entries",
51+
Math.floor(count / BATCH_SIZE) + 1,
52+
res.upsertedCount,
53+
res.modifiedCount
4654
);
47-
log(
48-
"Batch %s: Inserted %s entries, Updated %s entries",
49-
Math.floor(count / BATCH_SIZE) + 1,
50-
res.upsertedCount,
51-
res.modifiedCount
52-
);
53-
}
55+
}
5456

55-
return newCursor;
56-
};
57+
return newCursor;
58+
};
5759

58-
let cursor = "0";
59-
do {
60-
cursor = await limit(() => processBatch(cursor));
61-
} while (cursor !== "0");
60+
let cursor = "0";
61+
do {
62+
cursor = await BATCH_LIMIT(() => processBatch(cursor));
63+
} while (cursor !== "0");
6264

63-
if (count > 0) log("Updated %s entries", count);
65+
if (count > 0) log("Updated %s entries", count);
66+
});
6467
}

0 commit comments

Comments
 (0)