1
0
Fork 0
forked from svrjs/svrjs

Add termination of unused workers.

This commit is contained in:
Dorian Niemiec 2024-08-26 06:11:19 +02:00
parent 083b2794df
commit 662528847a
2 changed files with 99 additions and 59 deletions

View file

@ -1093,9 +1093,10 @@ let commands = {
} }
}, },
restart: (args, log) => { restart: (args, log) => {
if (cluster.isPrimary === undefined) log("This command is not supported on single-threaded " + name + "."); if (cluster.isPrimary === undefined)
log("This command is not supported on single-threaded " + name + ".");
else log("This command need to be run in " + name + " master."); else log("This command need to be run in " + name + " master.");
} },
}; };
// Load commands from middleware // Load commands from middleware
@ -1104,9 +1105,11 @@ middleware.forEach((middlewareO) => {
Object.keys(middlewareO.commands).forEach((command) => { Object.keys(middlewareO.commands).forEach((command) => {
const prevCommand = commands[command]; const prevCommand = commands[command];
if (prevCommand) { if (prevCommand) {
commands[command] = (args, log) => middlewareO.commands[command](args, log, prevCommand); commands[command] = (args, log) =>
middlewareO.commands[command](args, log, prevCommand);
} else { } else {
commands[command] = (args, log) => middlewareO.commands[command](args, log, () => {}); commands[command] = (args, log) =>
middlewareO.commands[command](args, log, () => {});
} }
}); });
} }
@ -1259,10 +1262,14 @@ process.messageEventListeners.push((worker, serverconsole) => {
}; };
}); });
let isWorkerHungUpBuff = true;
function msgListener(message) { function msgListener(message) {
if (message == "\x12END") { if (message == "\x12END") {
for (let i = 0; i < Object.keys(cluster.workers).length; i++) { for (let i = 0; i < Object.keys(cluster.workers).length; i++) {
cluster.workers[Object.keys(cluster.workers)[i]].removeAllListeners("message"); cluster.workers[Object.keys(cluster.workers)[i]].removeAllListeners(
"message",
);
addListenersToWorker(cluster.workers[Object.keys(cluster.workers)[i]]); addListenersToWorker(cluster.workers[Object.keys(cluster.workers)[i]]);
} }
} }
@ -1270,6 +1277,10 @@ function msgListener(message) {
// Do nothing // Do nothing
} else if (message == "\x12CLOSE") { } else if (message == "\x12CLOSE") {
closedMaster = true; closedMaster = true;
} else if (message == "\x12KILLOK") {
if (typeof isWorkerHungUpBuff != "undefined") isWorkerHungUpBuff = false;
} else if (message == "\x12KILLTERMMSG") {
serverconsole.locmessage("Terminating unused worker process...");
} else if (message[0] == "\x12") { } else if (message[0] == "\x12") {
console.log("RECEIVED CONTROL MESSAGE: " + message.substr(1)); console.log("RECEIVED CONTROL MESSAGE: " + message.substr(1));
} else { } else {
@ -1667,37 +1678,44 @@ function start(init) {
process.send("\x12SAVEERR" + err.message); process.send("\x12SAVEERR" + err.message);
} }
process.send("\x12END"); process.send("\x12END");
} else if (line == "\x14KILLPING") { }*/ else if (line == "\x14KILLPING") {
if (!reallyExiting) { //if (!reallyExiting) {
process.send("\x12KILLOK"); process.send("\x12KILLOK");
process.send("\x12END"); process.send("\x12END");
} //}
// Refuse to send, when it's really exiting. Main process will treat the worker as hung up anyway... // Refuse to send, when it's really exiting. Main process will treat the worker as hung up anyway...
} else if (line == "\x14PINGPING") { } /* else if (line == "\x14PINGPING") {
if (!reallyExiting) { if (!reallyExiting) {
process.send("\x12PINGOK"); process.send("\x12PINGOK");
process.send("\x12END"); process.send("\x12END");
} }
// Refuse to send, when it's really exiting. Main process will treat the worker as hung up anyway... // Refuse to send, when it's really exiting. Main process will treat the worker as hung up anyway...
} else if (line == "\x14KILLREQ") { }*/ else if (line == "\x14KILLREQ") {
if (reqcounter - reqcounterKillReq < 2) { if (process.reqcounter - reqcounterKillReq < 2) {
process.send("\x12KILLTERMMSG"); process.send("\x12KILLTERMMSG");
process.nextTick(commands.stop); process.nextTick(() => {
commands.stop([], () => {});
});
} else { } else {
reqcounterKillReq = reqcounter; reqcounterKillReq = process.reqcounter;
} }
}*/ else if (commands[line.split(" ")[0]] !== undefined && commands[line.split(" ")[0]] !== null) { } else if (
commands[line.split(" ")[0]] !== undefined &&
commands[line.split(" ")[0]] !== null
) {
var argss = line.split(" "); var argss = line.split(" ");
var command = argss.shift(); var command = argss.shift();
commands[command](argss, (msg) => process.send(msg)); commands[command](argss, (msg) => process.send(msg));
process.send("\x12END"); process.send("\x12END");
} else { } else {
process.send("Unrecognized command \"" + line.split(" ")[0] + "\"."); process.send('Unrecognized command "' + line.split(" ")[0] + '".');
process.send("\x12END"); process.send("\x12END");
} }
} catch (err) { } catch (err) {
if (line != "") { if (line != "") {
process.send("Can't execute command \"" + line.split(" ")[0] + "\"."); process.send(
"Can't execute command \"" + line.split(" ")[0] + '".',
);
process.send("\x12END"); process.send("\x12END");
} }
} }
@ -1706,7 +1724,7 @@ function start(init) {
const rla = readline.createInterface({ const rla = readline.createInterface({
input: process.stdin, input: process.stdin,
output: process.stdout, output: process.stdout,
prompt: "" prompt: "",
}); });
rla.prompt(); rla.prompt();
rla.on("line", function (line) { rla.on("line", function (line) {
@ -1716,8 +1734,10 @@ function start(init) {
if (line != "") { if (line != "") {
if (cluster.isPrimary !== undefined) { if (cluster.isPrimary !== undefined) {
var allWorkers = Object.keys(cluster.workers); var allWorkers = Object.keys(cluster.workers);
if (command == "block") commands.block(argss, serverconsole.climessage); if (command == "block")
if (command == "unblock") commands.unblock(argss, serverconsole.climessage); commands.block(argss, serverconsole.climessage);
if (command == "unblock")
commands.unblock(argss, serverconsole.climessage);
if (command == "restart") { if (command == "restart") {
var stopError = false; var stopError = false;
exiting = true; exiting = true;
@ -1730,7 +1750,10 @@ function start(init) {
stopError = true; stopError = true;
} }
} }
if (stopError) serverconsole.climessage("Some " + name + " workers might not be stopped."); if (stopError)
serverconsole.climessage(
"Some " + name + " workers might not be stopped.",
);
SVRJSInitialized = false; SVRJSInitialized = false;
closedMaster = true; closedMaster = true;
@ -1758,7 +1781,9 @@ function start(init) {
cluster.workers[clusterID].removeAllListeners("message"); cluster.workers[clusterID].removeAllListeners("message");
addListenersToWorker(cluster.workers[clusterID]); addListenersToWorker(cluster.workers[clusterID]);
} }
serverconsole.climessage("Can't run command \"" + command + "\"."); serverconsole.climessage(
"Can't run command \"" + command + '".',
);
} }
}); });
if (command == "stop") { if (command == "stop") {
@ -1770,7 +1795,9 @@ function start(init) {
try { try {
commands[command](argss, serverconsole.climessage); commands[command](argss, serverconsole.climessage);
} catch (err) { } catch (err) {
serverconsole.climessage("Unrecognized command \"" + command + "\"."); serverconsole.climessage(
'Unrecognized command "' + command + '".',
);
} }
} }
} }
@ -1939,22 +1966,24 @@ function start(init) {
} }
}, 4550); }, 4550);
/*
// Termination of unused good workers // Termination of unused good workers
if (!disableUnusedWorkerTermination && cluster.isPrimary !== undefined) { if (
!process.serverConfig.disableUnusedWorkerTermination &&
cluster.isPrimary !== undefined
) {
setTimeout(function () { setTimeout(function () {
setInterval(function () { setInterval(function () {
if (!closedMaster && !exiting) { if (!closedMaster && !exiting) {
var allWorkers = Object.keys(cluster.workers); const allWorkers = Object.keys(cluster.workers);
var minWorkers = 0; let minWorkers = 0;
minWorkers = Math.ceil(workersToFork * 0.625); minWorkers = Math.ceil(workersToFork * 0.625);
if (minWorkers < 2) minWorkers = 2; if (minWorkers < 2) minWorkers = 2;
if (minWorkers > 12) minWorkers = 12; if (minWorkers > 12) minWorkers = 12;
var goodWorkers = []; let goodWorkers = [];
function checkWorker(callback, _id) { const checkWorker = (callback, _id) => {
if (typeof _id === "undefined") _id = 0; if (typeof _id === "undefined") _id = 0;
if (_id >= allWorkers.length) { if (_id >= allWorkers.length) {
callback(); callback();
@ -1963,7 +1992,10 @@ function start(init) {
try { try {
if (cluster.workers[allWorkers[_id]]) { if (cluster.workers[allWorkers[_id]]) {
isWorkerHungUpBuff = true; isWorkerHungUpBuff = true;
cluster.workers[allWorkers[_id]].on("message", msgListener); cluster.workers[allWorkers[_id]].on(
"message",
msgListener,
);
cluster.workers[allWorkers[_id]].send("\x14KILLPING"); cluster.workers[allWorkers[_id]].send("\x14KILLPING");
setTimeout(function () { setTimeout(function () {
if (isWorkerHungUpBuff) { if (isWorkerHungUpBuff) {
@ -1978,37 +2010,45 @@ function start(init) {
} }
} catch (err) { } catch (err) {
if (cluster.workers[allWorkers[_id]]) { if (cluster.workers[allWorkers[_id]]) {
cluster.workers[allWorkers[_id]].removeAllListeners("message"); cluster.workers[allWorkers[_id]].removeAllListeners(
cluster.workers[allWorkers[_id]].on("message", bruteForceListenerWrapper(cluster.workers[allWorkers[_id]])); "message",
cluster.workers[allWorkers[_id]].on("message", listenConnListener); );
addListenersToWorker(cluster.workers[allWorkers[_id]]);
} }
checkWorker(callback, _id + 1); checkWorker(callback, _id + 1);
} }
} };
checkWorker(function () { checkWorker(() => {
if (goodWorkers.length > minWorkers) { if (goodWorkers.length > minWorkers) {
var wN = Math.floor(Math.random() * goodWorkers.length); const wN = Math.floor(Math.random() * goodWorkers.length);
if (wN == goodWorkers.length) return; if (wN == goodWorkers.length) return;
try { try {
if (cluster.workers[goodWorkers[wN]]) { if (cluster.workers[goodWorkers[wN]]) {
isWorkerHungUpBuff = true; isWorkerHungUpBuff = true;
cluster.workers[goodWorkers[wN]].on("message", msgListener); cluster.workers[goodWorkers[wN]].on(
"message",
msgListener,
);
cluster.workers[goodWorkers[wN]].send("\x14KILLREQ"); cluster.workers[goodWorkers[wN]].send("\x14KILLREQ");
} }
} catch (err) { } catch (err) {
if (cluster.workers[goodWorkers[wN]]) { if (cluster.workers[goodWorkers[wN]]) {
cluster.workers[goodWorkers[wN]].removeAllListeners("message"); cluster.workers[goodWorkers[wN]].removeAllListeners(
cluster.workers[goodWorkers[wN]].on("message", bruteForceListenerWrapper(cluster.workers[goodWorkers[wN]])); "message",
cluster.workers[goodWorkers[wN]].on("message", listenConnListener); );
addListenersToWorker(cluster.workers[goodWorkers[wN]]);
} }
serverconsole.locwarnmessage("There was a problem while terminating unused worker process. Reason: " + err.message); serverconsole.locwarnmessage(
"There was a problem while terminating unused worker process. Reason: " +
err.message,
);
} }
} }
}); });
} }
}, 300000); }, 300000);
}, 2000); }, 2000);
}*/ }
} }
} }
} }