Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.

Commit e0a970d

Browse files
Akimshamsartem
andauthored
fix: Fire and forget [fixes DXJ-446] (#336)
* Add test for particle and try to fix it * Fix build * Fix tests * Fix stop order * FluencePeer refactoring * mplex to yamux * Small fixes * Refactor connections * Update packages/core/js-client/src/jsPeer/FluencePeer.ts Co-authored-by: shamsartem <[email protected]> * Remove redundant checks * Update packages/core/js-client/src/jsPeer/FluencePeer.ts Co-authored-by: shamsartem <[email protected]> * Suppress very long output of raw data * Test for parallel execution * Fix test * Misc optimization * Add minRepr * Fix reset error * Latest default nox image --------- Co-authored-by: shamsartem <[email protected]>
1 parent b79039d commit e0a970d

File tree

15 files changed

+698
-446
lines changed

15 files changed

+698
-446
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ on:
66
nox-image:
77
description: "nox image tag"
88
type: string
9-
default: "fluencelabs/nox:unstable_minimal"
9+
default: "fluencelabs/nox:minimal_0.2.5"
1010
avm-version:
1111
description: "@fluencelabs/avm version"
1212
type: string
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import "@fluencelabs/aqua-lib/builtin.aqua"
2+
export test
3+
4+
func test():
5+
on HOST_PEER_ID:
6+
Op.noop()
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/* eslint-disable */
2+
// @ts-nocheck
3+
/**
4+
*
5+
* This file is auto-generated. Do not edit manually: changes may be erased.
6+
* Generated by Aqua compiler: https://github.com/fluencelabs/aqua/.
7+
* If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues
8+
* Aqua version: 0.12.0
9+
*
10+
*/
11+
import type { IFluenceClient as IFluenceClient$$, CallParams as CallParams$$ } from '@fluencelabs/js-client';
12+
import {
13+
v5_callFunction as callFunction$$,
14+
v5_registerService as registerService$$,
15+
} from '@fluencelabs/js-client';
16+
17+
18+
19+
// Services
20+
21+
// Functions
22+
export const test_script = `
23+
(seq
24+
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
25+
(xor
26+
(xor
27+
(call -relay- ("op" "noop") [])
28+
(fail %last_error%)
29+
)
30+
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 0])
31+
)
32+
)
33+
`
34+
35+
36+
export function test(
37+
config?: {ttl?: number}
38+
): Promise<void>;
39+
40+
export function test(
41+
peer: IFluenceClient$$,
42+
config?: {ttl?: number}
43+
): Promise<void>;
44+
45+
export function test(...args: any) {
46+
47+
48+
return callFunction$$(
49+
args,
50+
{
51+
"functionName" : "test",
52+
"arrow" : {
53+
"tag" : "arrow",
54+
"domain" : {
55+
"tag" : "labeledProduct",
56+
"fields" : {
57+
58+
}
59+
},
60+
"codomain" : {
61+
"tag" : "nil"
62+
}
63+
},
64+
"names" : {
65+
"relay" : "-relay-",
66+
"getDataSrv" : "getDataSrv",
67+
"callbackSrv" : "callbackSrv",
68+
"responseSrv" : "callbackSrv",
69+
"responseFnName" : "response",
70+
"errorHandlingSrv" : "errorHandlingSrv",
71+
"errorFnName" : "error"
72+
}
73+
},
74+
test_script
75+
)
76+
}
77+
78+
/* eslint-enable */

packages/@tests/aqua/src/_aqua/smoke_test.ts

Lines changed: 204 additions & 204 deletions
Large diffs are not rendered by default.

packages/@tests/aqua/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { fromByteArray } from 'base64-js';
22
import { Fluence } from '@fluencelabs/js-client';
33
import type { ClientConfig } from '@fluencelabs/js-client';
44
import { registerHelloWorld, helloTest, marineTest, resourceTest } from './_aqua/smoke_test.js';
5+
import { test as particleTest } from './_aqua/finalize_particle.js';
56
import { wasm } from './wasmb64.js';
67

78
const relay = {
@@ -67,6 +68,10 @@ export const runTest = async (): Promise<TestResult> => {
6768

6869
console.log('running marine test...');
6970
const marine = await marineTest(wasm);
71+
72+
console.log('running particle test...');
73+
await particleTest();
74+
7075
console.log('marine test finished, result: ', marine);
7176

7277
const returnVal = {

packages/@tests/aqua/tsconfig.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{
22
"extends": "../../../tsconfig.json",
33
"compilerOptions": {
4-
"outDir": "./dist"
4+
"outDir": "./dist",
5+
"module": "NodeNext"
56
},
67
"exclude": ["node_modules", "dist"]
78
}

packages/core/js-client/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
"license": "Apache-2.0",
2828
"dependencies": {
2929
"@chainsafe/libp2p-noise": "13.0.0",
30+
"@chainsafe/libp2p-yamux": "5.0.0",
3031
"@fluencelabs/interfaces": "workspace:*",
3132
"@libp2p/crypto": "2.0.3",
3233
"@libp2p/interface": "0.1.2",
33-
"@libp2p/mplex": "9.0.4",
3434
"@libp2p/peer-id": "3.0.2",
3535
"@libp2p/peer-id-factory": "3.0.3",
3636
"@libp2p/websockets": "7.0.4",

packages/core/js-client/src/clientPeer/ClientPeer.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,13 @@ export const makeClientPeerConfig = async (
5555
};
5656

5757
export class ClientPeer extends FluencePeer implements IFluenceClient {
58-
private relayPeerId: PeerIdB58;
59-
private relayConnection: RelayConnection;
60-
6158
constructor(
6259
peerConfig: PeerConfig,
6360
relayConfig: RelayConnectionConfig,
6461
keyPair: KeyPair,
6562
marine: IMarineHost,
6663
) {
67-
const relayConnection = new RelayConnection(relayConfig);
68-
69-
super(peerConfig, keyPair, marine, new JsServiceHost(), relayConnection);
70-
this.relayPeerId = relayConnection.getRelayPeerId();
71-
this.relayConnection = relayConnection;
64+
super(peerConfig, keyPair, marine, new JsServiceHost(), new RelayConnection(relayConfig));
7265
}
7366

7467
getPeerId(): string {
@@ -83,7 +76,7 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
8376
connectionStateChangeHandler: (state: ConnectionState) => void = () => {};
8477

8578
getRelayPeerId(): string {
86-
return this.relayPeerId;
79+
return this.internals.getRelayPeerId();
8780
}
8881

8982
onConnectionStateChange(handler: (state: ConnectionState) => void): ConnectionState {
@@ -115,7 +108,6 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
115108
log.trace('connecting to Fluence network');
116109
this.changeConnectionState('connecting');
117110
await super.start();
118-
await this.relayConnection.start();
119111
// TODO: check connection (`checkConnection` function) here
120112
this.changeConnectionState('connected');
121113
log.trace('connected');
@@ -124,7 +116,6 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
124116
async stop(): Promise<void> {
125117
log.trace('disconnecting from Fluence network');
126118
this.changeConnectionState('disconnecting');
127-
await this.relayConnection.stop();
128119
await super.stop();
129120
this.changeConnectionState('disconnected');
130121
log.trace('disconnected');

packages/core/js-client/src/connection/RelayConnection.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type { PeerId } from '@libp2p/interface/peer-id';
2020
import { createLibp2p, Libp2p } from 'libp2p';
2121

2222
import { noise } from '@chainsafe/libp2p-noise';
23-
import { mplex } from '@libp2p/mplex';
23+
import { yamux } from '@chainsafe/libp2p-yamux';
2424
import { webSockets } from '@libp2p/websockets';
2525
import { all } from '@libp2p/websockets/filters';
2626
import { multiaddr } from '@multiformats/multiaddr';
@@ -36,7 +36,8 @@ import { throwIfHasNoPeerId } from '../util/libp2pUtils.js';
3636
import { IConnection } from './interfaces.js';
3737
import { IParticle } from '../particle/interfaces.js';
3838
import { Particle, serializeToString } from '../particle/Particle.js';
39-
import { IStartable } from '../util/commonTypes.js';
39+
import { identifyService } from 'libp2p/identify';
40+
import { pingService } from 'libp2p/ping';
4041

4142
const log = logger('connection');
4243

@@ -77,7 +78,7 @@ export interface RelayConnectionConfig {
7778
/**
7879
* Implementation for JS peers which connects to Fluence through relay node
7980
*/
80-
export class RelayConnection implements IStartable, IConnection {
81+
export class RelayConnection implements IConnection {
8182
private relayAddress: Multiaddr;
8283
private lib2p2Peer: Libp2p | null = null;
8384

@@ -110,14 +111,20 @@ export class RelayConnection implements IStartable, IConnection {
110111
filter: all,
111112
}),
112113
],
113-
streamMuxers: [mplex()],
114+
streamMuxers: [yamux()],
114115
connectionEncryption: [noise()],
115116
connectionManager: {
116117
dialTimeout: this.config.dialTimeoutMs,
117118
},
118119
connectionGater: {
119120
// By default, this function forbids connections to private peers. For example multiaddr with ip 127.0.0.1 isn't allowed
120121
denyDialMultiaddr: () => Promise.resolve(false)
122+
},
123+
services: {
124+
identify: identifyService({
125+
runOnConnectionOpen: false,
126+
}),
127+
ping: pingService()
121128
}
122129
});
123130

@@ -158,23 +165,25 @@ export class RelayConnection implements IStartable, IConnection {
158165
const sink = this._connection.streams[0].sink;
159166
*/
160167

168+
log.trace('sending particle...');
161169
const stream = await this.lib2p2Peer.dialProtocol(this.relayAddress, PROTOCOL_NAME);
170+
log.trace('created stream with id ', stream.id);
162171
const sink = stream.sink;
163172

164-
pipe(
173+
await pipe(
165174
[fromString(serializeToString(particle))],
166-
// @ts-ignore
167175
encode(),
168176
sink,
169177
);
178+
log.trace('data written to sink');
170179
}
171180

172181
private async connect() {
173182
if (this.lib2p2Peer === null) {
174183
throw new Error('Relay connection is not started');
175184
}
176185

177-
this.lib2p2Peer.handle(
186+
await this.lib2p2Peer.handle(
178187
[PROTOCOL_NAME],
179188
async ({ connection, stream }) => {
180189
pipe(
@@ -188,6 +197,7 @@ export class RelayConnection implements IStartable, IConnection {
188197
for await (const msg of source) {
189198
try {
190199
const particle = Particle.fromString(msg);
200+
log.trace('got particle from stream with id %s and particle id %s', stream.id, particle.id);
191201
this.particleSource.next(particle);
192202
} catch (e) {
193203
log.error('error on handling a new incoming message: %j', e);

packages/core/js-client/src/connection/interfaces.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
import type { PeerIdB58 } from '@fluencelabs/interfaces';
1717
import type { Subscribable } from 'rxjs';
1818
import { IParticle } from '../particle/interfaces.js';
19+
import { IStartable } from '../util/commonTypes.js';
1920

2021
/**
2122
* Interface for connection used in Fluence Peer.
2223
*/
23-
export interface IConnection {
24+
export interface IConnection extends IStartable {
2425
/**
2526
* Observable that emits particles received from the connection.
2627
*/

0 commit comments

Comments
 (0)