160 lines
6.3 KiB
JavaScript
160 lines
6.3 KiB
JavaScript
|
import { HttpResponse } from "@smithy/protocol-http";
|
||
|
import { buildQueryString } from "@smithy/querystring-builder";
|
||
|
import { constants } from "http2";
|
||
|
import { getTransformedHeaders } from "./get-transformed-headers";
|
||
|
import { NodeHttp2ConnectionManager } from "./node-http2-connection-manager";
|
||
|
import { writeRequestBody } from "./write-request-body";
|
||
|
export class NodeHttp2Handler {
|
||
|
static create(instanceOrOptions) {
|
||
|
if (typeof instanceOrOptions?.handle === "function") {
|
||
|
return instanceOrOptions;
|
||
|
}
|
||
|
return new NodeHttp2Handler(instanceOrOptions);
|
||
|
}
|
||
|
constructor(options) {
|
||
|
this.metadata = { handlerProtocol: "h2" };
|
||
|
this.connectionManager = new NodeHttp2ConnectionManager({});
|
||
|
this.configProvider = new Promise((resolve, reject) => {
|
||
|
if (typeof options === "function") {
|
||
|
options()
|
||
|
.then((opts) => {
|
||
|
resolve(opts || {});
|
||
|
})
|
||
|
.catch(reject);
|
||
|
}
|
||
|
else {
|
||
|
resolve(options || {});
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
destroy() {
|
||
|
this.connectionManager.destroy();
|
||
|
}
|
||
|
async handle(request, { abortSignal } = {}) {
|
||
|
if (!this.config) {
|
||
|
this.config = await this.configProvider;
|
||
|
this.connectionManager.setDisableConcurrentStreams(this.config.disableConcurrentStreams || false);
|
||
|
if (this.config.maxConcurrentStreams) {
|
||
|
this.connectionManager.setMaxConcurrentStreams(this.config.maxConcurrentStreams);
|
||
|
}
|
||
|
}
|
||
|
const { requestTimeout, disableConcurrentStreams } = this.config;
|
||
|
return new Promise((_resolve, _reject) => {
|
||
|
let fulfilled = false;
|
||
|
let writeRequestBodyPromise = undefined;
|
||
|
const resolve = async (arg) => {
|
||
|
await writeRequestBodyPromise;
|
||
|
_resolve(arg);
|
||
|
};
|
||
|
const reject = async (arg) => {
|
||
|
await writeRequestBodyPromise;
|
||
|
_reject(arg);
|
||
|
};
|
||
|
if (abortSignal?.aborted) {
|
||
|
fulfilled = true;
|
||
|
const abortError = new Error("Request aborted");
|
||
|
abortError.name = "AbortError";
|
||
|
reject(abortError);
|
||
|
return;
|
||
|
}
|
||
|
const { hostname, method, port, protocol, query } = request;
|
||
|
let auth = "";
|
||
|
if (request.username != null || request.password != null) {
|
||
|
const username = request.username ?? "";
|
||
|
const password = request.password ?? "";
|
||
|
auth = `${username}:${password}@`;
|
||
|
}
|
||
|
const authority = `${protocol}//${auth}${hostname}${port ? `:${port}` : ""}`;
|
||
|
const requestContext = { destination: new URL(authority) };
|
||
|
const session = this.connectionManager.lease(requestContext, {
|
||
|
requestTimeout: this.config?.sessionTimeout,
|
||
|
disableConcurrentStreams: disableConcurrentStreams || false,
|
||
|
});
|
||
|
const rejectWithDestroy = (err) => {
|
||
|
if (disableConcurrentStreams) {
|
||
|
this.destroySession(session);
|
||
|
}
|
||
|
fulfilled = true;
|
||
|
reject(err);
|
||
|
};
|
||
|
const queryString = buildQueryString(query || {});
|
||
|
let path = request.path;
|
||
|
if (queryString) {
|
||
|
path += `?${queryString}`;
|
||
|
}
|
||
|
if (request.fragment) {
|
||
|
path += `#${request.fragment}`;
|
||
|
}
|
||
|
const req = session.request({
|
||
|
...request.headers,
|
||
|
[constants.HTTP2_HEADER_PATH]: path,
|
||
|
[constants.HTTP2_HEADER_METHOD]: method,
|
||
|
});
|
||
|
session.ref();
|
||
|
req.on("response", (headers) => {
|
||
|
const httpResponse = new HttpResponse({
|
||
|
statusCode: headers[":status"] || -1,
|
||
|
headers: getTransformedHeaders(headers),
|
||
|
body: req,
|
||
|
});
|
||
|
fulfilled = true;
|
||
|
resolve({ response: httpResponse });
|
||
|
if (disableConcurrentStreams) {
|
||
|
session.close();
|
||
|
this.connectionManager.deleteSession(authority, session);
|
||
|
}
|
||
|
});
|
||
|
if (requestTimeout) {
|
||
|
req.setTimeout(requestTimeout, () => {
|
||
|
req.close();
|
||
|
const timeoutError = new Error(`Stream timed out because of no activity for ${requestTimeout} ms`);
|
||
|
timeoutError.name = "TimeoutError";
|
||
|
rejectWithDestroy(timeoutError);
|
||
|
});
|
||
|
}
|
||
|
if (abortSignal) {
|
||
|
abortSignal.onabort = () => {
|
||
|
req.close();
|
||
|
const abortError = new Error("Request aborted");
|
||
|
abortError.name = "AbortError";
|
||
|
rejectWithDestroy(abortError);
|
||
|
};
|
||
|
}
|
||
|
req.on("frameError", (type, code, id) => {
|
||
|
rejectWithDestroy(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`));
|
||
|
});
|
||
|
req.on("error", rejectWithDestroy);
|
||
|
req.on("aborted", () => {
|
||
|
rejectWithDestroy(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`));
|
||
|
});
|
||
|
req.on("close", () => {
|
||
|
session.unref();
|
||
|
if (disableConcurrentStreams) {
|
||
|
session.destroy();
|
||
|
}
|
||
|
if (!fulfilled) {
|
||
|
rejectWithDestroy(new Error("Unexpected error: http2 request did not get a response"));
|
||
|
}
|
||
|
});
|
||
|
writeRequestBodyPromise = writeRequestBody(req, request, requestTimeout);
|
||
|
});
|
||
|
}
|
||
|
updateHttpClientConfig(key, value) {
|
||
|
this.config = undefined;
|
||
|
this.configProvider = this.configProvider.then((config) => {
|
||
|
return {
|
||
|
...config,
|
||
|
[key]: value,
|
||
|
};
|
||
|
});
|
||
|
}
|
||
|
httpHandlerConfigs() {
|
||
|
return this.config ?? {};
|
||
|
}
|
||
|
destroySession(session) {
|
||
|
if (!session.destroyed) {
|
||
|
session.destroy();
|
||
|
}
|
||
|
}
|
||
|
}
|