@@ -15,7 +15,7 @@ Future<void> voidFuture() async {}
1515const MAX_REQUEST_N_SIZE = 0x7FFFFFFF ;
1616
1717abstract class Subscriber {
18- void onNext (Payload value);
18+ void onNext (Payload ? value);
1919
2020 void onError (dynamic error);
2121
@@ -24,12 +24,12 @@ abstract class Subscriber {
2424
2525class CompleterSubscriber implements Subscriber {
2626 Completer completer;
27- Payload payload;
27+ Payload ? payload;
2828
2929 CompleterSubscriber (this .completer);
3030
3131 @override
32- void onNext (Payload payload) {
32+ void onNext (Payload ? payload) {
3333 this .payload = payload;
3434 }
3535
@@ -47,11 +47,11 @@ class CompleterSubscriber implements Subscriber {
4747class StreamSubscriber implements Subscriber {
4848 final StreamController controller;
4949
50- StreamSubscriber ({FutureOr <void > onCancel () = null })
50+ StreamSubscriber ({FutureOr <void > onCancel ()? = null })
5151 : controller = StreamController (onCancel: onCancel);
5252
5353 @override
54- void onNext (Payload value) {
54+ void onNext (Payload ? value) {
5555 controller.add (value);
5656 }
5757
@@ -65,26 +65,26 @@ class StreamSubscriber implements Subscriber {
6565 controller.close ().then ((value) => {});
6666 }
6767
68- Stream <Payload > payloadStream () {
69- return controller.stream.map ((item) => item as Payload );
68+ Stream <Payload ? > payloadStream () {
69+ return controller.stream.map ((item) => item as Payload ? );
7070 }
7171}
7272
7373class RSocketRequester extends RSocket {
7474 bool closed = false ;
7575 double _availability = 1.0 ;
76- Timer keepAliveTimer;
77- StreamIdSupplier streamIdSupplier;
78- ConnectionSetupPayload connectionSetupPayload;
79- DuplexConnection connection;
76+ Timer ? keepAliveTimer;
77+ late StreamIdSupplier streamIdSupplier;
78+ ConnectionSetupPayload ? connectionSetupPayload;
79+ late DuplexConnection connection;
8080
8181 //buffer for data chunk
82- List <int > chunkBuffer;
82+ List <int >? chunkBuffer;
8383
8484 Map <int , Subscriber > senders = {};
85- RSocket responder;
85+ RSocket ? responder;
8686 String mode = 'requester' ;
87- ErrorConsumer errorConsumer;
87+ ErrorConsumer ? errorConsumer;
8888
8989 RSocketRequester (String mode, ConnectionSetupPayload connectionSetupPayload,
9090 DuplexConnection connection) {
@@ -109,23 +109,23 @@ class RSocketRequester extends RSocket {
109109 //RSocket requestResponse
110110 requestResponse = (payload) {
111111 var completer = Completer <Payload >();
112- var streamId = streamIdSupplier.nextStreamId (senders);
112+ var streamId = streamIdSupplier.nextStreamId (senders)! ;
113113 connection
114- .write (FrameCodec .encodeRequestResponseFrame (streamId, payload));
114+ .write (FrameCodec .encodeRequestResponseFrame (streamId, payload! ));
115115 senders[streamId] = CompleterSubscriber (completer);
116116 return completer.future;
117117 };
118118 //RSocket fireAndForget
119119 fireAndForget = (payload) {
120- var streamId = streamIdSupplier.nextStreamId (senders);
121- connection.write (FrameCodec .encodeFireAndForgetFrame (streamId, payload));
120+ var streamId = streamIdSupplier.nextStreamId (senders)! ;
121+ connection.write (FrameCodec .encodeFireAndForgetFrame (streamId, payload! ));
122122 return Future .value (() {});
123123 };
124124 //RSocket requestStream
125125 requestStream = (payload) {
126- var streamId = streamIdSupplier.nextStreamId (senders);
126+ var streamId = streamIdSupplier.nextStreamId (senders)! ;
127127 connection.write (FrameCodec .encodeRequestStreamFrame (
128- streamId, MAX_REQUEST_N_SIZE , payload));
128+ streamId, MAX_REQUEST_N_SIZE , payload! ));
129129 var streamSubscriber = StreamSubscriber (onCancel: () {
130130 connection.write (FrameCodec .encodeCancelFrame (streamId));
131131 senders.remove (streamId);
@@ -135,7 +135,7 @@ class RSocketRequester extends RSocket {
135135 };
136136 //RSocket metadataPush
137137 metadataPush = (payload) {
138- connection.write (FrameCodec .encodeMetadataFrame (0 , payload));
138+ connection.write (FrameCodec .encodeMetadataFrame (0 , payload! ));
139139 return Future .value (() {});
140140 };
141141 //Rsocket Channel
@@ -153,7 +153,7 @@ class RSocketRequester extends RSocket {
153153 connection.write (setupPayloadFrame ());
154154 if (mode == 'requester' ) {
155155 keepAliveTimer = Timer .periodic (
156- Duration (seconds: connectionSetupPayload.keepAliveInterval),
156+ Duration (seconds: connectionSetupPayload! .keepAliveInterval),
157157 (Timer t) {
158158 if (! closed) {
159159 connection.write (FrameCodec .encodeKeepAlive (false , 0 ));
@@ -181,12 +181,12 @@ class RSocketRequester extends RSocket {
181181
182182 void receiveChunk (Uint8List chunk) {
183183 if (this .chunkBuffer != null ) {
184- this .chunkBuffer = this .chunkBuffer + chunk;
185- var chunkDataLength = this .chunkBuffer.length - 3 ;
186- var bytes = this .chunkBuffer.sublist (0 , 3 );
187- var rsocketFrameLength = bytesToNumber (bytes);
184+ this .chunkBuffer = this .chunkBuffer! + chunk;
185+ var chunkDataLength = this .chunkBuffer! .length - 3 ;
186+ var bytes = this .chunkBuffer! .sublist (0 , 3 );
187+ var rsocketFrameLength = bytesToNumber (bytes)! ;
188188 if (rsocketFrameLength <= chunkDataLength) {
189- for (var frame in parseFrames (this .chunkBuffer)) {
189+ for (var frame in parseFrames (this .chunkBuffer! )) {
190190 receiveFrame (frame);
191191 }
192192 this .chunkBuffer = null ;
@@ -196,7 +196,7 @@ class RSocketRequester extends RSocket {
196196 if (chunk.length > 3 ) {
197197 var chunkDataLength = chunk.length - 3 ;
198198 var bytes = chunk.sublist (0 , 3 );
199- var rsocketFrameLength = bytesToNumber (bytes);
199+ var rsocketFrameLength = bytesToNumber (bytes)! ;
200200 if (rsocketFrameLength > chunkDataLength) {
201201 this .chunkBuffer = chunk;
202202 return ;
@@ -219,12 +219,12 @@ class RSocketRequester extends RSocket {
219219 if (payloadFrame.completed) {
220220 senders.remove (streamId);
221221 if (payload? .data != null ) {
222- subscriber.onNext (payload);
222+ subscriber! .onNext (payload);
223223 }
224- subscriber.onComplete ();
224+ subscriber! .onComplete ();
225225 } else {
226226 if (payload? .data != null ) {
227- subscriber.onNext (payload);
227+ subscriber! .onNext (payload);
228228 }
229229 }
230230 }
@@ -241,10 +241,10 @@ class RSocketRequester extends RSocket {
241241 var streamId = header.streamId;
242242 var error = RSocketException (errorFrame.code, errorFrame.message);
243243 if (streamId == 0 && errorConsumer != null ) {
244- errorConsumer (error);
244+ errorConsumer ! (error);
245245 } else {
246246 if (senders.containsKey (streamId)) {
247- var subscriber = senders[streamId];
247+ var subscriber = senders[streamId]! ;
248248 senders.remove (streamId);
249249 subscriber.onError (error);
250250 }
@@ -261,37 +261,37 @@ class RSocketRequester extends RSocket {
261261 case frame_types.REQUEST_RESPONSE :
262262 var requestResponseFrame = frame as RequestResponseFrame ;
263263 if (responder != null && requestResponseFrame.payload != null ) {
264- responder
265- .requestResponse (requestResponseFrame.payload)
264+ responder! .requestResponse !(requestResponseFrame.payload)
266265 .then ((payload) {
267266 connection.write (
268267 FrameCodec .encodePayloadFrame (header.streamId, true , payload));
269268 }).catchError ((error) {
270269 var rsocketError = convertToRSocketException (error);
271270 connection.write (FrameCodec .encodeErrorFrame (
272- header.streamId, rsocketError.code, rsocketError.message));
271+ header.streamId, rsocketError.code! , rsocketError.message));
273272 });
274273 }
275274 break ;
276275 case frame_types.REQUEST_FNF :
277276 var fireAndForgetFrame = frame as RequestFNFFrame ;
278277 if (responder != null && fireAndForgetFrame.payload != null ) {
279- responder
280- .fireAndForget (fireAndForgetFrame.payload)
278+ responder! .fireAndForget !(fireAndForgetFrame.payload)
281279 .then ((value) => {});
282280 }
283281 break ;
284282 case frame_types.METADATA_PUSH :
285283 var metadataPushFrame = frame as MetadataPushFrame ;
286284 if (responder != null && metadataPushFrame.payload != null ) {
287- responder.metadataPush (metadataPushFrame.payload).then ((value) => {});
285+ responder! .metadataPush !(metadataPushFrame.payload)
286+ .then ((value) => {});
288287 }
289288 break ;
290289 case frame_types.REQUEST_STREAM :
291290 var requestStreamFrame = frame as RequestStreamFrame ;
292291 var requesterStreamId = header.streamId;
293292 if (responder != null && requestStreamFrame.payload != null ) {
294- responder.requestStream (requestStreamFrame.payload).listen ((payload) {
293+ responder! .requestStream !(requestStreamFrame.payload).listen (
294+ (payload) {
295295 connection.write (FrameCodec .encodePayloadFrame (
296296 requesterStreamId, false , payload));
297297 }, onDone: () {
@@ -301,7 +301,7 @@ class RSocketRequester extends RSocket {
301301 if (error is RSocketException ) {
302302 var e = error;
303303 connection.write (FrameCodec .encodeErrorFrame (
304- requesterStreamId, e.code, e.message));
304+ requesterStreamId, e.code! , e.message));
305305 } else {
306306 connection.write (FrameCodec .encodeErrorFrame (requesterStreamId,
307307 RSocketErrorCode .APPLICATION_ERROR , error.toString ()));
@@ -315,10 +315,10 @@ class RSocketRequester extends RSocket {
315315
316316 Uint8List setupPayloadFrame () {
317317 return FrameCodec .encodeSetupFrame (
318- connectionSetupPayload.keepAliveInterval,
319- connectionSetupPayload.keepAliveMaxLifetime,
320- connectionSetupPayload.metadataMimeType,
321- connectionSetupPayload.dataMimeType,
318+ connectionSetupPayload! .keepAliveInterval,
319+ connectionSetupPayload! .keepAliveMaxLifetime,
320+ connectionSetupPayload! .metadataMimeType,
321+ connectionSetupPayload! .dataMimeType,
322322 connectionSetupPayload);
323323 }
324324}
0 commit comments