From 662528847a666d43ce82c3ab5f3abd23a47e13cb Mon Sep 17 00:00:00 2001 From: Dorian Niemiec Date: Mon, 26 Aug 2024 06:11:19 +0200 Subject: [PATCH] Add termination of unused workers. --- src/handlers/serverErrorHandler.js | 30 +++---- src/index.js | 128 +++++++++++++++++++---------- 2 files changed, 99 insertions(+), 59 deletions(-) diff --git a/src/handlers/serverErrorHandler.js b/src/handlers/serverErrorHandler.js index 77432a2..ceb60c8 100644 --- a/src/handlers/serverErrorHandler.js +++ b/src/handlers/serverErrorHandler.js @@ -50,22 +50,22 @@ serverErrorHandler.resetAttempts = (isRedirect) => { process.messageEventListeners.push((worker, serverconsole) => { return (message) => { - if (worker.id == Object.keys(cluster.workers)[0]) { - if (message.indexOf("\x12ERRLIST") == 0) { - const tries = parseInt(message.substring(8, 9)); - const errCode = message.substring(9); - serverconsole.locerrmessage( - serverErrorDescs[errCode] - ? serverErrorDescs[errCode] - : serverErrorDescs["UNKNOWN"], - ); - serverconsole.locmessage(tries + " attempts left."); - } - if (message.length >= 9 && message.indexOf("\x12ERRCRASH") == 0) { - const errno = os.constants.errno[message.substring(9)]; - process.exit(errno !== undefined ? errno : 1); - } + if (worker.id == Object.keys(cluster.workers)[0]) { + if (message.indexOf("\x12ERRLIST") == 0) { + const tries = parseInt(message.substring(8, 9)); + const errCode = message.substring(9); + serverconsole.locerrmessage( + serverErrorDescs[errCode] + ? serverErrorDescs[errCode] + : serverErrorDescs["UNKNOWN"], + ); + serverconsole.locmessage(tries + " attempts left."); } + if (message.length >= 9 && message.indexOf("\x12ERRCRASH") == 0) { + const errno = os.constants.errno[message.substring(9)]; + process.exit(errno !== undefined ? errno : 1); + } + } }; }); diff --git a/src/index.js b/src/index.js index c3de588..e6056a6 100644 --- a/src/index.js +++ b/src/index.js @@ -1093,9 +1093,10 @@ let commands = { } }, restart: (args, log) => { - if (cluster.isPrimary === undefined) log("This command is not supported on single-threaded " + name + "."); + if (cluster.isPrimary === undefined) + log("This command is not supported on single-threaded " + name + "."); else log("This command need to be run in " + name + " master."); - } + }, }; // Load commands from middleware @@ -1104,9 +1105,11 @@ middleware.forEach((middlewareO) => { Object.keys(middlewareO.commands).forEach((command) => { const prevCommand = commands[command]; if (prevCommand) { - commands[command] = (args, log) => middlewareO.commands[command](args, log, prevCommand); + commands[command] = (args, log) => + middlewareO.commands[command](args, log, prevCommand); } else { - commands[command] = (args, log) => middlewareO.commands[command](args, log, () => {}); + commands[command] = (args, log) => + middlewareO.commands[command](args, log, () => {}); } }); } @@ -1259,10 +1262,14 @@ process.messageEventListeners.push((worker, serverconsole) => { }; }); +let isWorkerHungUpBuff = true; + function msgListener(message) { if (message == "\x12END") { for (let i = 0; i < Object.keys(cluster.workers).length; i++) { - cluster.workers[Object.keys(cluster.workers)[i]].removeAllListeners("message"); + cluster.workers[Object.keys(cluster.workers)[i]].removeAllListeners( + "message", + ); addListenersToWorker(cluster.workers[Object.keys(cluster.workers)[i]]); } } @@ -1270,6 +1277,10 @@ function msgListener(message) { // Do nothing } else if (message == "\x12CLOSE") { closedMaster = true; + } else if (message == "\x12KILLOK") { + if (typeof isWorkerHungUpBuff != "undefined") isWorkerHungUpBuff = false; + } else if (message == "\x12KILLTERMMSG") { + serverconsole.locmessage("Terminating unused worker process..."); } else if (message[0] == "\x12") { console.log("RECEIVED CONTROL MESSAGE: " + message.substr(1)); } else { @@ -1667,37 +1678,44 @@ function start(init) { process.send("\x12SAVEERR" + err.message); } process.send("\x12END"); - } else if (line == "\x14KILLPING") { - if (!reallyExiting) { - process.send("\x12KILLOK"); - process.send("\x12END"); - } + }*/ else if (line == "\x14KILLPING") { + //if (!reallyExiting) { + process.send("\x12KILLOK"); + process.send("\x12END"); + //} // Refuse to send, when it's really exiting. Main process will treat the worker as hung up anyway... - } else if (line == "\x14PINGPING") { + } /* else if (line == "\x14PINGPING") { if (!reallyExiting) { process.send("\x12PINGOK"); process.send("\x12END"); } // Refuse to send, when it's really exiting. Main process will treat the worker as hung up anyway... - } else if (line == "\x14KILLREQ") { - if (reqcounter - reqcounterKillReq < 2) { + }*/ else if (line == "\x14KILLREQ") { + if (process.reqcounter - reqcounterKillReq < 2) { process.send("\x12KILLTERMMSG"); - process.nextTick(commands.stop); + process.nextTick(() => { + commands.stop([], () => {}); + }); } else { - reqcounterKillReq = reqcounter; + reqcounterKillReq = process.reqcounter; } - }*/ else if (commands[line.split(" ")[0]] !== undefined && commands[line.split(" ")[0]] !== null) { + } else if ( + commands[line.split(" ")[0]] !== undefined && + commands[line.split(" ")[0]] !== null + ) { var argss = line.split(" "); var command = argss.shift(); commands[command](argss, (msg) => process.send(msg)); process.send("\x12END"); } else { - process.send("Unrecognized command \"" + line.split(" ")[0] + "\"."); + process.send('Unrecognized command "' + line.split(" ")[0] + '".'); process.send("\x12END"); } } catch (err) { if (line != "") { - process.send("Can't execute command \"" + line.split(" ")[0] + "\"."); + process.send( + "Can't execute command \"" + line.split(" ")[0] + '".', + ); process.send("\x12END"); } } @@ -1706,7 +1724,7 @@ function start(init) { const rla = readline.createInterface({ input: process.stdin, output: process.stdout, - prompt: "" + prompt: "", }); rla.prompt(); rla.on("line", function (line) { @@ -1716,8 +1734,10 @@ function start(init) { if (line != "") { if (cluster.isPrimary !== undefined) { var allWorkers = Object.keys(cluster.workers); - if (command == "block") commands.block(argss, serverconsole.climessage); - if (command == "unblock") commands.unblock(argss, serverconsole.climessage); + if (command == "block") + commands.block(argss, serverconsole.climessage); + if (command == "unblock") + commands.unblock(argss, serverconsole.climessage); if (command == "restart") { var stopError = false; exiting = true; @@ -1730,7 +1750,10 @@ function start(init) { stopError = true; } } - if (stopError) serverconsole.climessage("Some " + name + " workers might not be stopped."); + if (stopError) + serverconsole.climessage( + "Some " + name + " workers might not be stopped.", + ); SVRJSInitialized = false; closedMaster = true; @@ -1747,7 +1770,7 @@ function start(init) { exiting = true; allWorkers = Object.keys(cluster.workers); } - allWorkers.forEach((clusterID)=> { + allWorkers.forEach((clusterID) => { try { if (cluster.workers[clusterID]) { cluster.workers[clusterID].on("message", msgListener); @@ -1758,7 +1781,9 @@ function start(init) { cluster.workers[clusterID].removeAllListeners("message"); addListenersToWorker(cluster.workers[clusterID]); } - serverconsole.climessage("Can't run command \"" + command + "\"."); + serverconsole.climessage( + "Can't run command \"" + command + '".', + ); } }); if (command == "stop") { @@ -1770,7 +1795,9 @@ function start(init) { try { commands[command](argss, serverconsole.climessage); } catch (err) { - serverconsole.climessage("Unrecognized command \"" + command + "\"."); + serverconsole.climessage( + 'Unrecognized command "' + command + '".', + ); } } } @@ -1939,22 +1966,24 @@ function start(init) { } }, 4550); - /* // Termination of unused good workers - if (!disableUnusedWorkerTermination && cluster.isPrimary !== undefined) { + if ( + !process.serverConfig.disableUnusedWorkerTermination && + cluster.isPrimary !== undefined + ) { setTimeout(function () { setInterval(function () { if (!closedMaster && !exiting) { - var allWorkers = Object.keys(cluster.workers); + const allWorkers = Object.keys(cluster.workers); - var minWorkers = 0; + let minWorkers = 0; minWorkers = Math.ceil(workersToFork * 0.625); if (minWorkers < 2) minWorkers = 2; if (minWorkers > 12) minWorkers = 12; - var goodWorkers = []; + let goodWorkers = []; - function checkWorker(callback, _id) { + const checkWorker = (callback, _id) => { if (typeof _id === "undefined") _id = 0; if (_id >= allWorkers.length) { callback(); @@ -1963,7 +1992,10 @@ function start(init) { try { if (cluster.workers[allWorkers[_id]]) { isWorkerHungUpBuff = true; - cluster.workers[allWorkers[_id]].on("message", msgListener); + cluster.workers[allWorkers[_id]].on( + "message", + msgListener, + ); cluster.workers[allWorkers[_id]].send("\x14KILLPING"); setTimeout(function () { if (isWorkerHungUpBuff) { @@ -1978,37 +2010,45 @@ function start(init) { } } catch (err) { if (cluster.workers[allWorkers[_id]]) { - cluster.workers[allWorkers[_id]].removeAllListeners("message"); - cluster.workers[allWorkers[_id]].on("message", bruteForceListenerWrapper(cluster.workers[allWorkers[_id]])); - cluster.workers[allWorkers[_id]].on("message", listenConnListener); + cluster.workers[allWorkers[_id]].removeAllListeners( + "message", + ); + addListenersToWorker(cluster.workers[allWorkers[_id]]); } checkWorker(callback, _id + 1); } - } - checkWorker(function () { + }; + checkWorker(() => { if (goodWorkers.length > minWorkers) { - var wN = Math.floor(Math.random() * goodWorkers.length); + const wN = Math.floor(Math.random() * goodWorkers.length); if (wN == goodWorkers.length) return; try { if (cluster.workers[goodWorkers[wN]]) { isWorkerHungUpBuff = true; - cluster.workers[goodWorkers[wN]].on("message", msgListener); + cluster.workers[goodWorkers[wN]].on( + "message", + msgListener, + ); cluster.workers[goodWorkers[wN]].send("\x14KILLREQ"); } } catch (err) { if (cluster.workers[goodWorkers[wN]]) { - cluster.workers[goodWorkers[wN]].removeAllListeners("message"); - cluster.workers[goodWorkers[wN]].on("message", bruteForceListenerWrapper(cluster.workers[goodWorkers[wN]])); - cluster.workers[goodWorkers[wN]].on("message", listenConnListener); + cluster.workers[goodWorkers[wN]].removeAllListeners( + "message", + ); + addListenersToWorker(cluster.workers[goodWorkers[wN]]); } - serverconsole.locwarnmessage("There was a problem while terminating unused worker process. Reason: " + err.message); + serverconsole.locwarnmessage( + "There was a problem while terminating unused worker process. Reason: " + + err.message, + ); } } }); } }, 300000); }, 2000); - }*/ + } } } }