Skip to content

Commit 1048b62

Browse files
Add SSH web access support (#5521)
1 parent 679e0b5 commit 1048b62

File tree

15 files changed

+557
-207
lines changed

15 files changed

+557
-207
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import pg from "pg";
2+
3+
import {
4+
TPostgresAccountCredentials,
5+
TPostgresResourceConnectionDetails
6+
} from "@app/ee/services/pam-resource/postgres/postgres-resource-types";
7+
import { logger } from "@app/lib/logger";
8+
9+
import { createPamSqlRepl } from "./pam-web-access-repl";
10+
import {
11+
parseWsClientMessage,
12+
resolveEndReason,
13+
SessionEndReason,
14+
TSessionContext,
15+
TSessionHandlerResult,
16+
WsMessageType
17+
} from "./pam-web-access-types";
18+
19+
type TPostgresSessionParams = {
20+
connectionDetails: TPostgresResourceConnectionDetails;
21+
credentials: TPostgresAccountCredentials;
22+
};
23+
24+
export const handlePostgresSession = async (
25+
ctx: TSessionContext,
26+
params: TPostgresSessionParams
27+
): Promise<TSessionHandlerResult> => {
28+
const { socket, relayPort, resourceName, sessionId, sendMessage, sendSessionEnd, isNearSessionExpiry, onCleanup } =
29+
ctx;
30+
const { connectionDetails, credentials } = params;
31+
32+
const pgClient = new pg.Client({
33+
host: "localhost",
34+
port: relayPort,
35+
user: credentials.username,
36+
database: connectionDetails.database,
37+
password: "",
38+
ssl: false,
39+
connectionTimeoutMillis: 30_000,
40+
statement_timeout: 30_000,
41+
types: {
42+
getTypeParser: () => (val: string | Buffer) => (typeof val === "string" ? val : val.toString("hex"))
43+
}
44+
});
45+
46+
await pgClient.connect();
47+
48+
const repl = createPamSqlRepl(pgClient);
49+
50+
sendMessage({
51+
type: WsMessageType.Ready,
52+
data: `Connected to ${resourceName} (${connectionDetails.database}) as ${credentials.username}\n\n`,
53+
prompt: "=> "
54+
});
55+
56+
logger.info({ sessionId }, "Postgres web access session established");
57+
58+
// Sequential message processing to prevent concurrent query issues
59+
let processingPromise = Promise.resolve();
60+
61+
socket.on("message", (rawData: Buffer | ArrayBuffer | Buffer[]) => {
62+
processingPromise = processingPromise
63+
.then(async () => {
64+
const message = parseWsClientMessage(rawData);
65+
if (!message) {
66+
sendMessage({
67+
type: WsMessageType.Output,
68+
data: "Invalid message format\n",
69+
prompt: repl.getPrompt()
70+
});
71+
return;
72+
}
73+
74+
if (message.type === WsMessageType.Control) {
75+
if (message.data === "quit") {
76+
sendSessionEnd(SessionEndReason.UserQuit);
77+
onCleanup();
78+
socket.close();
79+
return;
80+
}
81+
if (message.data === "clear-buffer") {
82+
repl.clearBuffer();
83+
return;
84+
}
85+
return;
86+
}
87+
88+
if (message.type === WsMessageType.Input) {
89+
const replResult = await repl.processInput(message.data);
90+
91+
if (replResult.shouldClose) {
92+
sendSessionEnd(SessionEndReason.UserQuit);
93+
onCleanup();
94+
socket.close();
95+
return;
96+
}
97+
98+
sendMessage({
99+
type: WsMessageType.Output,
100+
data: replResult.output,
101+
prompt: replResult.prompt
102+
});
103+
}
104+
})
105+
.catch((err) => {
106+
logger.error(err, "Error processing Postgres message");
107+
sendMessage({
108+
type: WsMessageType.Output,
109+
data: "Internal error\n",
110+
prompt: "=> "
111+
});
112+
});
113+
});
114+
115+
// Tunnel drop detection
116+
pgClient.on("error", (err) => {
117+
logger.error(err, "Database connection error");
118+
sendSessionEnd(resolveEndReason(isNearSessionExpiry));
119+
onCleanup();
120+
socket.close();
121+
});
122+
123+
pgClient.on("end", () => {
124+
sendSessionEnd(resolveEndReason(isNearSessionExpiry));
125+
onCleanup();
126+
socket.close();
127+
});
128+
129+
return {
130+
cleanup: async () => {
131+
try {
132+
await pgClient.end();
133+
} catch (err) {
134+
logger.debug(err, "Error closing pg client");
135+
}
136+
}
137+
};
138+
};
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import { Client, type ClientChannel, type ConnectConfig } from "ssh2";
2+
3+
import {
4+
TSSHAccountCredentials,
5+
TSSHResourceConnectionDetails
6+
} from "@app/ee/services/pam-resource/ssh/ssh-resource-types";
7+
import { logger } from "@app/lib/logger";
8+
9+
import {
10+
parseWsClientMessage,
11+
resolveEndReason,
12+
SessionEndReason,
13+
TSessionContext,
14+
TSessionHandlerResult,
15+
WsMessageType
16+
} from "./pam-web-access-types";
17+
18+
type TSSHSessionParams = {
19+
connectionDetails: TSSHResourceConnectionDetails;
20+
credentials: TSSHAccountCredentials;
21+
};
22+
23+
export const handleSSHSession = async (
24+
ctx: TSessionContext,
25+
params: TSSHSessionParams
26+
): Promise<TSessionHandlerResult> => {
27+
const { socket, relayPort, resourceName, sessionId, sendMessage, sendSessionEnd, isNearSessionExpiry, onCleanup } =
28+
ctx;
29+
const { credentials } = params;
30+
31+
const client = new Client();
32+
let stream: ClientChannel | null = null;
33+
34+
const connectConfig: ConnectConfig = {
35+
host: "localhost",
36+
port: relayPort,
37+
username: credentials.username,
38+
readyTimeout: 30_000,
39+
authHandler: ["none"]
40+
};
41+
42+
return new Promise((resolve, reject) => {
43+
client.on("ready", () => {
44+
client.shell({ term: "xterm-256color", rows: 24, cols: 80 }, (err, shellStream) => {
45+
if (err) {
46+
logger.error(err, "Failed to open SSH shell");
47+
sendSessionEnd(SessionEndReason.SetupFailed);
48+
client.end();
49+
reject(err);
50+
return;
51+
}
52+
53+
stream = shellStream;
54+
55+
// Send Ready message
56+
sendMessage({
57+
type: WsMessageType.Ready,
58+
data: `Connected to ${resourceName} as ${credentials.username}\r\n`
59+
});
60+
61+
logger.info({ sessionId }, "SSH web access session shell opened");
62+
63+
// SSH -> WS: forward output from remote shell to WebSocket
64+
shellStream.on("data", (data: Buffer) => {
65+
sendMessage({
66+
type: WsMessageType.Output,
67+
data: data.toString("utf-8")
68+
});
69+
});
70+
71+
shellStream.stderr.on("data", (data: Buffer) => {
72+
sendMessage({
73+
type: WsMessageType.Output,
74+
data: data.toString("utf-8")
75+
});
76+
});
77+
78+
// WS -> SSH: forward input from WebSocket to remote shell
79+
socket.on("message", (rawData: Buffer | ArrayBuffer | Buffer[]) => {
80+
const message = parseWsClientMessage(rawData);
81+
if (!message) return;
82+
83+
if (message.type === WsMessageType.Input) {
84+
// Raw keystroke forwarding — no buffering, no local echo
85+
shellStream.write(message.data);
86+
} else if (message.type === WsMessageType.Resize) {
87+
try {
88+
const { rows, cols } = JSON.parse(message.data) as { rows: number; cols: number };
89+
shellStream.setWindow(rows, cols, 0, 0);
90+
} catch {
91+
logger.debug("Invalid resize data received");
92+
}
93+
} else if (message.type === WsMessageType.Control) {
94+
if (message.data === "quit") {
95+
shellStream.close();
96+
client.end();
97+
}
98+
}
99+
});
100+
101+
// Shell stream close
102+
shellStream.on("close", () => {
103+
sendSessionEnd(resolveEndReason(isNearSessionExpiry));
104+
onCleanup();
105+
socket.close();
106+
});
107+
108+
resolve({
109+
cleanup: async () => {
110+
try {
111+
stream?.close();
112+
} catch (streamErr) {
113+
logger.debug(streamErr, "Error closing SSH stream");
114+
}
115+
try {
116+
client.end();
117+
} catch (clientErr) {
118+
logger.debug(clientErr, "Error closing SSH client");
119+
}
120+
}
121+
});
122+
});
123+
});
124+
125+
client.on("error", (err) => {
126+
logger.error(err, "SSH client connection error");
127+
if (stream) {
128+
// Session was established, then errored
129+
sendSessionEnd(resolveEndReason(isNearSessionExpiry));
130+
onCleanup();
131+
socket.close();
132+
} else {
133+
// Connection never established
134+
reject(err);
135+
}
136+
});
137+
138+
client.on("end", () => {
139+
logger.debug({ sessionId }, "SSH client connection ended");
140+
});
141+
142+
client.connect(connectConfig);
143+
});
144+
};

0 commit comments

Comments
 (0)