Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.

Commit ba537c7

Browse files
authored
Big refactoring (#8)
Big codebase refactoring. * Multiple clients are allowed on the same browser instance * Particle queue processing is split from particle handling logic * Public AIP is completely rethought * Updated project file structure. Clean exports for public api methods * Additional unit tests
1 parent c7f9410 commit ba537c7

39 files changed

+1398
-1333
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ bundle/
1212

1313
# Dependency directories
1414
node_modules/
15-
jspm_packages/
15+
jspm_packages/
16+
/dist/

package.json

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22
"name": "@fluencelabs/fluence",
33
"version": "0.8.0",
44
"description": "JS SDK for the Fluence network",
5-
"main": "./dist/fluence.js",
6-
"typings": "./dist/fluence.d.ts",
5+
"main": "./dist/index.js",
6+
"typings": "./dist/index.d.ts",
77
"scripts": {
88
"test": "mocha --timeout 10000 -r esm -r ts-node/register src/**/*.spec.ts",
9-
"test-ts": "ts-mocha --timeout 10000 -r esm -p tsconfig.json src/**/*.spec.ts",
10-
"package:build": "NODE_ENV=production webpack && npm run package",
11-
"package": "tsc && rsync -r src/aqua/*.js dist/aqua",
12-
"start": "webpack-dev-server -p",
13-
"build": "webpack --mode production"
9+
"build": "tsc && rsync -r src/internal/aqua/*.js dist/internal/aqua",
10+
"build:webpack": "webpack --mode production"
1411
},
1512
"repository": "https://github.com/fluencelabs/fluence-js",
1613
"author": "Fluence Labs",
@@ -48,7 +45,7 @@
4845
"text-encoding": "^0.7.0",
4946
"ts-loader": "7.0.5",
5047
"ts-mocha": "8.0.0",
51-
"typescript": "3.9.5",
48+
"typescript": "^3.9.5",
5249
"webpack": "4.43.0",
5350
"webpack-cli": "3.3.11",
5451
"webpack-dev-server": "3.11.0"

src/FluenceClient.ts

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
/*
2+
* Copyright 2020 Fluence Labs Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import log from 'loglevel';
18+
import PeerId from 'peer-id';
19+
import { SecurityTetraplet, StepperOutcome } from './internal/commonTypes';
20+
import { FluenceClientBase } from './internal/FluenceClientBase';
21+
import { build, genUUID, ParticleDto } from './internal/particle';
22+
import { ParticleProcessor } from './internal/ParticleProcessor';
23+
import { ParticleProcessorStrategy } from './internal/ParticleProcessorStrategy';
24+
25+
const fetchCallbackServiceName = '__callback';
26+
const selfRelayVarName = '__relay';
27+
28+
const wrapRelayBasedCall = (script: string) => {
29+
return `
30+
(seq
31+
(call ${selfRelayVarName} ("op" "identity") [])
32+
${script}
33+
)
34+
`;
35+
};
36+
37+
const wrapFetchCall = (script: string, particleId: string, resultArgNames: string[]) => {
38+
// TODO: sanitize
39+
const resultTogether = resultArgNames.join(' ');
40+
let res = `
41+
(seq
42+
${script}
43+
(seq
44+
(call ${selfRelayVarName} ("op" "identity") [])
45+
(call %init_peer_id% ("${fetchCallbackServiceName}" "${particleId}") [${resultTogether}])
46+
)
47+
)`;
48+
return wrapRelayBasedCall(res);
49+
};
50+
51+
export interface FluenceEvent {
52+
type: string;
53+
args: any[];
54+
}
55+
56+
export type FluenceEventHandler = (event: FluenceEvent) => void;
57+
58+
export class FluenceClient extends FluenceClientBase {
59+
private eventSubscribers: Map<string, FluenceEventHandler[]> = new Map();
60+
private eventValidators: Map<string, Function> = new Map();
61+
private callbacks: Map<string, Function> = new Map();
62+
private fetchParticles: Map<string, { resolve: Function; reject: Function }> = new Map();
63+
64+
constructor(selfPeerId: PeerId) {
65+
super(selfPeerId);
66+
this.processor = new ParticleProcessor(this.strategy, selfPeerId);
67+
}
68+
69+
async fetch<T>(script: string, resultArgNames: string[], data?: Map<string, any>, ttl?: number): Promise<T> {
70+
data = this.addRelayToArgs(data);
71+
const callBackId = genUUID();
72+
script = wrapFetchCall(script, callBackId, resultArgNames);
73+
const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId);
74+
75+
return new Promise<T>((resolve, reject) => {
76+
this.fetchParticles.set(callBackId, { resolve, reject });
77+
this.processor.executeLocalParticle(particle);
78+
});
79+
}
80+
81+
// TODO:: better naming probably?
82+
async fireAndForget(script: string, data?: Map<string, any>, ttl?: number) {
83+
data = this.addRelayToArgs(data);
84+
script = wrapRelayBasedCall(script);
85+
86+
await this.sendScript(script, data, ttl);
87+
}
88+
89+
registerEvent(
90+
channel: string,
91+
eventName: string,
92+
validate?: (channel: string, eventName: string, args: any[], tetraplets: any[][]) => boolean,
93+
) {
94+
if (!validate) {
95+
validate = (c, e, a, t) => true;
96+
}
97+
98+
this.eventValidators.set(`${channel}/${eventName}`, validate);
99+
}
100+
101+
unregisterEvent(channel: string, eventName: string) {
102+
this.eventValidators.delete(`${channel}/${eventName}`);
103+
}
104+
105+
registerCallback(
106+
serviceId: string,
107+
fnName: string,
108+
callback: (args: any[], tetraplets: SecurityTetraplet[][]) => object,
109+
) {
110+
this.callbacks.set(`${serviceId}/${fnName}`, callback);
111+
}
112+
113+
unregisterCallback(channel: string, eventName: string) {
114+
this.eventValidators.delete(`${channel}/${eventName}`);
115+
}
116+
117+
subscribe(channel: string, handler: FluenceEventHandler) {
118+
if (!this.eventSubscribers.get(channel)) {
119+
this.eventSubscribers.set(channel, []);
120+
}
121+
122+
this.eventSubscribers.get(channel).push(handler);
123+
}
124+
125+
protected strategy: ParticleProcessorStrategy = {
126+
particleHandler: (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => {
127+
// missing built-in op
128+
if (serviceId === 'op' && fnName === 'identity') {
129+
return {
130+
ret_code: 0,
131+
result: JSON.stringify(args),
132+
};
133+
}
134+
135+
// async fetch model handling
136+
if (serviceId === fetchCallbackServiceName) {
137+
const executingParticlePromiseFns = this.fetchParticles.get(fnName);
138+
if (executingParticlePromiseFns) {
139+
// don't block
140+
setImmediate(() => {
141+
this.fetchParticles.delete(fnName);
142+
executingParticlePromiseFns.resolve(args);
143+
});
144+
}
145+
146+
return {
147+
ret_code: 0,
148+
result: JSON.stringify({}),
149+
};
150+
}
151+
152+
// event model handling
153+
const eventPair = `${serviceId}/${fnName}`;
154+
const eventValidator = this.eventValidators.get(eventPair);
155+
if (eventValidator) {
156+
try {
157+
if (!eventValidator(serviceId, fnName, args, tetraplets)) {
158+
return {
159+
ret_code: 1, // TODO:: error codes
160+
result: 'validation failed',
161+
};
162+
}
163+
} catch (e) {
164+
log.error('error running validation function: ' + e);
165+
return {
166+
ret_code: 1, // TODO:: error codes
167+
result: 'validation failed',
168+
};
169+
}
170+
171+
// don't block
172+
setImmediate(() => {
173+
this.pushEvent(serviceId, {
174+
type: fnName,
175+
args: args,
176+
});
177+
});
178+
179+
return {
180+
ret_code: 0,
181+
result: JSON.stringify({}),
182+
};
183+
}
184+
185+
// callback model handling
186+
const callback = this.callbacks.get(eventPair);
187+
if (callback) {
188+
try {
189+
const res = callback(args, tetraplets);
190+
return {
191+
ret_code: 0,
192+
result: JSON.stringify(res),
193+
};
194+
} catch (e) {
195+
return {
196+
ret_code: 1, // TODO:: error codes
197+
result: JSON.stringify(e),
198+
};
199+
}
200+
}
201+
202+
return {
203+
ret_code: 1,
204+
result: `Error. There is no service: ${serviceId}`,
205+
};
206+
},
207+
208+
sendParticleFurther: async (particle: ParticleDto) => {
209+
try {
210+
await this.connection.sendParticle(particle);
211+
} catch (reason) {
212+
log.error(`Error on sending particle with id ${particle.id}: ${reason}`);
213+
}
214+
},
215+
216+
onParticleTimeout: (particle: ParticleDto, now: number) => {
217+
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
218+
const executingParticle = this.fetchParticles.get(particle.id);
219+
if (executingParticle) {
220+
executingParticle.reject(new Error(`particle ${particle.id} timed out`));
221+
}
222+
},
223+
onLocalParticleRecieved: (particle: ParticleDto) => {},
224+
onExternalParticleRecieved: (particle: ParticleDto) => {},
225+
onStepperExecuting: (particle: ParticleDto) => {},
226+
onStepperExecuted: (stepperOutcome: StepperOutcome) => {
227+
log.info('inner interpreter outcome:');
228+
log.info(stepperOutcome);
229+
},
230+
};
231+
232+
private pushEvent(channel: string, event: FluenceEvent) {
233+
const subs = this.eventSubscribers.get(channel);
234+
if (subs) {
235+
for (let sub of subs) {
236+
sub(event);
237+
}
238+
}
239+
}
240+
241+
private addRelayToArgs(data: Map<string, any>) {
242+
if (data === undefined) {
243+
data = new Map();
244+
}
245+
246+
if (!data.has(selfRelayVarName)) {
247+
data.set(selfRelayVarName, this.relayPeerId);
248+
}
249+
250+
return data;
251+
}
252+
}

0 commit comments

Comments
 (0)