Skip to content

Commit 9f8b4c9

Browse files
committed
feat: websocket server proxying to provider websocket
refs #1024
1 parent d64e7b2 commit 9f8b4c9

File tree

20 files changed

+1172
-147
lines changed

20 files changed

+1172
-147
lines changed

apps/api/package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"@cosmjs/tendermint-rpc": "^0.32.4",
5858
"@dotenvx/dotenvx": "^1.9.0",
5959
"@hono/node-server": "1.13.7",
60+
"@hono/node-ws": "^1.2.0",
6061
"@hono/otel": "^0.1.0",
6162
"@hono/swagger-ui": "0.4.1",
6263
"@hono/zod-openapi": "0.18.4",
@@ -90,6 +91,7 @@
9091
"markdown-to-txt": "^2.0.1",
9192
"memory-cache": "^0.2.0",
9293
"murmurhash": "^2.0.1",
94+
"node-forge": "^1.3.1",
9395
"pg": "^8.12.0",
9496
"pg-hstore": "^2.3.4",
9597
"postgres": "^3.4.4",
@@ -125,6 +127,7 @@
125127
"@types/pg": "^8.11.6",
126128
"@types/semver": "^7.5.2",
127129
"@types/uuid": "^8.3.1",
130+
"@types/ws": "^8.18.1",
128131
"@typescript-eslint/eslint-plugin": "^7.12.0",
129132
"alias-hq": "^5.1.6",
130133
"copy-webpack-plugin": "^12.0.2",
@@ -148,6 +151,7 @@
148151
"ts-loader": "^9.5.2",
149152
"type-fest": "^4.26.1",
150153
"typescript": "~5.8.2",
154+
"wait-for-expect": "^4.0.0",
151155
"webpack": "^5.91.0",
152156
"webpack-cli": "4.10.0",
153157
"webpack-node-externals": "^3.0.0"

apps/api/src/app.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import "reflect-metadata";
33
import { LoggerService } from "@akashnetwork/logging";
44
import { HttpLoggerIntercepter } from "@akashnetwork/logging/hono";
55
import { serve } from "@hono/node-server";
6+
import { createNodeWebSocket } from "@hono/node-ws";
67
import { otel } from "@hono/otel";
78
import { swaggerUI } from "@hono/swagger-ui";
89
import { Hono } from "hono";
910
import { cors } from "hono/cors";
1011
import once from "lodash/once";
12+
import type { AddressInfo } from "net";
1113
import { container } from "tsyringe";
1214

1315
import { AuthInterceptor } from "@src/auth/services/auth.interceptor";
@@ -38,6 +40,7 @@ import { userRouter } from "./routers/userRouter";
3840
import { web3IndexRouter } from "./routers/web3indexRouter";
3941
import { env } from "./utils/env";
4042
import { bytesToHumanReadableSize } from "./utils/files";
43+
import { initLeaseWebsocketRoute } from "./websocket/routes/websocket/websocket.router";
4144
import { addressRouter } from "./address";
4245
import { sendVerificationEmailRouter } from "./auth";
4346
import {
@@ -160,6 +163,9 @@ for (const handler of openApiHonoHandlers) {
160163
appHono.route("/", handler);
161164
}
162165

166+
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app: appHono });
167+
initLeaseWebsocketRoute(appHono, upgradeWebSocket);
168+
163169
appHono.route("/", notificationsApiProxy);
164170

