@@ -7,23 +7,122 @@ import 'package:hive/hive.dart';
77import 'package:lib5/constants.dart' ;
88import 'package:lib5/lib5.dart' ;
99import 'package:lib5/util.dart' ;
10- import 'package:messagepack/messagepack.dart' ;
10+ import 'package:s5_msgpack/s5_msgpack.dart' ;
11+ import 'package:web_socket_channel/web_socket_channel.dart' ;
1112import 'package:s5_server/db/hive_key_value_db.dart' ;
1213import 'package:tint/tint.dart' ;
1314
1415import 'package:s5_server/logger/base.dart' ;
1516import 'package:s5_server/model/signed_message.dart' ;
1617import 'package:s5_server/node.dart' ;
1718
18- class Peer {
19+ abstract class Peer {
1920 late final NodeID id;
21+ final List <Uri > connectionUris;
22+ bool isConnected = false ;
23+ late final Uint8List challenge;
24+
25+ Peer ({required this .connectionUris});
26+
27+ void sendMessage (Uint8List message);
28+
29+ void listenForMessages (
30+ Function callback, {
31+ dynamic onDone,
32+ Function ? onError,
33+ required Logger logger,
34+ });
2035
36+ String renderLocationUri ();
37+ }
38+
39+ class WebSocketChannelPeer extends Peer {
40+ final WebSocketChannel _socket;
41+ final String locationUri;
42+
43+ WebSocketChannelPeer (
44+ this ._socket, {
45+ required this .locationUri,
46+ required super .connectionUris,
47+ });
48+
49+ @override
50+ void sendMessage (Uint8List message) {
51+ _socket.sink.add (message);
52+ }
53+
54+ @override
55+ void listenForMessages (
56+ Function callback, {
57+ dynamic onDone,
58+ Function ? onError,
59+ required Logger logger,
60+ }) {
61+ final sub = _socket.stream.listen (
62+ (event) async {
63+ await callback (event);
64+ },
65+ onDone: onDone,
66+ onError: onError,
67+ cancelOnError: false ,
68+ );
69+ }
70+
71+ @override
72+ String renderLocationUri () {
73+ return locationUri;
74+ }
75+ }
76+
77+ class WebSocketPeer extends Peer {
78+ final WebSocket _socket;
79+
80+ WebSocketPeer (
81+ this ._socket, {
82+ required super .connectionUris,
83+ });
84+
85+ @override
86+ void sendMessage (Uint8List message) {
87+ _socket.add (message);
88+ }
89+
90+ @override
91+ void listenForMessages (
92+ Function callback, {
93+ dynamic onDone,
94+ Function ? onError,
95+ required Logger logger,
96+ }) {
97+ final sub = _socket.listen (
98+ (event) async {
99+ await callback (event);
100+ },
101+ onDone: onDone,
102+ onError: onError,
103+ cancelOnError: false ,
104+ );
105+ }
106+
107+ @override
108+ String renderLocationUri () {
109+ return 'WebSocket client' ;
110+ }
111+ }
112+
113+ class TcpPeer extends Peer {
21114 final Socket _socket;
115+ TcpPeer (
116+ this ._socket, {
117+ required super .connectionUris,
118+ });
22119
120+ @override
23121 void sendMessage (Uint8List message) {
24122 _socket.add (encodeEndian (message.length, 4 ) + message);
25123 }
26124
125+ @override
27126 void listenForMessages (
28127 Function callback, {
29128 dynamic onDone,
@@ -57,17 +156,7 @@ class Peer {
57156 );
58157 }
59158
60- final List <Uri > connectionUris;
61-
62- bool isConnected = false ;
63-
64- late final Uint8List challenge;
65-
66- Peer (
67- this ._socket, {
68- required this .connectionUris,
69- });
70-
159+ @override
71160 String renderLocationUri () {
72161 return connectionUris.isEmpty
73162 ? _socket.remoteAddress.address
@@ -109,7 +198,7 @@ class P2PService {
109198 final socket = await ServerSocket .bind ('0.0.0.0' , networkSelf['port' ]);
110199 socket.listen (
111200 (peerSocket) {
112- final p = Peer (
201+ final p = TcpPeer (
113202 peerSocket,
114203 connectionUris: [],
115204 );
@@ -130,9 +219,17 @@ class P2PService {
130219 selfConnectionUris.add (
131220 Uri .parse ('tcp://${networkSelf ['ip' ]}:${networkSelf ['port' ]}' ),
132221 );
222+ }
133223
134- logger.info ('connection uris: $selfConnectionUris ' );
224+ final String ? domain = node.config['http' ]? ['api' ]? ['domain' ];
225+ if (domain != null && node.config['p2p' ]? ['self' ]? ['disabled' ] != true ) {
226+ selfConnectionUris.add (
227+ Uri .parse ('wss://$domain /s5/p2p' ),
228+ );
135229 }
230+
231+ logger.info ('connection uris: $selfConnectionUris ' );
232+
136233 final initialPeers = node.config['p2p' ]? ['peers' ]? ['initial' ] ?? [];
137234
138235 for (final p in initialPeers) {
@@ -538,38 +635,63 @@ class P2PService {
538635 }
539636
540637 void connectToNode (List <Uri > connectionUris) async {
541- final connectionUri = connectionUris.first;
542- final protocol = connectionUri.scheme;
543- if (protocol != 'tcp' ) {
544- throw 'Protocol $protocol not supported' ;
638+ final connectionUri = connectionUris.firstWhere (
639+ (uri) => ['ws' , 'wss' ].contains (uri.scheme),
640+ orElse: () => connectionUris.firstWhere (
641+ (uri) => uri.scheme == 'tcp' ,
642+ orElse: () => Uri (scheme: 'unsupported' ),
643+ ),
644+ );
645+ if (connectionUri.scheme == 'unsupported' ) {
646+ throw 'None of the available connection URIs are supported ($connectionUris )' ;
545647 }
648+
649+ final protocol = connectionUri.scheme;
650+
546651 if (connectionUri.userInfo.isEmpty) {
547652 throw 'Connection URI does not contain node id' ;
548653 }
549654 final id = NodeID .decode (connectionUri.userInfo);
550655
551656 reconnectDelay[id] = reconnectDelay[id] ?? 1 ;
552657
553- final ip = connectionUri.host;
554- final port = connectionUri.port;
555-
556658 if (id == localNodeId) {
557659 return ;
558660 }
559661 bool retried = false ;
560662 runZonedGuarded (
561663 () async {
562664 logger.verbose ('[connect] $connectionUri ' );
665+ if (protocol == 'tcp' ) {
666+ final ip = connectionUri.host;
667+ final port = connectionUri.port;
668+ final socket = await Socket .connect (ip, port);
669+
670+ await onNewPeer (
671+ TcpPeer (
672+ socket,
673+ connectionUris: [connectionUri],
674+ )..id = id,
675+ verifyId: true ,
676+ );
677+ } else {
678+ final locationUri = connectionUri.replace (
679+ userInfo: '' ,
680+ );
563681
564- final socket = await Socket .connect (ip, port);
682+ final channel = WebSocketChannel .connect (
683+ locationUri,
684+ );
565685
566- await onNewPeer (
567- Peer (
568- socket,
569- connectionUris: [connectionUri],
570- )..id = id,
571- verifyId: true ,
572- );
686+ await onNewPeer (
687+ WebSocketChannelPeer (
688+ channel,
689+ locationUri: locationUri.toString (),
690+ connectionUris: [connectionUri],
691+ )..id = id,
692+ verifyId: true ,
693+ );
694+ }
573695 },
574696 (e, st) async {
575697 if (retried) return ;
0 commit comments