Skip to content

Commit 909edb3

Browse files
implement worker pool for high-concurrency NSFW detection
1 parent 60c5f79 commit 909edb3

File tree

2 files changed

+96
-46
lines changed

2 files changed

+96
-46
lines changed

nsfwDetection/nsfwProcess.js

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,90 @@
1-
// nsfwDetection/nsfwProcess.js
21
import { fork } from 'child_process';
32
import path from 'path';
3+
import os from 'os';
44

55
const nsfwServicePath = path.resolve('nsfwDetection/nsfwService.cjs');
6-
const nsfwProcess = fork(nsfwServicePath, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] });
6+
const WORKER_COUNT = os.cpus().length; // Use number of CPU cores
7+
const WORKER_TIMEOUT = 5000; // 5 seconds per request
78

8-
let ready = false;
9-
const pending = new Map();
9+
let workers = [];
10+
let readyWorkers = new Set();
11+
let pending = new Map(); // Map: id -> { resolve, reject, timeout, workerIdx }
12+
let rrIndex = 0; // Round-robin index
1013

11-
nsfwProcess.on('message', (msg) => {
12-
if (msg.type === 'ready') ready = true;
13-
if (msg.type === 'result' && pending.has(msg.id)) {
14-
// Log predictions from the child process
15-
if (msg.predictions) {
16-
console.log('NSFW Predictions:', msg.predictions);
14+
function spawnWorker(idx) {
15+
const worker = fork(nsfwServicePath, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] });
16+
worker.on('message', (msg) => {
17+
if (msg.type === 'ready') {
18+
readyWorkers.add(idx);
1719
}
18-
pending.get(msg.id)(msg.safe);
19-
pending.delete(msg.id);
20-
}
21-
if (msg.type === 'error' && pending.has(msg.id)) {
22-
pending.get(msg.id)(false, msg.error);
23-
pending.delete(msg.id);
24-
}
25-
});
20+
if (msg.type === 'result' && pending.has(msg.id)) {
21+
const { resolve, timeout } = pending.get(msg.id);
22+
clearTimeout(timeout);
23+
resolve(msg.safe);
24+
pending.delete(msg.id);
25+
}
26+
if (msg.type === 'error' && pending.has(msg.id)) {
27+
const { reject, timeout } = pending.get(msg.id);
28+
clearTimeout(timeout);
29+
reject(new Error(msg.error));
30+
pending.delete(msg.id);
31+
}
32+
});
33+
worker.on('exit', () => {
34+
readyWorkers.delete(idx);
35+
// Clean up any pending tasks for this worker
36+
for (const [id, task] of pending.entries()) {
37+
if (task.workerIdx === idx) {
38+
clearTimeout(task.timeout);
39+
task.reject(new Error('NSFW worker crashed'));
40+
pending.delete(id);
41+
}
42+
}
43+
// Restart worker
44+
setTimeout(() => {
45+
workers[idx] = spawnWorker(idx);
46+
}, 2000);
47+
});
48+
worker.on('error', () => {
49+
readyWorkers.delete(idx);
50+
});
51+
return worker;
52+
}
53+
54+
// Initialize worker pool
55+
for (let i = 0; i < WORKER_COUNT; i++) {
56+
workers.push(spawnWorker(i));
57+
}
2658

2759
export function isNsfwReady() {
28-
return ready;
60+
return readyWorkers.size > 0;
2961
}
3062

