Skip to content

Commit 4410a88

Browse files
committed
wip
1 parent 87fc924 commit 4410a88

File tree

3 files changed

+381
-13
lines changed

3 files changed

+381
-13
lines changed

yarn-project/txe/src/index.ts

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ import { createHash } from 'crypto';
2020
import { createReadStream } from 'fs';
2121
import { readFile, readdir } from 'fs/promises';
2222
import { join, parse } from 'path';
23+
import { Worker } from 'worker_threads';
2324
import { z } from 'zod';
2425

25-
import { type TXEOracleFunctionName, TXESession } from './txe_session.js';
26+
import type { TXEOracleFunctionName } from './txe_session.js';
2627
import {
2728
type ForeignCallArgs,
2829
ForeignCallArgsSchema,
@@ -34,9 +35,15 @@ import {
3435
fromSingle,
3536
toSingle,
3637
} from './util/encoding.js';
38+
import { serializeForeignCallArgs, serializeProtocolContracts } from './util/serialization.js';
3739
import type { ContractArtifactWithHash } from './util/txe_contract_data_provider.js';
3840

39-
const sessions = new Map<number, TXESession>();
41+
interface SessionWorker {
42+
worker: Worker;
43+
processing: boolean;
44+
}
45+
46+
const sessionWorkers = new Map<number, SessionWorker>();
4047

4148
/*
4249
* TXE typically has to load the same contract artifacts over and over again for multiple tests,
@@ -71,6 +78,90 @@ class TXEDispatcher {
7178

7279
constructor(private logger: Logger) {}
7380

81+
private waitForWorkerReady(worker: Worker): Promise<void> {
82+
return new Promise<void>((resolve, reject) => {
83+
worker.once('message', (msg: any) => {
84+
if (msg.type === 'ready') {
85+
resolve();
86+
} else if (msg.type === 'error') {
87+
reject(new Error(msg.error));
88+
}
89+
});
90+
});
91+
}
92+
93+
private async createSessionWorker(): Promise<SessionWorker> {
94+
// Ensure protocol contracts are loaded
95+
if (!this.protocolContracts) {
96+
this.protocolContracts = await Promise.all(
97+
protocolContractNames.map(name => new BundledProtocolContractsProvider().getProtocolContractArtifact(name)),
98+
);
99+
}
100+
101+
// Serialize protocol contracts for worker
102+
const serializedProtocolContracts = serializeProtocolContracts(this.protocolContracts);
103+
104+
const worker = new Worker('./dest/session_worker.js', {
105+
workerData: { serializedProtocolContracts },
106+
});
107+
108+
// Wait for the worker to signal it's ready
109+
await this.waitForWorkerReady(worker);
110+
111+
return {
112+
worker,
113+
processing: false,
114+
};
115+
}
116+
117+
private async getOrCreateWorker(sessionId: number): Promise<SessionWorker> {
118+
if (!sessionWorkers.has(sessionId)) {
119+
this.logger.debug(`Creating new worker for session ${sessionId}`);
120+
const worker = await this.createSessionWorker();
121+
sessionWorkers.set(sessionId, worker);
122+
}
123+
return sessionWorkers.get(sessionId)!;
124+
}
125+
126+
private sendToWorker(
127+
sessionWorker: SessionWorker,
128+
sessionId: number,
129+
functionName: TXEOracleFunctionName,
130+
inputs: ForeignCallArgs,
131+
): Promise<ForeignCallResult> {
132+
if (sessionWorker.processing) {
133+
throw new Error(
134+
`Session ${sessionId} is already processing a request. This should not happen as RPC requests are sequential.`,
135+
);
136+
}
137+
138+
sessionWorker.processing = true;
139+
140+
return new Promise((resolve, reject) => {
141+
const onMessage = (msg: any) => {
142+
sessionWorker.worker.off('message', onMessage);
143+
sessionWorker.processing = false;
144+
145+
if (msg.type === 'result') {
146+
resolve(msg.result);
147+
} else if (msg.type === 'error') {
148+
reject(new Error(msg.error));
149+
}
150+
};
151+
152+
sessionWorker.worker.on('message', onMessage);
153+
154+
// Serialize the inputs before sending to worker
155+
const serializedInputs = serializeForeignCallArgs(inputs);
156+
157+
sessionWorker.worker.postMessage({
158+
type: 'process',
159+
functionName,
160+
inputs: serializedInputs,
161+
});
162+
});
163+
}
164+
74165
private fastHashFile(path: string) {
75166
return new Promise(resolve => {
76167
const fd = createReadStream(path);
@@ -201,16 +292,7 @@ class TXEDispatcher {
201292
const { session_id: sessionId, function: functionName, inputs } = callData;
202293
this.logger.debug(`Calling ${functionName} on session ${sessionId}`);
203294

204-
if (!sessions.has(sessionId)) {
205-
this.logger.debug(`Creating new session ${sessionId}`);
206-
if (!this.protocolContracts) {
207-
this.protocolContracts = await Promise.all(
208-
protocolContractNames.map(name => new BundledProtocolContractsProvider().getProtocolContractArtifact(name)),
209-
);
210-
}
211-
sessions.set(sessionId, await TXESession.init(this.protocolContracts));
212-
}
213-
295+
// Process special functions that need preprocessing before sending to worker
214296
switch (functionName) {
215297
case 'txeDeploy': {
216298
await this.#processDeployInputs(callData);
@@ -222,7 +304,11 @@ class TXEDispatcher {
222304
}
223305
}
224306

225-
return await sessions.get(sessionId)!.processFunction(functionName, inputs);
307+
// Get or create a worker for this session
308+
const sessionWorker = await this.getOrCreateWorker(sessionId);
309+
310+
// Send the request to the worker and wait for the response
311+
return await this.sendToWorker(sessionWorker, sessionId, functionName, inputs);
226312
}
227313
}
228314

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import type { ProtocolContract } from '@aztec/protocol-contracts';
2+
3+
import { parentPort, workerData } from 'worker_threads';
4+
5+
import { TXESession } from './txe_session.js';
6+
import type { TXEOracleFunctionName } from './txe_session.js';
7+
import type { ForeignCallArgs, ForeignCallResult } from './util/encoding.js';
8+
import { deserializeForeignCallArgs, deserializeProtocolContracts } from './util/serialization.js';
9+
10+
interface WorkerMessage {
11+
type: 'process';
12+
functionName: TXEOracleFunctionName;
13+
inputs: ForeignCallArgs;
14+
}
15+
16+
interface WorkerResponse {
17+
type: 'result' | 'error';
18+
result?: ForeignCallResult;
19+
error?: string;
20+
}
21+
22+
// This worker handles a single session for its entire lifetime
23+
let session: TXESession | undefined;
24+
25+
async function initSession(protocolContracts: ProtocolContract[]): Promise<TXESession> {
26+
if (!session) {
27+
session = await TXESession.init(protocolContracts);
28+
}
29+
return session;
30+
}
31+
32+
async function processMessage(message: WorkerMessage): Promise<WorkerResponse> {
33+
try {
34+
if (!session) {
35+
throw new Error('Session not initialized');
36+
}
37+
38+
// Deserialize the inputs back to their original form
39+
const deserializedInputs = deserializeForeignCallArgs(message.inputs);
40+
41+
const result = await session.processFunction(message.functionName, deserializedInputs);
42+
43+
return {
44+
type: 'result',
45+
result,
46+
};
47+
} catch (error) {
48+
return {
49+
type: 'error',
50+
error: error instanceof Error ? error.message : String(error),
51+
};
52+
}
53+
}
54+
55+
// Worker initialization and main loop
56+
if (parentPort && workerData) {
57+
const { serializedProtocolContracts } = workerData as { serializedProtocolContracts: string };
58+
59+
// Deserialize protocol contracts
60+
const protocolContracts = deserializeProtocolContracts(serializedProtocolContracts);
61+
62+
// Initialize the session once when the worker starts
63+
initSession(protocolContracts)
64+
.then(() => {
65+
parentPort!.postMessage({ type: 'ready' });
66+
67+
// Process messages for this session
68+
parentPort!.on('message', (message: WorkerMessage) => {
69+
processMessage(message)
70+
.then(response => {
71+
parentPort!.postMessage(response);
72+
})
73+
.catch(error => {
74+
parentPort!.postMessage({
75+
type: 'error',
76+
error: error instanceof Error ? error.message : String(error),
77+
});
78+
});
79+
});
80+
})
81+
.catch(error => {
82+
parentPort!.postMessage({
83+
type: 'error',
84+
error: `Failed to initialize session: ${error}`,
85+
});
86+
});
87+
}

0 commit comments

Comments
 (0)