Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.

Commit defe961

Browse files
authored
Implement peer.timeout built-in function (#101)
1 parent 2de8191 commit defe961

File tree

7 files changed

+181
-78
lines changed

7 files changed

+181
-78
lines changed

package-lock.json

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/__test__/integration/avm.spec.ts

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { FluencePeer } from '../../index';
1+
import { FluencePeer, setLogLevel } from '../../index';
22
import { Particle } from '../../internal/Particle';
33
import { handleTimeout } from '../../internal/utils';
44
import { registerHandlersHelper } from '../util';
@@ -72,4 +72,88 @@ describe('Avm spec', () => {
7272

7373
await peer.stop();
7474
});
75+
76+
it('Timeout in par call: race', async () => {
77+
// arrange
78+
const peer = new FluencePeer();
79+
await peer.start();
80+
81+
// act
82+
const promise = new Promise((resolve, reject) => {
83+
const script = `
84+
(seq
85+
(call %init_peer_id% ("op" "identity") ["slow_result"] arg)
86+
(seq
87+
(par
88+
(call %init_peer_id% ("peer" "timeout") [1000 arg] $result)
89+
(call %init_peer_id% ("op" "identity") ["fast_result"] $result)
90+
)
91+
(call %init_peer_id% ("return" "return") [$result.$[0]])
92+
)
93+
)
94+
`;
95+
const particle = Particle.createNew(script);
96+
registerHandlersHelper(peer, particle, {
97+
return: {
98+
return: (args) => {
99+
resolve(args[0]);
100+
},
101+
},
102+
});
103+
104+
peer.internals.initiateParticle(particle, handleTimeout(reject));
105+
});
106+
107+
// assert
108+
const res = await promise;
109+
expect(res).toBe('fast_result');
110+
111+
await peer.stop();
112+
});
113+
114+
it('Timeout in par call: wait', async () => {
115+
// arrange
116+
const peer = new FluencePeer();
117+
await peer.start();
118+
119+
// act
120+
const promise = new Promise((resolve, reject) => {
121+
const script = `
122+
(seq
123+
(call %init_peer_id% ("op" "identity") ["timeout_msg"] arg)
124+
(seq
125+
(seq
126+
(par
127+
(call %init_peer_id% ("peer" "timeout") [1000 arg] $ok_or_err)
128+
(call "invalid_peer" ("op" "identity") ["never"] $ok_or_err)
129+
)
130+
(xor
131+
(match $ok_or_err.$[0] "timeout_msg"
132+
(ap "failed_with_timeout" $result)
133+
)
134+
(ap "impossible happened" $result)
135+
)
136+
)
137+
(call %init_peer_id% ("return" "return") [$result.$[0]])
138+
)
139+
)
140+
`;
141+
const particle = Particle.createNew(script);
142+
registerHandlersHelper(peer, particle, {
143+
return: {
144+
return: (args) => {
145+
resolve(args[0]);
146+
},
147+
},
148+
});
149+
150+
peer.internals.initiateParticle(particle, handleTimeout(reject));
151+
});
152+
153+
// assert
154+
const res = await promise;
155+
expect(res).toBe('failed_with_timeout');
156+
157+
await peer.stop();
158+
});
75159
});

