Added process.singleThreaded flag.

This commit is contained in:
Dorian Niemiec 2024-08-24 16:54:28 +02:00
parent a3e4ee2328
commit 7a658613fe
2 changed files with 204 additions and 198 deletions

View file

@ -1,6 +1,5 @@
const http = require("http");
const fs = require("fs");
const cluster = require("./utils/clusterBunShim.js"); // Cluster module with shim for Bun
//const generateErrorStack = require("./utils/generateErrorStack.js");
const getOS = require("./utils/getOS.js");
const svrjsInfo = require("../svrjs.json");
@ -20,6 +19,10 @@ if (!fs.existsSync(__dirname + "/log")) fs.mkdirSync(__dirname + "/log");
if (!fs.existsSync(__dirname + "/mods")) fs.mkdirSync(__dirname + "/mods");
if (!fs.existsSync(__dirname + "/temp")) fs.mkdirSync(__dirname + "/temp");
// TODO: process.singleThreaded flag
process.singleThreaded = true;
const cluster = require("./utils/clusterBunShim.js"); // Cluster module with shim for Bun
const serverconsoleConstructor = require("./utils/serverconsole.js");
let inspectorURL = undefined;

View file

@ -3,228 +3,231 @@ const os = require("os");
const path = require("path");
let cluster = {};
try {
// Import cluster module
cluster = require("cluster");
} catch (err) {
// Clustering is not supported!
}
// Cluster & IPC shim for Bun
if (!process.singleThreaded) {
try {
// Import cluster module
cluster = require("cluster");
} catch (err) {
// Clustering is not supported!
}
cluster.bunShim = function () {
cluster.isMaster = !process.env.NODE_UNIQUE_ID;
cluster.isPrimary = cluster.isMaster;
cluster.isWorker = !cluster.isMaster;
cluster.__shimmed__ = true;
// Cluster & IPC shim for Bun
if (cluster.isWorker) {
// Shim the cluster.worker object for worker processes
cluster.worker = {
id: parseInt(process.env.NODE_UNIQUE_ID),
process: process,
isDead: function () {
return false;
},
send: function (message, b, c, d) {
process.send(message, b, c, d);
},
};
cluster.bunShim = function () {
cluster.isMaster = !process.env.NODE_UNIQUE_ID;
cluster.isPrimary = cluster.isMaster;
cluster.isWorker = !cluster.isMaster;
cluster.__shimmed__ = true;
if (!process.send) {
// Shim the process.send function for worker processes
if (cluster.isWorker) {
// Shim the cluster.worker object for worker processes
cluster.worker = {
id: parseInt(process.env.NODE_UNIQUE_ID),
process: process,
isDead: function () {
return false;
},
send: function (message, b, c, d) {
process.send(message, b, c, d);
},
};
// Create a fake IPC server to receive messages
let fakeIPCServer = net.createServer(function (socket) {
let receivedData = "";
if (!process.send) {
// Shim the process.send function for worker processes
socket.on("data", function (data) {
receivedData += data.toString();
// Create a fake IPC server to receive messages
let fakeIPCServer = net.createServer(function (socket) {
let receivedData = "";
socket.on("data", function (data) {
receivedData += data.toString();
});
socket.on("end", function () {
process.emit("message", receivedData);
});
});
socket.on("end", function () {
process.emit("message", receivedData);
});
});
fakeIPCServer.listen(
os.platform() === "win32"
? path.join(
"\\\\?\\pipe",
__dirname,
"temp/.W" + process.pid + ".ipc",
)
: __dirname + "/temp/.W" + process.pid + ".ipc",
);
process.send = function (message) {
// Create a fake IPC connection to send messages
let fakeIPCConnection = net.createConnection(
fakeIPCServer.listen(
os.platform() === "win32"
? path.join(
"\\\\?\\pipe",
__dirname,
"temp/.P" + process.pid + ".ipc",
"temp/.W" + process.pid + ".ipc",
)
: __dirname + "/temp/.P" + process.pid + ".ipc",
function () {
fakeIPCConnection.end(message);
},
: __dirname + "/temp/.W" + process.pid + ".ipc",
);
};
process.removeFakeIPC = function () {
// Close IPC server
process.send = function () {};
fakeIPCServer.close();
};
}
}
// Custom implementation for cluster.fork()
cluster._workersCounter = 1;
cluster.workers = {};
cluster.fork = function (env) {
const child_process = require("child_process");
let newEnvironment = Object.assign(env ? env : process.env);
newEnvironment.NODE_UNIQUE_ID = cluster._workersCounter;
let newArguments = Object.assign(process.argv);
let command = newArguments.shift();
let newWorker = child_process.spawn(command, newArguments, {
env: newEnvironment,
stdio: ["inherit", "inherit", "inherit", "ipc"],
});
newWorker.process = newWorker;
newWorker.isDead = function () {
return newWorker.exitCode !== null || newWorker.killed;
};
newWorker.id = newEnvironment.NODE_UNIQUE_ID;
function checkSendImplementation(worker) {
let sendImplemented = true;
if (
!(
process.versions &&
process.versions.bun &&
process.versions.bun[0] != "0"
)
) {
if (!worker.send) {
sendImplemented = false;
}
let oldLog = console.log;
console.log = function (a, b, c, d, e, f) {
if (
a == "ChildProcess.prototype.send() - Sorry! Not implemented yet"
) {
throw new Error("NOT IMPLEMENTED");
} else {
oldLog(a, b, c, d, e, f);
}
};
try {
worker.send(undefined);
} catch (err) {
if (err.message === "NOT IMPLEMENTED") {
sendImplemented = false;
}
console.log(err);
}
console.log = oldLog;
}
return sendImplemented;
}
if (!checkSendImplementation(newWorker)) {
// Create a fake IPC server for worker process to receive messages
let fakeWorkerIPCServer = net.createServer(function (socket) {
let receivedData = "";
socket.on("data", function (data) {
receivedData += data.toString();
});
socket.on("end", function () {
newWorker.emit("message", receivedData);
});
});
fakeWorkerIPCServer.listen(
os.platform() === "win32"
? path.join(
"\\\\?\\pipe",
__dirname,
"temp/.P" + newWorker.process.pid + ".ipc",
)
: __dirname + "/temp/.P" + newWorker.process.pid + ".ipc",
);
// Cleanup when worker process exits
newWorker.on("exit", function () {
fakeWorkerIPCServer.close();
delete cluster.workers[newWorker.id];
});
newWorker.send = function (
message,
fakeParam2,
fakeParam3,
fakeParam4,
tries,
) {
if (!tries) tries = 0;
try {
// Create a fake IPC connection to send messages to worker process
let fakeWorkerIPCConnection = net.createConnection(
process.send = function (message) {
// Create a fake IPC connection to send messages
let fakeIPCConnection = net.createConnection(
os.platform() === "win32"
? path.join(
"\\\\?\\pipe",
__dirname,
"temp/.W" + newWorker.process.pid + ".ipc",
"temp/.P" + process.pid + ".ipc",
)
: __dirname + "/temp/.W" + newWorker.process.pid + ".ipc",
: __dirname + "/temp/.P" + process.pid + ".ipc",
function () {
fakeWorkerIPCConnection.end(message);
fakeIPCConnection.end(message);
},
);
} catch (err) {
if (tries > 50) throw err;
newWorker.send(
message,
fakeParam2,
fakeParam3,
fakeParam4,
tries + 1,
);
}
};
} else {
newWorker.on("exit", function () {
delete cluster.workers[newWorker.id];
});
};
process.removeFakeIPC = function () {
// Close IPC server
process.send = function () {};
fakeIPCServer.close();
};
}
}
cluster.workers[newWorker.id] = newWorker;
cluster._workersCounter++;
return newWorker;
};
};
// Custom implementation for cluster.fork()
cluster._workersCounter = 1;
cluster.workers = {};
cluster.fork = function (env) {
const child_process = require("child_process");
let newEnvironment = Object.assign(env ? env : process.env);
newEnvironment.NODE_UNIQUE_ID = cluster._workersCounter;
let newArguments = Object.assign(process.argv);
let command = newArguments.shift();
let newWorker = child_process.spawn(command, newArguments, {
env: newEnvironment,
stdio: ["inherit", "inherit", "inherit", "ipc"],
});
if (
process.isBun &&
(cluster.isMaster === undefined ||
(cluster.isMaster && process.env.NODE_UNIQUE_ID))
) {
cluster.bunShim();
newWorker.process = newWorker;
newWorker.isDead = function () {
return newWorker.exitCode !== null || newWorker.killed;
};
newWorker.id = newEnvironment.NODE_UNIQUE_ID;
function checkSendImplementation(worker) {
let sendImplemented = true;
if (
!(
process.versions &&
process.versions.bun &&
process.versions.bun[0] != "0"
)
) {
if (!worker.send) {
sendImplemented = false;
}
let oldLog = console.log;
console.log = function (a, b, c, d, e, f) {
if (
a == "ChildProcess.prototype.send() - Sorry! Not implemented yet"
) {
throw new Error("NOT IMPLEMENTED");
} else {
oldLog(a, b, c, d, e, f);
}
};
try {
worker.send(undefined);
} catch (err) {
if (err.message === "NOT IMPLEMENTED") {
sendImplemented = false;
}
console.log(err);
}
console.log = oldLog;
}
return sendImplemented;
}
if (!checkSendImplementation(newWorker)) {
// Create a fake IPC server for worker process to receive messages
let fakeWorkerIPCServer = net.createServer(function (socket) {
let receivedData = "";
socket.on("data", function (data) {
receivedData += data.toString();
});
socket.on("end", function () {
newWorker.emit("message", receivedData);
});
});
fakeWorkerIPCServer.listen(
os.platform() === "win32"
? path.join(
"\\\\?\\pipe",
__dirname,
"temp/.P" + newWorker.process.pid + ".ipc",
)
: __dirname + "/temp/.P" + newWorker.process.pid + ".ipc",
);
// Cleanup when worker process exits
newWorker.on("exit", function () {
fakeWorkerIPCServer.close();
delete cluster.workers[newWorker.id];
});
newWorker.send = function (
message,
fakeParam2,
fakeParam3,
fakeParam4,
tries,
) {
if (!tries) tries = 0;
try {
// Create a fake IPC connection to send messages to worker process
let fakeWorkerIPCConnection = net.createConnection(
os.platform() === "win32"
? path.join(
"\\\\?\\pipe",
__dirname,
"temp/.W" + newWorker.process.pid + ".ipc",
)
: __dirname + "/temp/.W" + newWorker.process.pid + ".ipc",
function () {
fakeWorkerIPCConnection.end(message);
},
);
} catch (err) {
if (tries > 50) throw err;
newWorker.send(
message,
fakeParam2,
fakeParam3,
fakeParam4,
tries + 1,
);
}
};
} else {
newWorker.on("exit", function () {
delete cluster.workers[newWorker.id];
});
}
cluster.workers[newWorker.id] = newWorker;
cluster._workersCounter++;
return newWorker;
};
};
if (
process.isBun &&
(cluster.isMaster === undefined ||
(cluster.isMaster && process.env.NODE_UNIQUE_ID))
) {
cluster.bunShim();
}
// Shim cluster.isPrimary field
if (cluster.isPrimary === undefined && cluster.isMaster !== undefined)
cluster.isPrimary = cluster.isMaster;
}
// Shim cluster.isPrimary field
if (cluster.isPrimary === undefined && cluster.isMaster !== undefined)
cluster.isPrimary = cluster.isMaster;
module.exports = cluster;