3163
export function checkNsfw(imagePath) {
3264
return new Promise((resolve, reject) => {
33-
if (!ready) return reject(new Error('NSFW service not ready'));
65+
if (readyWorkers.size === 0) return reject(new Error('NSFW service not ready'));
66+
// Pick next ready worker in round-robin
67+
let tries = 0;
68+
let workerIdx = rrIndex;
69+
while (!readyWorkers.has(workerIdx) && tries < WORKER_COUNT) {
70+
workerIdx = (workerIdx + 1) % WORKER_COUNT;
71+
tries++;
72+
}
73+
if (!readyWorkers.has(workerIdx)) {
74+
return reject(new Error('No NSFW workers ready'));
75+
}
76+
rrIndex = (workerIdx + 1) % WORKER_COUNT;
3477
const id = Date.now() + Math.random();
35-
nsfwProcess.send({ type: 'check', imagePath, id });
36-
pending.set(id, (safe, error) => {
37-
if (error) return reject(new Error(error));
38-
resolve(safe);
39-
});
78+
try {
79+
workers[workerIdx].send({ type: 'check', imagePath, id });
80+
const timeout = setTimeout(() => {
81+
pending.delete(id);
82+
reject(new Error('NSFW check timed out'));
83+
}, WORKER_TIMEOUT);
84+
pending.set(id, { resolve, reject, timeout, workerIdx });
85+
} catch (err) {
86+
pending.delete(id);
87+
reject(new Error('Failed to send image to NSFW service'));
88+
}
4089
});
41-
}
90+
}

nsfwDetection/nsfwService.cjs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,39 @@
1-
// nsfwDetection/nsfwService.cjs
21
const nsfw = require('nsfwjs');
32
const tf = require('@tensorflow/tfjs-node');
43
const fs = require('fs');
54

65
let model;
76

87
function isUnsafe(predictions) {
9-
// Initialize probabilities with fallbacks
108
const prob = { Porn: 0, Hentai: 0, Sexy: 0, Neutral: 0, Drawing: 0 };
119
predictions.forEach(p => prob[p.className] = p.probability);
1210

13-
// Pre-calculate frequently used values
1411
const maxNsfw = Math.max(prob.Porn, prob.Hentai, prob.Sexy);
1512
const maxSafe = Math.max(prob.Neutral, prob.Drawing);
1613
const topPrediction = predictions.reduce((a, b) =>
1714
a.probability > b.probability ? a : b
1815
);
1916

20-
// 1. Explicit content threshold
2117
if (prob.Porn > 0.35 || prob.Hentai > 0.35) return true;
22-
23-
// 2. Weighted NSFW index
2418
if ((0.5 * prob.Porn + 0.3 * prob.Sexy + 0.2 * prob.Hentai) > 0.4) return true;
25-
26-
// 3. Relative risk ratio (with uncertainty buffer)
2719
if (maxNsfw >= 0.2 && maxSafe > 0 && maxNsfw / maxSafe > 2.5) return true;
28-
29-
// 4. Top class enforcement
30-
if (['Porn', 'Hentai'].includes(topPrediction.className) && topPrediction.probability > 0.25) {
31-
return true;
32-
}
33-
34-
// 5. Sexy content dominance
20+
if (['Porn', 'Hentai'].includes(topPrediction.className) && topPrediction.probability > 0.25) return true;
3521
if (prob.Sexy > 0.6 && prob.Sexy > (prob.Neutral + prob.Drawing)) return true;
3622

37-
// 6. [NEW] Uncertainty check - reject ambiguous predictions
3823
const entropy = -Object.values(prob).reduce((sum, p) => sum + (p * Math.log2(p || 1e-10)), 0);
39-
if (entropy > 0.8 && maxSafe < 0.4) return true; // High uncertainty + low safe confidence
24+
if (entropy > 0.8 && maxSafe < 0.4) return true;
4025

4126
return false;
4227
}
4328

4429
async function loadModel() {
45-
model = await nsfw.load();
46-
process.send && process.send({ type: 'ready' });
30+
try {
31+
model = await nsfw.load();
32+
process.send && process.send({ type: 'ready' });
33+
} catch (err) {
34+
console.error('Model loading failed:', err);
35+
process.exit(1);
36+
}
4737
}
4838

4939
loadModel();
@@ -62,4 +52,15 @@ process.on('message', async (msg) => {
6252
process.send({ type: 'error', error: err.message, id: msg.id });
6353
}
6454
}
65-
});
55+
});
56+
57+
// Handle unexpected crashes
58+
process.on('uncaughtException', (err) => {
59+
console.error('Uncaught Exception:', err);
60+
process.exit(1);
61+
});
62+
63+
process.on('unhandledRejection', (reason) => {
64+
console.error('Unhandled Rejection:', reason);
65+
process.exit(1);
66+
});

0 commit comments

Comments
 (0)