src/__test__/unit/builtInHandler.spec.ts

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,39 +5,44 @@ import { defaultServices } from '../../internal/defaultServices';
55
describe('Tests for default handler', () => {
66
// prettier-ignore
77
each`
8-
fnName | args | retCode | result
9-
${'identity'} | ${[]} | ${0} | ${{}}
10-
${'identity'} | ${[1]} | ${0} | ${1}
11-
${'identity'} | ${[1, 2]} | ${1} | ${'identity accepts up to 1 arguments, received 2 arguments'}
12-
13-
${'noop'} | ${[1, 2]} | ${0} | ${{}}
14-
15-
${'array'} | ${[1, 2, 3]} | ${0} | ${[1, 2, 3]}
16-
17-
${'concat'} | ${[[1, 2], [3, 4], [5, 6]]} | ${0} | ${[1, 2, 3, 4, 5, 6]}
18-
${'concat'} | ${[[1, 2]]} | ${0} | ${[1, 2]}
19-
${'concat'} | ${[]} | ${0} | ${[]}
20-
${'concat'} | ${[1, [1, 2], 1]} | ${1} | ${"All arguments of 'concat' must be arrays: arguments 0, 2 are not"}
8+
serviceId | fnName | args | retCode | result
9+
${'op'} | ${'identity'} | ${[]} | ${0} | ${{}}
10+
${'op'} | ${'identity'} | ${[1]} | ${0} | ${1}
11+
${'op'} | ${'identity'} | ${[1, 2]} | ${1} | ${'identity accepts up to 1 arguments, received 2 arguments'}
2112
22-
${'string_to_b58'} | ${["test"]} | ${0} | ${"3yZe7d"}
23-
${'string_to_b58'} | ${["test", 1]} | ${1} | ${"string_to_b58 accepts only one string argument"}
24-
25-
${'string_from_b58'} | ${["3yZe7d"]} | ${0} | ${"test"}
26-
${'string_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"string_from_b58 accepts only one string argument"}
27-
28-
${'bytes_to_b58'} | ${[[116, 101, 115, 116]]} | ${0} | ${"3yZe7d"}
29-
${'bytes_to_b58'} | ${[[116, 101, 115, 116], 1]} | ${1} | ${"bytes_to_b58 accepts only single argument: array of numbers"}
30-
31-
${'bytes_from_b58'} | ${["3yZe7d"]} | ${0} | ${[116, 101, 115, 116]}
32-
${'bytes_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"bytes_from_b58 accepts only one string argument"}
13+
${'op'} | ${'noop'} | ${[1, 2]} | ${0} | ${{}}
14+
15+
${'op'} | ${'array'} | ${[1, 2, 3]} | ${0} | ${[1, 2, 3]}
16+
17+
${'op'} | ${'concat'} | ${[[1, 2], [3, 4], [5, 6]]} | ${0} | ${[1, 2, 3, 4, 5, 6]}
18+
${'op'} | ${'concat'} | ${[[1, 2]]} | ${0} | ${[1, 2]}
19+
${'op'} | ${'concat'} | ${[]} | ${0} | ${[]}
20+
${'op'} | ${'concat'} | ${[1, [1, 2], 1]} | ${1} | ${"All arguments of 'concat' must be arrays: arguments 0, 2 are not"}
3321
34-
`.test(
22+
${'op'} | ${'string_to_b58'} | ${["test"]} | ${0} | ${"3yZe7d"}
23+
${'op'} | ${'string_to_b58'} | ${["test", 1]} | ${1} | ${"string_to_b58 accepts only one string argument"}
24+
25+
${'op'} | ${'string_from_b58'} | ${["3yZe7d"]} | ${0} | ${"test"}
26+
${'op'} | ${'string_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"string_from_b58 accepts only one string argument"}
27+
28+
${'op'} | ${'bytes_to_b58'} | ${[[116, 101, 115, 116]]} | ${0} | ${"3yZe7d"}
29+
${'op'} | ${'bytes_to_b58'} | ${[[116, 101, 115, 116], 1]} | ${1} | ${"bytes_to_b58 accepts only single argument: array of numbers"}
30+
31+
${'op'} | ${'bytes_from_b58'} | ${["3yZe7d"]} | ${0} | ${[116, 101, 115, 116]}
32+
${'op'} | ${'bytes_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"bytes_from_b58 accepts only one string argument"}
33+
34+
${'peer'} | ${'timeout'} | ${[200, []]} | ${0} | ${[]}}
35+
${'peer'} | ${'timeout'} | ${[200, ['test']]} | ${0} | ${['test']}}
36+
${'peer'} | ${'timeout'} | ${[]} | ${1} | ${'timeout accepts exactly two arguments: timeout duration in ms and an optional message string'}}
37+
${'peer'} | ${'timeout'} | ${[200, 'test', 1]} | ${1} | ${'timeout accepts exactly two arguments: timeout duration in ms and an optional message string'}}
38+
39+
`.test(
3540
//
3641
'$fnName with $args expected retcode: $retCode and result: $result',
37-
async ({ fnName, args, retCode, result }) => {
42+
async ({ serviceId, fnName, args, retCode, result }) => {
3843
// arrange
3944
const req: CallServiceData = {
40-
serviceId: 'op',
45+
serviceId: serviceId,
4146
fnName: fnName,
4247
args: args,
4348
tetraplets: [],

src/internal/FluencePeer.ts

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,11 @@ export class FluencePeer {
389389
});
390390

391391
this._outgoingParticles.subscribe(async (item) => {
392+
if (!this._connection) {
393+
item.particle.logTo('error', 'cannot send particle, peer is not connected');
394+
item.onStageChange({ stage: 'sendingError' });
395+
return;
396+
}
392397
await this._connection.sendParticle(item.particle);
393398
item.onStageChange({ stage: 'sent' });
394399
});
@@ -442,13 +447,37 @@ export class FluencePeer {
442447
// execute call requests if needed
443448
// and put particle with the results back to queue
444449
if (result.callRequests.length > 0) {
445-
this._execCallRequests(particle, result.callRequests).then((callResults) => {
446-
const newParticle = particle.clone();
447-
newParticle.callResults = callResults;
448-
newParticle.data = Buffer.from([]);
449-
450-
particlesQueue.next({ ...item, particle: newParticle });
451-
});
450+
for (let [key, cr] of result.callRequests) {
451+
const req = {
452+
fnName: cr.functionName,
453+
args: cr.arguments,
454+
serviceId: cr.serviceId,
455+
tetraplets: cr.tetraplets,
456+
particleContext: particle.getParticleContext(),
457+
};
458+
459+
this._execSingleCallRequest(req)
460+
.catch(
461+
(err): CallServiceResult => ({
462+
retCode: ResultCodes.exceptionInHandler,
463+
result: `Handler failed. fnName="${req.fnName}" serviceId="${
464+
req.serviceId
465+
}" error: ${err.toString()}`,
466+
}),
467+
)
468+
.then((res) => {
469+
const serviceResult = {
470+
result: JSON.stringify(res.result),
471+
retCode: res.retCode,
472+
};
473+
474+
const newParticle = particle.clone();
475+
newParticle.callResults = [[key, serviceResult]];
476+
newParticle.data = Buffer.from([]);
477+
478+
particlesQueue.next({ ...item, particle: newParticle });
479+
});
480+
}
452481
} else {
453482
item.onStageChange({ stage: 'localWorkDone' });
454483
}
@@ -457,44 +486,8 @@ export class FluencePeer {
457486
return particlesQueue;
458487
}
459488

