1- import 'dart:io' ;
21import 'dart:typed_data' ;
32
3+ import 'package:universal_io/io.dart' ;
4+ import 'package:web_socket_channel/web_socket_channel.dart' ;
5+
46import 'io/bytes.dart' ;
57import 'rsocket.dart' ;
68
@@ -56,14 +58,16 @@ class TcpDuplexConnection extends DuplexConnection {
5658}
5759
5860class WebSocketDuplexConnection extends DuplexConnection {
59- WebSocket webSocket;
60- bool closed = false ;
61+ WebSocketChannel webSocket;
62+ bool closed = true ;
6163
6264 WebSocketDuplexConnection (this .webSocket);
6365
6466 @override
6567 void init () {
66- webSocket.listen ((message) {
68+
69+
70+ webSocket.stream.listen ((message) {
6771 var data = message as List <int >;
6872 var frameLenBytes = i24ToBytes (data.length);
6973 receiveHandler !(Uint8List .fromList (frameLenBytes + data));
@@ -79,15 +83,14 @@ class WebSocketDuplexConnection extends DuplexConnection {
7983 if (! closed) {
8084 closed = true ;
8185 _availability = 0.0 ;
82- webSocket.close ();
8386 closeHandler? .call ();
8487 }
8588 }
8689
8790 @override
8891 void write (Uint8List chunk) {
8992 //remove frame length: 3 bytes
90- webSocket.add (chunk.sublist (3 ));
93+ webSocket.sink. add (chunk.sublist (3 ));
9194 }
9295}
9396
@@ -97,10 +100,13 @@ Future<DuplexConnection> connectRSocket(String url, TcpChunkHandler handler) {
97100 if (scheme == 'tcp' ) {
98101 var socketFuture = Socket .connect (uri.host, uri.port);
99102 return socketFuture.then ((socket) => TcpDuplexConnection (socket));
100- } else if (scheme == 'ws' || scheme == 'wss' ) {
101- var socketFuture = WebSocket .connect (url);
102- return socketFuture.then ((socket) => WebSocketDuplexConnection (socket));
103+ }if (scheme == 'ws' || scheme == 'wss' ) {
104+ final websocket = WebSocketChannel .connect (
105+ Uri .parse (url),
106+ );
107+ return Future .value (WebSocketDuplexConnection (websocket));
103108 } else {
104109 return Future .error ('${scheme } unsupported' );
105110 }
106111}
112+
0 commit comments