Update cluster shim for Bun

This commit is contained in:
Dorian Niemiec 2024-08-27 13:59:28 +02:00
parent a66c296206
commit 756d66c654

View file

@ -8,13 +8,14 @@ if (!process.singleThreaded) {
try { try {
// Import cluster module // Import cluster module
cluster = require("cluster"); cluster = require("cluster");
// eslint-disable-next-line no-unused-vars
} catch (err) { } catch (err) {
// Clustering is not supported! // Clustering is not supported!
} }
// Cluster & IPC shim for Bun // Cluster & IPC shim for Bun
cluster.bunShim = function () { cluster.bunShim = () => {
cluster.isMaster = !process.env.NODE_UNIQUE_ID; cluster.isMaster = !process.env.NODE_UNIQUE_ID;
cluster.isPrimary = cluster.isMaster; cluster.isPrimary = cluster.isMaster;
cluster.isWorker = !cluster.isMaster; cluster.isWorker = !cluster.isMaster;
@ -25,11 +26,11 @@ if (!process.singleThreaded) {
cluster.worker = { cluster.worker = {
id: parseInt(process.env.NODE_UNIQUE_ID), id: parseInt(process.env.NODE_UNIQUE_ID),
process: process, process: process,
isDead: function () { isDead: () => {
return false; return false;
}, },
send: function (message, b, c, d) { send: (message, ...params) => {
process.send(message, b, c, d); process.send(message, ...params);
}, },
}; };
@ -37,14 +38,14 @@ if (!process.singleThreaded) {
// Shim the process.send function for worker processes // Shim the process.send function for worker processes
// Create a fake IPC server to receive messages // Create a fake IPC server to receive messages
let fakeIPCServer = net.createServer(function (socket) { let fakeIPCServer = net.createServer((socket) => {
let receivedData = ""; let receivedData = "";
socket.on("data", function (data) { socket.on("data", (data) => {
receivedData += data.toString(); receivedData += data.toString();
}); });
socket.on("end", function () { socket.on("end", () => {
process.emit("message", receivedData); process.emit("message", receivedData);
}); });
}); });
@ -58,7 +59,7 @@ if (!process.singleThreaded) {
: process.dirname + "/temp/.W" + process.pid + ".ipc", : process.dirname + "/temp/.W" + process.pid + ".ipc",
); );
process.send = function (message) { process.send = (message) => {
// Create a fake IPC connection to send messages // Create a fake IPC connection to send messages
let fakeIPCConnection = net.createConnection( let fakeIPCConnection = net.createConnection(
os.platform() === "win32" os.platform() === "win32"
@ -68,15 +69,15 @@ if (!process.singleThreaded) {
"temp/.P" + process.pid + ".ipc", "temp/.P" + process.pid + ".ipc",
) )
: process.dirname + "/temp/.P" + process.pid + ".ipc", : process.dirname + "/temp/.P" + process.pid + ".ipc",
function () { () => {
fakeIPCConnection.end(message); fakeIPCConnection.end(message);
}, },
); );
}; };
process.removeFakeIPC = function () { process.removeFakeIPC = () => {
// Close IPC server // Close IPC server
process.send = function () {}; process.send = () => {};
fakeIPCServer.close(); fakeIPCServer.close();
}; };
} }
@ -85,11 +86,11 @@ if (!process.singleThreaded) {
// Custom implementation for cluster.fork() // Custom implementation for cluster.fork()
cluster._workersCounter = 1; cluster._workersCounter = 1;
cluster.workers = {}; cluster.workers = {};
cluster.fork = function (env) { cluster.fork = (env) => {
const child_process = require("child_process"); const child_process = require("child_process");
let newEnvironment = Object.assign(env ? env : process.env); let newEnvironment = Object.assign({}, env ? env : process.env);
newEnvironment.NODE_UNIQUE_ID = cluster._workersCounter; newEnvironment.NODE_UNIQUE_ID = cluster._workersCounter;
let newArguments = Object.assign(process.argv); let newArguments = Object.assign([], process.argv);
let command = newArguments.shift(); let command = newArguments.shift();
let newWorker = child_process.spawn(command, newArguments, { let newWorker = child_process.spawn(command, newArguments, {
env: newEnvironment, env: newEnvironment,
@ -97,7 +98,7 @@ if (!process.singleThreaded) {
}); });
newWorker.process = newWorker; newWorker.process = newWorker;
newWorker.isDead = function () { newWorker.isDead = () => {
return newWorker.exitCode !== null || newWorker.killed; return newWorker.exitCode !== null || newWorker.killed;
}; };
newWorker.id = newEnvironment.NODE_UNIQUE_ID; newWorker.id = newEnvironment.NODE_UNIQUE_ID;
@ -117,13 +118,14 @@ if (!process.singleThreaded) {
} }
let oldLog = console.log; let oldLog = console.log;
console.log = function (a, b, c, d, e, f) { console.log = (...params) => {
if ( if (
a == "ChildProcess.prototype.send() - Sorry! Not implemented yet" params[0] ==
"ChildProcess.prototype.send() - Sorry! Not implemented yet"
) { ) {
throw new Error("NOT IMPLEMENTED"); throw new Error("NOT IMPLEMENTED");
} else { } else {
oldLog(a, b, c, d, e, f); oldLog(...params);
} }
}; };
@ -144,14 +146,14 @@ if (!process.singleThreaded) {
if (!checkSendImplementation(newWorker)) { if (!checkSendImplementation(newWorker)) {
// Create a fake IPC server for worker process to receive messages // Create a fake IPC server for worker process to receive messages
let fakeWorkerIPCServer = net.createServer(function (socket) { let fakeWorkerIPCServer = net.createServer((socket) => {
let receivedData = ""; let receivedData = "";
socket.on("data", function (data) { socket.on("data", (data) => {
receivedData += data.toString(); receivedData += data.toString();
}); });
socket.on("end", function () { socket.on("end", () => {
newWorker.emit("message", receivedData); newWorker.emit("message", receivedData);
}); });
}); });
@ -166,7 +168,7 @@ if (!process.singleThreaded) {
); );
// Cleanup when worker process exits // Cleanup when worker process exits
newWorker.on("exit", function () { newWorker.on("exit", () => {
fakeWorkerIPCServer.close(); fakeWorkerIPCServer.close();
delete cluster.workers[newWorker.id]; delete cluster.workers[newWorker.id];
}); });
@ -190,7 +192,7 @@ if (!process.singleThreaded) {
"temp/.W" + newWorker.process.pid + ".ipc", "temp/.W" + newWorker.process.pid + ".ipc",
) )
: process.dirname + "/temp/.W" + newWorker.process.pid + ".ipc", : process.dirname + "/temp/.W" + newWorker.process.pid + ".ipc",
function () { () => {
fakeWorkerIPCConnection.end(message); fakeWorkerIPCConnection.end(message);
}, },
); );
@ -206,7 +208,7 @@ if (!process.singleThreaded) {
} }
}; };
} else { } else {
newWorker.on("exit", function () { newWorker.on("exit", () => {
delete cluster.workers[newWorker.id]; delete cluster.workers[newWorker.id];
}); });
} }