Skip to content

Commit fc28e8c

Browse files
authored
feat(proxy): websocket proxying, CSRF handling (cryostatio#85)
1 parent c3a5ca4 commit fc28e8c

File tree

6 files changed

+265
-91
lines changed

6 files changed

+265
-91
lines changed

backend/package-lock.json

Lines changed: 43 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
"@kubernetes/client-node": "^0.22.1",
1616
"express": "^4.19.2",
1717
"follow-redirects": "^1.15.6",
18+
"http-proxy": "^1.18.1",
1819
"jsonpath-plus": "^8.1.0",
1920
"morgan": "^1.10.0",
2021
"qs": "^6.11.0"
2122
},
2223
"devDependencies": {
2324
"@types/express": "^4.17.21",
2425
"@types/follow-redirects": "^1.14.4",
26+
"@types/http-proxy": "^1.17.15",
2527
"@types/morgan": "^1.9.9",
2628
"@types/node": "^22.0.2",
2729
"@types/qs": "^6.9.15",

backend/src/server.ts

Lines changed: 146 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
* limitations under the License.
1515
*/
1616
import { http, https } from 'follow-redirects';
17+
import httpProxy from 'http-proxy';
1718
import fs from 'fs';
1819
import * as k8s from '@kubernetes/client-node';
1920
import express from 'express';
2021
import morgan from 'morgan';
21-
import qs from 'qs';
22+
import { ParsedQs } from 'qs';
2223
import { Duplex } from 'stream';
2324

24-
const app = express();
2525
const port = process.env.PORT || 9943;
2626
const skipTlsVerify = process.env.NODE_TLS_REJECT_UNAUTHORIZED == '0';
2727
const htmlDir = process.env.HTML_DIR || './html';
@@ -45,6 +45,9 @@ kc.applyToRequest({
4545

4646
const k8sApi = kc.makeApiClient(k8s.CoreV1Api);
4747

48+
const app = express();
49+
const proxy = httpProxy.createProxyServer({ ws: true });
50+
4851
app.use(morgan('combined'));
4952

5053
let connections: Duplex[] = [];
@@ -55,21 +58,64 @@ app.get('/health', (_, res) => {
5558
res.status(204).send();
5659
});
5760

58-
app.use('/upstream/*', async (req, res) => {
59-
let ns = req.headers['cryostat-svc-ns'];
60-
let name = req.headers['cryostat-svc-name'];
61+
type CryostatInstance = { ns: string; name: string };
62+
63+
const getQuery = (q: ParsedQs, key: string, def?: string): string | undefined => {
64+
let v = q[key];
65+
while (Array.isArray(v)) {
66+
v = v[0];
67+
}
68+
if (!v) {
69+
v = def;
70+
}
71+
if (typeof v === 'string') {
72+
return v;
73+
}
74+
return def;
75+
};
76+
77+
const getSearchParam = (q: URLSearchParams, key: string, def?: string): string | undefined => {
78+
return (q.has(key) ? q.get(key) : def) ?? def;
79+
};
80+
81+
/* eslint-disable @typescript-eslint/no-explicit-any */
82+
const getCryostatInstance = (req: any): CryostatInstance => {
83+
let ns;
84+
let name;
85+
if (req.headers) {
86+
ns = req.headers['cryostat-svc-ns'];
87+
name = req.headers['cryostat-svc-name'];
88+
}
89+
if (!ns && !name && req.query) {
90+
ns = getQuery(req.query, 'ns');
91+
name = getQuery(req.query, 'name');
92+
}
93+
if (!ns && !name && req.searchParams) {
94+
ns = getSearchParam(req.searchParams, 'ns');
95+
name = getSearchParam(req.searchParams, 'name');
96+
}
6197
if (!ns || !name) {
62-
res.status(400).send();
63-
return;
98+
throw new Error(
99+
`Proxy request from ${req.hostname} ${req.url} requested <${ns}, ${name}> - values cannot be falsey`,
100+
);
64101
}
65102
if (Array.isArray(ns)) {
66103
ns = ns[0];
67104
}
68105
if (Array.isArray(name)) {
69106
name = name[0];
70107
}
108+
return {
109+
ns,
110+
name,
111+
};
112+
};
71113

72-
const svc = await k8sApi.readNamespacedService(name, ns).catch((err) => {
114+
const getServicePort = async (instance: CryostatInstance): Promise<{ tls: boolean; port: number }> => {
115+
let tls = true;
116+
let svcPort;
117+
118+
const svc = await k8sApi.readNamespacedService(instance.name, instance.ns).catch((err) => {
73119
console.error(err);
74120
throw err;
75121
});
@@ -78,15 +124,10 @@ app.use('/upstream/*', async (req, res) => {
78124
!(svcLabels['app.kubernetes.io/part-of'] === 'cryostat' && svcLabels['app.kubernetes.io/component'] === 'cryostat')
79125
) {
80126
throw new Error(
81-
`Selected Service "${name}" in namespace "${ns}" does not have the expected Cryostat selector labels`,
127+
`Selected Service "${instance.name}" in namespace "${instance.ns}" does not have the expected Cryostat selector labels`,
82128
);
83129
}
84130

85-
const host = `${name}.${ns}`;
86-
const method = req.method;
87-
88-
let tls;
89-
let svcPort;
90131
// select ports by appProtocol, preferring https over http
91132
for (const port of svc?.body?.spec?.ports ?? []) {
92133
if (port.appProtocol === 'https') {
@@ -116,64 +157,112 @@ app.use('/upstream/*', async (req, res) => {
116157
}
117158
if (!svcPort) {
118159
throw new Error(
119-
`Could not find suitable port with http(s) appProtocol or with name ending in http(s) on <${name}, ${ns}>`,
160+
`Could not find suitable port with http(s) appProtocol or with name ending in http(s) on <${instance.ns}, ${instance.name}>`,
120161
);
121162
}
163+
return {
164+
tls,
165+
port: svcPort,
166+
};
167+
};
122168

123-
const proto = tls ? https : http;
169+
const getProxyTarget = async ({ ns, name }: CryostatInstance): Promise<string> => {
170+
const port = await getServicePort({ ns, name });
171+
const tls = port.tls;
172+
const svcPort = port.port;
173+
const host = `${name}.${ns}`;
124174

125-
let path = (req.baseUrl + req.path).slice('/upstream'.length);
126-
if (path.endsWith('/')) {
127-
path = path.slice(0, -1);
175+
return `http${tls ? 's' : ''}://${host}:${svcPort}`;
176+
};
177+
178+
app.use('/upstream/*', async (req, res) => {
179+
let ns: string;
180+
let name: string;
181+
try {
182+
const instance = getCryostatInstance(req);
183+
ns = instance.ns;
184+
name = instance.name;
185+
} catch (err) {
186+
console.warn(err);
187+
res.status(400).send();
188+
return;
189+
}
190+
191+
const method = req.method;
192+
193+
let tls;
194+
try {
195+
const port = await getServicePort({ ns, name });
196+
tls = port.tls;
197+
} catch (err) {
198+
console.error(err);
199+
res.status(502).send();
200+
return;
128201
}
129-
const query = qs.stringify(req.query);
130-
if (query) {
131-
path += `?${query}`;
202+
203+
/* eslint-disable @typescript-eslint/no-explicit-any */
204+
const headers = {} as any;
205+
for (const [key, value] of Object.entries(req.headers)) {
206+
if (typeof value === 'string') {
207+
headers[key] = value;
208+
} else if (Array.isArray(value)) {
209+
headers[key] = value.join();
210+
}
132211
}
133-
const initOptions = {
134-
host,
212+
const opts: httpProxy.ServerOptions = {
213+
agent: (tls ? https : http).globalAgent,
135214
method,
136-
path,
137-
port: svcPort,
138-
headers: {
139-
Authorization: req.headers.authorization,
140-
Referer: req.headers.referer,
141-
},
142-
};
143-
console.log(
144-
`Proxying <${ns}, ${name}> ${method} ${req.path} -> ${tls ? 'https' : 'http'}://${host}:${svcPort}${path}`,
145-
);
146-
const options = {
147-
...initOptions,
148-
agent: new proto.Agent(initOptions),
215+
target: await getProxyTarget({ ns, name }),
216+
headers,
217+
followRedirects: true,
218+
secure: !skipTlsVerify,
219+
ssl: tlsOpts,
220+
xfwd: true,
149221
};
150-
let body = '';
151-
const upReq = proto.request(options, (upRes) => {
152-
upRes.setEncoding('utf8');
153-
upRes.setTimeout(10_000, () => {
154-
res.status(504).send();
155-
});
156-
upRes.on('data', (chunk) => (body += chunk));
157-
upRes.on('end', () => {
158-
console.log(`${host} ${path} : ${upRes.statusCode} ${body.length}`);
159-
res.status(upRes.statusCode ?? 503).send(body);
160-
});
161-
});
162-
upReq.on('error', (e) => {
163-
console.error(e);
164-
res.status(502).send();
165-
});
166-
upReq.end();
222+
const correctedUrl = (req.baseUrl + req.url).replace(/^\/upstream(\.*)/, '');
223+
req.url = correctedUrl;
224+
console.log(`Proxying <${ns}, ${name}> ${method} ${req.url} -> ${opts.target}`);
225+
proxy.web(req, res, opts);
167226
});
168227

169-
const svc = https.createServer(tlsOpts, app).listen(port, () => {
170-
console.log(`Service started on port ${port} using ${tlsCertPath}`);
171-
});
228+
const svc = https.createServer(tlsOpts, app);
172229

173230
svc.on('connection', (connection) => {
174231
connections.push(connection);
175232
connection.on('close', () => (connections = connections.filter((curr) => curr !== connection)));
176233
});
234+
svc.on('upgrade', async (req, sock, head) => {
235+
console.log(`WebSocket Upgrade: ${req.url}`);
236+
if (!req.url) {
237+
throw new Error(`Cannot upgrade WebSocket connection to: ${req.url}`);
238+
}
239+
const u = URL.parse(req.url, 'http://localhost');
240+
if (!u) {
241+
throw new Error(`Could not parse request URL: ${req.url}`);
242+
}
243+
const r2 = {
244+
...req,
245+
searchParams: u.searchParams,
246+
};
247+
try {
248+
const instance = getCryostatInstance(r2);
249+
const target = await getProxyTarget(instance);
250+
const correctedUrl = req.url.replace(/^\/upstream(\.*)/, '');
251+
req.url = correctedUrl;
252+
console.log(`WebSocket ${req.url} -> ${target}`);
253+
proxy.ws(req, sock, head, {
254+
target,
255+
followRedirects: true,
256+
secure: !skipTlsVerify,
257+
ssl: tlsOpts,
258+
});
259+
} catch (err) {
260+
console.error(err);
261+
}
262+
});
263+
svc.listen(port, () => {
264+
console.log(`Service started on port ${port} using ${tlsCertPath}`);
265+
});
177266

178267
const shutdown = () => {
179268
console.log('Received kill signal, shutting down gracefully');

0 commit comments

Comments
 (0)