Requests are now no longer intermittently dropped under load.

This commit is contained in:
Dorian Niemiec 2024-02-21 21:07:01 +01:00
parent f3fb1f928a
commit ea8def69d9

View file

@ -130,6 +130,7 @@ function createFastCGIHandler(options) {
function fastCGISocketHandler(chunk) { function fastCGISocketHandler(chunk) {
var chunkIndex = 0; var chunkIndex = 0;
while (chunkIndex < chunk.length || (headerIndex == 8 && bodyIndex == packetBody.length && paddingLength == 0)) { while (chunkIndex < chunk.length || (headerIndex == 8 && bodyIndex == packetBody.length && paddingLength == 0)) {
if (headerIndex < 8) { if (headerIndex < 8) {
chunk.copy(packetHeader, headerIndex, chunkIndex, Math.min(chunk.length, chunkIndex + 8 - headerIndex)); chunk.copy(packetHeader, headerIndex, chunkIndex, Math.min(chunk.length, chunkIndex + 8 - headerIndex));
@ -167,7 +168,7 @@ function createFastCGIHandler(options) {
if (processedPacket.requestID != requestID) return; //Drop the packet if (processedPacket.requestID != requestID) return; //Drop the packet
if (processedPacket.type == STDOUT) { if (processedPacket.type == STDOUT) {
try { try {
if(processedPacket.content.length > 0) emulatedStdout.push(processedPacket.content); if(processedPacket.content.length > 0) stdoutPush(processedPacket.content);
} catch (err) { } catch (err) {
//STDOUT will be lost instead of crashing the server //STDOUT will be lost instead of crashing the server
} }
@ -195,7 +196,7 @@ function createFastCGIHandler(options) {
} else if (protocolStatus == CANT_MPX_CONN) { } else if (protocolStatus == CANT_MPX_CONN) {
err = new Error("Multiplexed connections not supported by the FastCGI application"); err = new Error("Multiplexed connections not supported by the FastCGI application");
} }
emulatedStdout.push(null); stdoutPush(null);
if (emulatedStdout._readableState && emulatedStdout._readableState.flowing !== null && !emulatedStdout.endEmitted) { if (emulatedStdout._readableState && emulatedStdout._readableState.flowing !== null && !emulatedStdout.endEmitted) {
emulatedStdout.on("end", function() { emulatedStdout.on("end", function() {
emulatedStderr.push(null); emulatedStderr.push(null);
@ -206,7 +207,7 @@ function createFastCGIHandler(options) {
eventEmitter.emit("error", err); eventEmitter.emit("error", err);
} }
} else { } else {
emulatedStdout.push(null); stdoutPush(null);
if (emulatedStdout._readableState && emulatedStdout._readableState.flowing !== null && !emulatedStdout.endEmitted) { if (emulatedStdout._readableState && emulatedStdout._readableState.flowing !== null && !emulatedStdout.endEmitted) {
emulatedStdout.on("end", function() { emulatedStdout.on("end", function() {
emulatedStderr.push(null); emulatedStderr.push(null);
@ -249,9 +250,32 @@ function createFastCGIHandler(options) {
} }
}); });
function stdoutPush(data) {
if(data === null) {
stdoutToEnd = true;
} else {
var toResume = (stdoutBuffer.length == 0);
stdoutBuffer = Buffer.concat([stdoutBuffer, Buffer.from(data)]);
if(toResume) emulatedStdout.resume();
}
}
var stdoutBuffer = Buffer.alloc(0);
var stdoutToEnd = false;
var emulatedStdout = new stream.Readable({ var emulatedStdout = new stream.Readable({
read: function () {} read: function (n) {
if(stdoutBuffer.length == 0 && stdoutToEnd) {
this.push(null);
} else if (stdoutBuffer.length != 0 && n != 0) {
var bytesToPush = Math.min(stdoutBuffer.length, n);
var bufferToPush = stdoutBuffer.subarray(0, bytesToPush);
stdoutBuffer = stdoutBuffer.subarray(bytesToPush);
this.push(bufferToPush);
}
}
}); });
emulatedStdout.pause();
var emulatedStderr = new stream.Readable({ var emulatedStderr = new stream.Readable({
read: function () {} read: function () {}
}); });
@ -281,8 +305,9 @@ function createFastCGIHandler(options) {
var socket = net.createConnection(options, function () { var socket = net.createConnection(options, function () {
eventEmitter.emit("connect"); eventEmitter.emit("connect");
}).on("error", function (err) { }).on("error", function (err) {
emulatedStdout.push(null); stdoutBuffer.push(null);
emulatedStderr.push(null); emulatedStderr.push(null);
eventEmitter.removeAllListeners("exit");
eventEmitter.emit("error", err); eventEmitter.emit("error", err);
}).on("data", fastCGISocketHandler); }).on("data", fastCGISocketHandler);
@ -422,7 +447,7 @@ Mod.prototype.callback = function (req, res, serverconsole, responseEnd, href, e
res.write(buffer.substr(headerendline + eol.length), "latin1"); res.write(buffer.substr(headerendline + eol.length), "latin1");
} catch (ex) { } catch (ex) {
handler.removeAllListeners("exit"); handler.removeAllListeners("exit");
handler.stdout.removeAllListeners("dat"); handler.stdout.removeAllListeners("data");
if (!callServerError) { if (!callServerError) {
res.writeHead(500); res.writeHead(500);
res.end(ex.stack); res.end(ex.stack);
@ -475,11 +500,11 @@ Mod.prototype.callback = function (req, res, serverconsole, responseEnd, href, e
}); });
function handlerConnection() { function handlerConnection() {
handler.init();
handler.stdout.on("data", dataHandler); handler.stdout.on("data", dataHandler);
handler.stderr.on("data", function (data) { handler.stderr.on("data", function (data) {
stderr += data.toString(); stderr += data.toString();
}); });
handler.init();
req.pipe(handler.stdin); req.pipe(handler.stdin);
handler.on("exit", function (code, signal) { handler.on("exit", function (code, signal) {
if (!cned && (signal || code !== 0)) { if (!cned && (signal || code !== 0)) {
@ -496,6 +521,7 @@ Mod.prototype.callback = function (req, res, serverconsole, responseEnd, href, e
serverconsole.errmessage("There were FastCGI application errors:"); serverconsole.errmessage("There were FastCGI application errors:");
serverconsole.errmessage(preparedStderr); serverconsole.errmessage(preparedStderr);
} }
handler.stdout.removeListener("data", dataHandler);
handler.stdout.unpipe(res); //Prevent server crashes with write after the end handler.stdout.unpipe(res); //Prevent server crashes with write after the end
if(!res.finished) res.end(); if(!res.finished) res.end();
} }