Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.

Commit 89d4247

Browse files
committed
remove processing queue, merge ParticleProcessor logic into fluence client
1 parent b0ed007 commit 89d4247

File tree

4 files changed

+121
-198
lines changed

4 files changed

+121
-198
lines changed

src/api.unstable.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export const createClient = async (
8282
}
8383

8484
const client = new ClientImpl(peerId);
85+
await client.initAquamarineRuntime();
8586

8687
if (connectTo) {
8788
let theAddress: Multiaddr;
@@ -96,8 +97,6 @@ export const createClient = async (
9697
if (!(await checkConnection(client))) {
9798
throw new Error('Connection check failed. Check if the node is working or try to connect to another node');
9899
}
99-
} else {
100-
await client.local();
101100
}
102101

103102
return client;

src/internal/ClientImpl.ts

Lines changed: 89 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ import * as PeerId from 'peer-id';
1818
import Multiaddr from 'multiaddr';
1919
import { FluenceConnection } from './FluenceConnection';
2020

21-
import { ParticleProcessor } from './ParticleProcessor';
22-
import { PeerIdB58, SecurityTetraplet } from './commonTypes';
21+
import { CallServiceResult, ParticleHandler, PeerIdB58, SecurityTetraplet } from './commonTypes';
2322
import { FluenceClient } from 'src';
2423
import { RequestFlow } from './RequestFlow';
2524
import { AquaCallHandler, errorHandler, fnHandler } from './AquaHandler';
2625
import { loadRelayFn, loadVariablesService } from './RequestFlowBuilder';
26+
import { logParticle, Particle } from './particle';
27+
import log from 'loglevel';
28+
import { AquamarineInterpreter } from './aqua/interpreter';
2729

2830
const makeDefaultClientHandler = (): AquaCallHandler => {
2931
const res = new AquaCallHandler();
@@ -35,6 +37,10 @@ const makeDefaultClientHandler = (): AquaCallHandler => {
3537
export class ClientImpl implements FluenceClient {
3638
readonly selfPeerIdFull: PeerId;
3739

40+
private requests: Map<string, RequestFlow> = new Map();
41+
private currentRequestId: string | null = null;
42+
private watchDog;
43+
3844
get relayPeerId(): PeerIdB58 | undefined {
3945
return this.connection?.nodePeerId.toB58String();
4046
}
@@ -48,12 +54,11 @@ export class ClientImpl implements FluenceClient {
4854
}
4955

5056
private connection: FluenceConnection;
51-
protected processor: ParticleProcessor;
57+
private interpreter: AquamarineInterpreter;
5258

5359
constructor(selfPeerIdFull: PeerId) {
5460
this.selfPeerIdFull = selfPeerIdFull;
5561
this.aquaCallHandler = makeDefaultClientHandler();
56-
this.processor = new ParticleProcessor(selfPeerIdFull, this.aquaCallHandler);
5762
}
5863

5964
aquaCallHandler: AquaCallHandler;
@@ -62,20 +67,16 @@ export class ClientImpl implements FluenceClient {
6267
if (this.connection) {
6368
await this.connection.disconnect();
6469
}
65-
await this.processor.destroy();
70+
this.clearWathcDog();
6671
}
6772

68-
// HACK:: this is only needed to fix tests.
69-
// Particle processor should be tested instead
70-
async local(): Promise<void> {
71-
await this.processor.init();
73+
async initAquamarineRuntime(): Promise<void> {
74+
this.interpreter = await AquamarineInterpreter.create({
75+
particleHandler: this.interpreterCallback.bind(this),
76+
peerId: this.selfPeerIdFull,
77+
});
7278
}
7379

74-
/**
75-
* Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection.
76-
*
77-
* @param multiaddr
78-
*/
7980
async connect(multiaddr: string | Multiaddr): Promise<void> {
8081
multiaddr = Multiaddr(multiaddr);
8182

@@ -93,11 +94,11 @@ export class ClientImpl implements FluenceClient {
9394
multiaddr,
9495
node,
9596
this.selfPeerIdFull,
96-
this.processor.executeIncomingParticle.bind(this.processor),
97+
this.executeIncomingParticle.bind(this),
9798
);
9899
await connection.connect();
99100
this.connection = connection;
100-
await this.processor.init(connection);
101+
this.initWatchDog();
101102
}
102103

103104
async initiateFlow(request: RequestFlow): Promise<void> {
@@ -106,6 +107,77 @@ export class ClientImpl implements FluenceClient {
106107
return this.relayPeerId || '';
107108
});
108109
await request.initState(this.selfPeerIdFull);
109-
this.processor.executeLocalParticle(request);
110+
111+
logParticle(log.debug, 'executing local particle', request.getParticle());
112+
request.handler.combineWith(this.aquaCallHandler);
113+
this.requests.set(request.id, request);
114+
115+
await this.processRequest(request);
116+
}
117+
118+
async executeIncomingParticle(particle: Particle) {
119+
logParticle(log.debug, 'external particle received', particle);
120+
121+
let request = this.requests.get(particle.id);
122+
if (request) {
123+
request.receiveUpdate(particle);
124+
} else {
125+
request = RequestFlow.createExternal(particle);
126+
request.handler.combineWith(this.aquaCallHandler);
127+
}
128+
this.requests.set(request.id, request);
129+
130+
await this.processRequest(request);
131+
}
132+
133+
private async processRequest(request: RequestFlow): Promise<void> {
134+
try {
135+
this.currentRequestId = request.id;
136+
request.execute(this.interpreter, this.connection);
137+
} catch (err) {
138+
log.error('particle processing failed: ' + err);
139+
} finally {
140+
this.currentRequestId = null;
141+
}
142+
}
143+
144+
private interpreterCallback: ParticleHandler = (
145+
serviceId: string,
146+
fnName: string,
147+
args: any[],
148+
tetraplets: SecurityTetraplet[][],
149+
): CallServiceResult => {
150+
if (this.currentRequestId === null) {
151+
throw Error('current request can`t be null here');
152+
}
153+
154+
const request = this.requests.get(this.currentRequestId);
155+
const res = request.handler.execute({
156+
serviceId,
157+
fnName,
158+
args,
159+
tetraplets,
160+
particleContext: {
161+
particleId: request.id,
162+
},
163+
});
164+
return {
165+
ret_code: res.retCode,
166+
result: JSON.stringify(res.result || {}),
167+
};
168+
};
169+
170+
private initWatchDog() {
171+
this.watchDog = setInterval(() => {
172+
for (let key in this.requests.keys) {
173+
if (this.requests.get(key).hasExpired()) {
174+
this.requests.delete(key);
175+
}
176+
}
177+
}, 5000);
178+
}
179+
180+
private clearWathcDog() {
181+
clearInterval(this.watchDog);
110182
}
111183
}

src/internal/ParticleProcessor.ts

Lines changed: 0 additions & 178 deletions
This file was deleted.

0 commit comments

Comments
 (0)