165171
appHono.route("/", healthzRouter);
@@ -196,22 +202,31 @@ const appLogger = LoggerService.forContext("APP");
196202
* Start scheduler
197203
* Start server
198204
*/
199-
export async function initApp() {
205+
export async function initApp(port: number = Number(PORT)): Promise<AppServer> {
200206
try {
201207
await Promise.all([initDb(), container.resolve(FeatureFlagsService).initialize()]);
202208
startScheduler();
203209

204-
appLogger.info({ event: "SERVER_STARTING", url: `http://localhost:${PORT}`, NODE_OPTIONS: process.env.NODE_OPTIONS });
210+
appLogger.info({ event: "SERVER_STARTING", url: `http://localhost:${port}`, NODE_OPTIONS: process.env.NODE_OPTIONS });
205211
const server = serve({
206212
fetch: appHono.fetch,
207-
port: typeof PORT === "string" ? parseInt(PORT, 10) : PORT
213+
port: typeof port === "string" ? parseInt(port, 10) : port
208214
});
215+
injectWebSocket(server);
209216
const shutdown = once(() => shutdownServer(server, appLogger, container.dispose.bind(container)));
210217

211218
process.on("SIGTERM", shutdown);
212219
process.on("SIGINT", shutdown);
220+
221+
return {
222+
host: `http://localhost:${(server.address() as AddressInfo).port}`,
223+
async close() {
224+
await shutdown();
225+
}
226+
};
213227
} catch (error) {
214228
appLogger.error({ event: "APP_INIT_ERROR", error });
229+
throw error;
215230
}
216231
}
217232

@@ -236,3 +251,8 @@ export async function initDb() {
236251
}
237252

238253
export { appHono as app };
254+
255+
export interface AppServer {
256+
host: string;
257+
close(): Promise<void>;
258+
}

apps/api/src/core/lib/telemetry.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import type { Span } from "@opentelemetry/api";
2+
import { context } from "@opentelemetry/api";
3+
import { trace } from "@opentelemetry/api";
4+
5+
export function traceActiveSpan<T extends (span: Span) => any>(name: string, callback: T): ReturnType<T> {
6+
return trace.getTracer("default").startActiveSpan(name, callback);
7+
}
8+
9+
export function propagateTracingContext<T extends (...args: any[]) => any>(callback: T): T {
10+
const currentContext = context.active();
11+
return ((...args) => context.with(currentContext, () => callback(...args))) as T;
12+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
export async function shutdownServer(server: ClosableServer | undefined): Promise<void> {
2+
if (!server || !server.listening) return;
3+
return new Promise<void>((resolve, reject) => {
4+
server.close(error => {
5+
if (error) {
6+
reject(error);
7+
} else {
8+
resolve();
9+
}
10+
});
11+
});
12+
}
13+
14+
export interface ClosableServer {
15+
close: (cb?: (error?: Error) => void) => void;
16+
listening: boolean;
17+
}

apps/api/src/provider/services/jwt-token/jwt-token.service.spec.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ describe("JwtTokenService", () => {
8282
});
8383
});
8484

