From 7a658613fea1b972aa14ff408221efc4d26f57c9 Mon Sep 17 00:00:00 2001 From: Dorian Niemiec Date: Sat, 24 Aug 2024 16:54:28 +0200 Subject: [PATCH] Added process.singleThreaded flag. --- src/index.js | 5 +- src/utils/clusterBunShim.js | 397 ++++++++++++++++++------------------ 2 files changed, 204 insertions(+), 198 deletions(-) diff --git a/src/index.js b/src/index.js index a10740c..4732d0d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,5 @@ const http = require("http"); const fs = require("fs"); -const cluster = require("./utils/clusterBunShim.js"); // Cluster module with shim for Bun //const generateErrorStack = require("./utils/generateErrorStack.js"); const getOS = require("./utils/getOS.js"); const svrjsInfo = require("../svrjs.json"); @@ -20,6 +19,10 @@ if (!fs.existsSync(__dirname + "/log")) fs.mkdirSync(__dirname + "/log"); if (!fs.existsSync(__dirname + "/mods")) fs.mkdirSync(__dirname + "/mods"); if (!fs.existsSync(__dirname + "/temp")) fs.mkdirSync(__dirname + "/temp"); +// TODO: process.singleThreaded flag +process.singleThreaded = true; +const cluster = require("./utils/clusterBunShim.js"); // Cluster module with shim for Bun + const serverconsoleConstructor = require("./utils/serverconsole.js"); let inspectorURL = undefined; diff --git a/src/utils/clusterBunShim.js b/src/utils/clusterBunShim.js index a5162dc..c0254f2 100644 --- a/src/utils/clusterBunShim.js +++ b/src/utils/clusterBunShim.js @@ -3,228 +3,231 @@ const os = require("os"); const path = require("path"); let cluster = {}; -try { - // Import cluster module - cluster = require("cluster"); -} catch (err) { - // Clustering is not supported! -} -// Cluster & IPC shim for Bun +if (!process.singleThreaded) { + try { + // Import cluster module + cluster = require("cluster"); + } catch (err) { + // Clustering is not supported! + } -cluster.bunShim = function () { - cluster.isMaster = !process.env.NODE_UNIQUE_ID; - cluster.isPrimary = cluster.isMaster; - cluster.isWorker = !cluster.isMaster; - cluster.__shimmed__ = true; + // Cluster & IPC shim for Bun - if (cluster.isWorker) { - // Shim the cluster.worker object for worker processes - cluster.worker = { - id: parseInt(process.env.NODE_UNIQUE_ID), - process: process, - isDead: function () { - return false; - }, - send: function (message, b, c, d) { - process.send(message, b, c, d); - }, - }; + cluster.bunShim = function () { + cluster.isMaster = !process.env.NODE_UNIQUE_ID; + cluster.isPrimary = cluster.isMaster; + cluster.isWorker = !cluster.isMaster; + cluster.__shimmed__ = true; - if (!process.send) { - // Shim the process.send function for worker processes + if (cluster.isWorker) { + // Shim the cluster.worker object for worker processes + cluster.worker = { + id: parseInt(process.env.NODE_UNIQUE_ID), + process: process, + isDead: function () { + return false; + }, + send: function (message, b, c, d) { + process.send(message, b, c, d); + }, + }; - // Create a fake IPC server to receive messages - let fakeIPCServer = net.createServer(function (socket) { - let receivedData = ""; + if (!process.send) { + // Shim the process.send function for worker processes - socket.on("data", function (data) { - receivedData += data.toString(); + // Create a fake IPC server to receive messages + let fakeIPCServer = net.createServer(function (socket) { + let receivedData = ""; + + socket.on("data", function (data) { + receivedData += data.toString(); + }); + + socket.on("end", function () { + process.emit("message", receivedData); + }); }); - - socket.on("end", function () { - process.emit("message", receivedData); - }); - }); - fakeIPCServer.listen( - os.platform() === "win32" - ? path.join( - "\\\\?\\pipe", - __dirname, - "temp/.W" + process.pid + ".ipc", - ) - : __dirname + "/temp/.W" + process.pid + ".ipc", - ); - - process.send = function (message) { - // Create a fake IPC connection to send messages - let fakeIPCConnection = net.createConnection( + fakeIPCServer.listen( os.platform() === "win32" ? path.join( "\\\\?\\pipe", __dirname, - "temp/.P" + process.pid + ".ipc", + "temp/.W" + process.pid + ".ipc", ) - : __dirname + "/temp/.P" + process.pid + ".ipc", - function () { - fakeIPCConnection.end(message); - }, + : __dirname + "/temp/.W" + process.pid + ".ipc", ); - }; - process.removeFakeIPC = function () { - // Close IPC server - process.send = function () {}; - fakeIPCServer.close(); - }; - } - } - - // Custom implementation for cluster.fork() - cluster._workersCounter = 1; - cluster.workers = {}; - cluster.fork = function (env) { - const child_process = require("child_process"); - let newEnvironment = Object.assign(env ? env : process.env); - newEnvironment.NODE_UNIQUE_ID = cluster._workersCounter; - let newArguments = Object.assign(process.argv); - let command = newArguments.shift(); - let newWorker = child_process.spawn(command, newArguments, { - env: newEnvironment, - stdio: ["inherit", "inherit", "inherit", "ipc"], - }); - - newWorker.process = newWorker; - newWorker.isDead = function () { - return newWorker.exitCode !== null || newWorker.killed; - }; - newWorker.id = newEnvironment.NODE_UNIQUE_ID; - - function checkSendImplementation(worker) { - let sendImplemented = true; - - if ( - !( - process.versions && - process.versions.bun && - process.versions.bun[0] != "0" - ) - ) { - if (!worker.send) { - sendImplemented = false; - } - - let oldLog = console.log; - console.log = function (a, b, c, d, e, f) { - if ( - a == "ChildProcess.prototype.send() - Sorry! Not implemented yet" - ) { - throw new Error("NOT IMPLEMENTED"); - } else { - oldLog(a, b, c, d, e, f); - } - }; - - try { - worker.send(undefined); - } catch (err) { - if (err.message === "NOT IMPLEMENTED") { - sendImplemented = false; - } - console.log(err); - } - - console.log = oldLog; - } - - return sendImplemented; - } - - if (!checkSendImplementation(newWorker)) { - // Create a fake IPC server for worker process to receive messages - let fakeWorkerIPCServer = net.createServer(function (socket) { - let receivedData = ""; - - socket.on("data", function (data) { - receivedData += data.toString(); - }); - - socket.on("end", function () { - newWorker.emit("message", receivedData); - }); - }); - fakeWorkerIPCServer.listen( - os.platform() === "win32" - ? path.join( - "\\\\?\\pipe", - __dirname, - "temp/.P" + newWorker.process.pid + ".ipc", - ) - : __dirname + "/temp/.P" + newWorker.process.pid + ".ipc", - ); - - // Cleanup when worker process exits - newWorker.on("exit", function () { - fakeWorkerIPCServer.close(); - delete cluster.workers[newWorker.id]; - }); - - newWorker.send = function ( - message, - fakeParam2, - fakeParam3, - fakeParam4, - tries, - ) { - if (!tries) tries = 0; - - try { - // Create a fake IPC connection to send messages to worker process - let fakeWorkerIPCConnection = net.createConnection( + process.send = function (message) { + // Create a fake IPC connection to send messages + let fakeIPCConnection = net.createConnection( os.platform() === "win32" ? path.join( "\\\\?\\pipe", __dirname, - "temp/.W" + newWorker.process.pid + ".ipc", + "temp/.P" + process.pid + ".ipc", ) - : __dirname + "/temp/.W" + newWorker.process.pid + ".ipc", + : __dirname + "/temp/.P" + process.pid + ".ipc", function () { - fakeWorkerIPCConnection.end(message); + fakeIPCConnection.end(message); }, ); - } catch (err) { - if (tries > 50) throw err; - newWorker.send( - message, - fakeParam2, - fakeParam3, - fakeParam4, - tries + 1, - ); - } - }; - } else { - newWorker.on("exit", function () { - delete cluster.workers[newWorker.id]; - }); + }; + + process.removeFakeIPC = function () { + // Close IPC server + process.send = function () {}; + fakeIPCServer.close(); + }; + } } - cluster.workers[newWorker.id] = newWorker; - cluster._workersCounter++; - return newWorker; - }; -}; + // Custom implementation for cluster.fork() + cluster._workersCounter = 1; + cluster.workers = {}; + cluster.fork = function (env) { + const child_process = require("child_process"); + let newEnvironment = Object.assign(env ? env : process.env); + newEnvironment.NODE_UNIQUE_ID = cluster._workersCounter; + let newArguments = Object.assign(process.argv); + let command = newArguments.shift(); + let newWorker = child_process.spawn(command, newArguments, { + env: newEnvironment, + stdio: ["inherit", "inherit", "inherit", "ipc"], + }); -if ( - process.isBun && - (cluster.isMaster === undefined || - (cluster.isMaster && process.env.NODE_UNIQUE_ID)) -) { - cluster.bunShim(); + newWorker.process = newWorker; + newWorker.isDead = function () { + return newWorker.exitCode !== null || newWorker.killed; + }; + newWorker.id = newEnvironment.NODE_UNIQUE_ID; + + function checkSendImplementation(worker) { + let sendImplemented = true; + + if ( + !( + process.versions && + process.versions.bun && + process.versions.bun[0] != "0" + ) + ) { + if (!worker.send) { + sendImplemented = false; + } + + let oldLog = console.log; + console.log = function (a, b, c, d, e, f) { + if ( + a == "ChildProcess.prototype.send() - Sorry! Not implemented yet" + ) { + throw new Error("NOT IMPLEMENTED"); + } else { + oldLog(a, b, c, d, e, f); + } + }; + + try { + worker.send(undefined); + } catch (err) { + if (err.message === "NOT IMPLEMENTED") { + sendImplemented = false; + } + console.log(err); + } + + console.log = oldLog; + } + + return sendImplemented; + } + + if (!checkSendImplementation(newWorker)) { + // Create a fake IPC server for worker process to receive messages + let fakeWorkerIPCServer = net.createServer(function (socket) { + let receivedData = ""; + + socket.on("data", function (data) { + receivedData += data.toString(); + }); + + socket.on("end", function () { + newWorker.emit("message", receivedData); + }); + }); + fakeWorkerIPCServer.listen( + os.platform() === "win32" + ? path.join( + "\\\\?\\pipe", + __dirname, + "temp/.P" + newWorker.process.pid + ".ipc", + ) + : __dirname + "/temp/.P" + newWorker.process.pid + ".ipc", + ); + + // Cleanup when worker process exits + newWorker.on("exit", function () { + fakeWorkerIPCServer.close(); + delete cluster.workers[newWorker.id]; + }); + + newWorker.send = function ( + message, + fakeParam2, + fakeParam3, + fakeParam4, + tries, + ) { + if (!tries) tries = 0; + + try { + // Create a fake IPC connection to send messages to worker process + let fakeWorkerIPCConnection = net.createConnection( + os.platform() === "win32" + ? path.join( + "\\\\?\\pipe", + __dirname, + "temp/.W" + newWorker.process.pid + ".ipc", + ) + : __dirname + "/temp/.W" + newWorker.process.pid + ".ipc", + function () { + fakeWorkerIPCConnection.end(message); + }, + ); + } catch (err) { + if (tries > 50) throw err; + newWorker.send( + message, + fakeParam2, + fakeParam3, + fakeParam4, + tries + 1, + ); + } + }; + } else { + newWorker.on("exit", function () { + delete cluster.workers[newWorker.id]; + }); + } + + cluster.workers[newWorker.id] = newWorker; + cluster._workersCounter++; + return newWorker; + }; + }; + + if ( + process.isBun && + (cluster.isMaster === undefined || + (cluster.isMaster && process.env.NODE_UNIQUE_ID)) + ) { + cluster.bunShim(); + } + + // Shim cluster.isPrimary field + if (cluster.isPrimary === undefined && cluster.isMaster !== undefined) + cluster.isPrimary = cluster.isMaster; } -// Shim cluster.isPrimary field -if (cluster.isPrimary === undefined && cluster.isMaster !== undefined) - cluster.isPrimary = cluster.isMaster; - module.exports = cluster;