460-
private async _execCallRequests(p: Particle, callRequests: CallRequestsArray): Promise<CallResultsArray> {
461-
// execute all requests asynchronously
462-
const promises = callRequests.map(([key, callRequest]) => {
463-
const req = {
464-
fnName: callRequest.functionName,
465-
args: callRequest.arguments,
466-
serviceId: callRequest.serviceId,
467-
tetraplets: callRequest.tetraplets,
468-
particleContext: p.getParticleContext(),
469-
};
470-
471-
// execute single requests and catch possible errors
472-
const promise = this._execSingleCallRequest(req)
473-
.catch(
474-
(err): CallServiceResult => ({
475-
retCode: ResultCodes.exceptionInHandler,
476-
result: `Handler failed. fnName="${req.fnName}" serviceId="${
477-
req.serviceId
478-
}" error: ${err.toString()}`,
479-
}),
480-
)
481-
.then(
482-
(res): AvmCallServiceResult => ({
483-
result: JSON.stringify(res.result),
484-
retCode: res.retCode,
485-
}),
486-
)
487-
.then((res): [key: number, res: AvmCallServiceResult] => [key, res]);
488-
489-
return promise;
490-
});
491-
// don't block
492-
const res = await Promise.all(promises);
493-
log.debug(`Executed call service for particle id=${p.id}, Call service results: `, res);
494-
return res;
495-
}
496-
497489
private async _execSingleCallRequest(req: CallServiceData): Promise<CallServiceResult> {
490+
log.debug('executing call service handler', req);
498491
const particleId = req.particleContext.particleId;
499492

500493
// trying particle-specific handler
@@ -545,6 +538,7 @@ export class FluencePeer {
545538
res.result = null;
546539
}
547540

541+
log.debug('executed call service handler, req and res are: ', req, res);
548542
return res;
549543
}
550544

src/internal/Particle.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ export type ParticleExecutionStage =
146146
| { stage: 'interpreterError'; errorMessage: string }
147147
| { stage: 'localWorkDone' }
148148
| { stage: 'sent' }
149+
| { stage: 'sendingError' }
149150
| { stage: 'expired' };
150151

151152
export interface ParticleQueueItem {

src/internal/compilerSupport/v2.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ export function callFunction(rawFnArgs: Array<any>, def: FunctionCallDef, script
328328
resolve(undefined);
329329
}
330330

331+
if (stage.stage === 'sendingError') {
332+
reject(`Could not send particle for ${def.functionName}: not connected`);
333+
}
334+
331335
if (stage.stage === 'expired') {
332336
reject(`Request timed out after ${particle.ttl} for ${def.functionName}`);
333337
}

src/internal/defaultServices.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,23 @@ export const defaultServices: { [serviceId in string]: { [fnName in string]: Gen
9999
},
100100

101101
peer: {
102+
timeout: (req) => {
103+
if (req.args.length !== 2) {
104+
return error(
105+
'timeout accepts exactly two arguments: timeout duration in ms and an optional message string',
106+
);
107+
}
108+
const durationMs = req.args[0];
109+
const message = req.args[1];
110+
111+
return new Promise((resolve) => {
112+
setTimeout(() => {
113+
const res = success(message);
114+
resolve(res);
115+
}, durationMs);
116+
});
117+
},
118+
102119
identify: (req) => {
103120
return error('The JS implementation of Peer does not support identify');
104121
},

0 commit comments

Comments
 (0)