85-
describe("getGranularLeases", () => {
85+
describe("getScopedLeases", () => {
8686
it("returns leases for a provider, scoped", () => {
8787
const { jwtTokenService } = setup();
8888

8989
const provider = createAkashAddress();
90-
const result = jwtTokenService.getGranularLeases({ provider, scope: ["status"] });
90+
const result = jwtTokenService.getScopedLeases({ provider, scope: ["status"] });
9191

9292
expect(result).toEqual({
9393
access: "granular",
@@ -102,6 +102,25 @@ describe("JwtTokenService", () => {
102102
});
103103
});
104104

105+
describe("getFullAccessLeases", () => {
106+
it("returns leases for a provider, with full access", () => {
107+
const { jwtTokenService } = setup();
108+
109+
const provider = createAkashAddress();
110+
const result = jwtTokenService.getFullAccessLeases({ provider });
111+
112+
expect(result).toEqual({
113+
access: "granular",
114+
permissions: [
115+
{
116+
provider,
117+
access: "full"
118+
}
119+
]
120+
});
121+
});
122+
});
123+
105124
function setup(): {
106125
jwtTokenService: JwtTokenService;
107126
mockBillingConfig: MockProxy<BillingConfig>;

apps/api/src/provider/services/jwt-token/jwt-token.service.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,17 @@ export class JwtTokenService {
5151
return { jwtToken, address: akashWallet.address };
5252
}
5353

54-
getGranularLeases({ provider, scope }: { provider: string; scope: AccessScope[] }): JwtTokenPayload["leases"] {
54+
getScopedLeases({ provider, scope }: { provider: string; scope: AccessScope[] }): JwtTokenPayload["leases"] {
5555
return {
5656
access: "granular",
5757
permissions: [{ provider, access: "scoped", scope }]
5858
};
5959
}
60+
61+
getFullAccessLeases({ provider }: { provider: string }): JwtTokenPayload["leases"] {
62+
return {
63+
access: "granular",
64+
permissions: [{ provider, access: "full" }]
65+
};
66+
}
6067
}

apps/api/src/provider/services/provider/provider.service.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ describe(ProviderService.name, () => {
3535

3636
providerHttpService.getProvider.mockResolvedValue(mockProviderResponse);
3737
jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken);
38-
jwtTokenService.getGranularLeases.mockReturnValue(leases);
38+
jwtTokenService.getScopedLeases.mockReturnValue(leases);
3939
providerHttpService.sendManifest.mockResolvedValue({ success: true });
4040

4141
const result = await service.sendManifest({ provider: providerAddress, dseq, manifest, walletId });
@@ -187,7 +187,7 @@ describe(ProviderService.name, () => {
187187

188188
providerHttpService.getProvider.mockResolvedValue(mockProviderResponse);
189189
jwtTokenService.generateJwtToken.mockResolvedValue(jwtToken);
190-
jwtTokenService.getGranularLeases.mockReturnValue(leases);
190+
jwtTokenService.getScopedLeases.mockReturnValue(leases);
191191
providerHttpService.getLeaseStatus.mockResolvedValue(mockLeaseStatus);
192192

193193
const result = await service.getLeaseStatus(providerAddress, dseq, gseq, oseq, walletId);

apps/api/src/provider/services/provider/provider.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export class ProviderService {
6161
try {
6262
const jwtToken = await this.jwtTokenService.generateJwtToken({
6363
walletId,
64-
leases: this.jwtTokenService.getGranularLeases({
64+
leases: this.jwtTokenService.getScopedLeases({
6565
provider: providerIdentity.owner,
6666
scope: ["send-manifest"]
6767
})
@@ -89,7 +89,7 @@ export class ProviderService {
8989

9090
const jwtToken = await this.jwtTokenService.generateJwtToken({
9191
walletId,
92-
leases: this.jwtTokenService.getGranularLeases({
92+
leases: this.jwtTokenService.getScopedLeases({
9393
provider,
9494
scope: ["status"]
9595
})
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import { LoggerService } from "@akashnetwork/logging";
2+
import { Attributes, Span } from "@opentelemetry/api";
3+
import { WSContext } from "hono/ws";
4+
import { singleton } from "tsyringe";
5+
6+
import { AuthService } from "@src/auth/services/auth.service";
7+
import { traceActiveSpan } from "@src/core/lib/telemetry";
8+
import { MESSAGE_SCHEMA, WsMessage } from "@src/websocket/http-schemas/websocket.schema";
9+
import { ProviderWebsocketService } from "@src/websocket/services/provider-websocket/provider-websocket.service";
10+
import { ClientWebSocketStats, WebSocketUsage } from "@src/websocket/services/websocket-stats/websocket-stats.service";
11+
12+
const logger = LoggerService.forContext("LeaseWebsocketController");
13+
14+
@singleton()
15+
export class WebsocketController {
16+
constructor(
17+
private readonly authService: AuthService,
18+
private readonly providerWebsocketService: ProviderWebsocketService
19+
) {}
20+
21+
async handleOpen(stats: ClientWebSocketStats) {
22+
const { currentUser, ability } = this.authService;
23+
24+
logger.info({
25+
event: "WEBSOCKET_CONNECTION_OPENED",
26+
userId: currentUser?.id,
27+
hasAbility: !!ability
28+
});
29+
30+
stats.setUserIfExists(currentUser, ability);
31+
}
32+
33+
async handleMessage(messageStr: string, ws: WSContext, stats: ClientWebSocketStats) {
34+
logger.debug({
35+
event: "WEBSOCKET_MESSAGE_RECEIVED",
36+
messageLength: messageStr?.length
37+
});
38+
39+
const userInfo = stats.getUser();
40+
if (userInfo) {
41+
logger.debug({
42+
event: "WEBSOCKET_USER_INFO_RETRIEVED",
43+
userId: userInfo.currentUser?.id
44+
});
45+
}
46+
47+
traceActiveSpan("ws.message", async span => {
48+
let message: WsMessage | undefined;
49+
try {
50+
message = typeof messageStr === "string" ? JSON.parse(messageStr) : undefined;
51+
} catch (error) {
52+
logger.error({
53+
event: "CLIENT_MESSAGE_INVALID_JSON",
54+
message: "Received message is not a JSON string",
55+
messageLength: messageStr?.length,
56+
messageType: typeof messageStr,
57+
messageConstructor: messageStr?.constructor?.name
58+
});
59+
}
60+
61+
if (!message) {
62+
return ws.send(
63+
JSON.stringify({
64+
type: "websocket",
65+
message: "Message is not a JSON string",
66+
error: "Invalid message format"
67+
})
68+
);
69+
}
70+
71+
const parsedMessage = MESSAGE_SCHEMA.safeParse(message);
72+
if (parsedMessage.error) {
73+
logger.error({
74+
event: "CLIENT_MESSAGE_INVALID_JSON",
75+
message: "Message doesn't match expected schema",
76+
error: parsedMessage.error
77+
});
78+
return ws.send(
79+
JSON.stringify({
80+
type: "websocket",
81+
message: "Message doesn't match expected schema",
82+
error: "Invalid message format"
83+
})
84+
);
85+
}
86+
87+
const attributes: Attributes = {
88+
type: message.type
89+
};
90+
if (message.type === "websocket") {
91+
attributes.providerUrl = message.url;
92+
attributes.providerAddress = message.providerAddress;
93+
attributes.chainNetwork = message.chainNetwork;
94+
attributes.function = getWebSocketUsage(message);
95+
}
96+
97+
span.setAttributes(attributes);
98+
logger.info({
99+
event: "NEW_WEBSOCKET_MESSAGE",
100+
attributes
101+
});
102+
103+
stats.setUsage(getWebSocketUsage(message));
104+
105+
try {
106+
if (message.type === "ping") {
107+
ws.send(
108+
JSON.stringify({
109+
type: "pong"
110+
})
111+
);
112+
} else if (message.type === "websocket") {
113+
await this.providerWebsocketService.proxyMessageToProvider(message, ws, stats);
114+
}
115+
} catch (err) {
116+
logger.error({
117+
event: "CLIENT_MESSAGE_SEND_ERROR",
118+
error: err
119+
});
120+
ws.send(
121+
JSON.stringify({
122+
id: message.id,
123+
error: "Unable to send message to provider socket",
124+
type: message.type
125+
})
126+
);
127+
}
128+
129+
span.end();
130+
});
131+
}
132+
133+
async handleClose(_event: any, stats: ClientWebSocketStats, span?: Span) {
134+
logger.info("Closing socket");
135+
stats.close();
136+
137+
this.providerWebsocketService.closeProviderSocket(stats.id);
138+
139+
span?.end();
140+
}
141+
142+
async handleError(event: Event) {
143+
logger.error({
144+
event: "CLIENT_WEBSOCKET_ERROR",
145+
error: event
146+
});
147+
}
148+
}
149+
150+
function getWebSocketUsage(message: any): WebSocketUsage {
151+
if (message.type === "websocket") {
152+
if (message.url.includes("logs?follow=false&tail=10000000")) return "DownloadLogs";
153+
if (message.url.includes("logs?follow=true")) return "StreamLogs";
154+
if (message.url.includes("kubeevents?follow=true")) return "StreamEvents";
155+
if (message.url.includes("/shell?stdin=")) return "Shell";
156+
}
157+
158+
return "Unknown";
159+
}

0 commit comments

Comments
 (0)