Skip to content

Commit 1cf1e64

Browse files
NiteshKantstevegury
authored andcommitted
int overflow in requestN for Responder (#146)
#### Problem `ReactiveSocket` expects `requestN` to be an `int` whereas `ReactiveStreams` expects it to be a `long`. `Responder` does not correctly convert `long` to `int` for values greater than `Integer.MAX_VALUE`. This sends `-1` as the `requestN` value of the `RequestN` frame. #### Modification If the value is greater than `Integer.MAX_VALUE`, use `Integer.MAX_VALUE` instead. #### Result We will not send invalid values over the wire for `RequestN`.
1 parent 6c1d423 commit 1cf1e64

File tree

3 files changed

+67
-19
lines changed

3 files changed

+67
-19
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/Frame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ public static Frame from(int streamId, int requestN) {
368368
return frame;
369369
}
370370

371-
public static long requestN(final Frame frame) {
371+
public static int requestN(final Frame frame) {
372372
ensureFrameType(FrameType.REQUEST_N, frame);
373373
return RequestNFrameFlyweight.requestN(frame.directBuffer, frame.offset);
374374
}

reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -735,11 +735,11 @@ public void request(long n) {
735735
if (rn.intValue() > 0) {
736736
// initial requestN back to the requester (subtract 1
737737
// for the initial frame which was already sent)
738-
child.onNext(Frame.RequestN.from(streamId, rn.intValue() - 1));
738+
child.onNext(Frame.RequestN.from(streamId, Math.min(Integer.MAX_VALUE, rn.intValue() - 1)));
739739
}
740740
}, r -> {
741741
// requested
742-
child.onNext(Frame.RequestN.from(streamId, r.intValue()));
742+
child.onNext(Frame.RequestN.from(streamId, Math.min(Integer.MAX_VALUE, r.intValue())));
743743
});
744744
synchronized(Responder.this) {
745745
if(channels.get(streamId) != null) {

reactivesocket-core/src/test/java/io/reactivesocket/internal/RequesterTest.java

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,33 @@
1515
*/
1616
package io.reactivesocket.internal;
1717

18-
import static io.reactivesocket.TestUtil.*;
19-
import static org.junit.Assert.*;
20-
import static io.reactivesocket.ConnectionSetupPayload.NO_FLAGS;
21-
import static io.reactivex.Observable.*;
22-
23-
import java.util.Arrays;
24-
import java.util.List;
25-
import java.util.concurrent.TimeUnit;
26-
import java.util.function.Consumer;
27-
28-
import org.junit.Test;
29-
3018
import io.reactivesocket.ConnectionSetupPayload;
3119
import io.reactivesocket.Frame;
3220
import io.reactivesocket.FrameType;
3321
import io.reactivesocket.LatchedCompletable;
3422
import io.reactivesocket.Payload;
3523
import io.reactivesocket.TestConnection;
36-
import io.reactivex.subscribers.TestSubscriber;
24+
import io.reactivesocket.util.PayloadImpl;
3725
import io.reactivex.Observable;
3826
import io.reactivex.subjects.ReplaySubject;
27+
import io.reactivex.subscribers.TestSubscriber;
28+
import org.hamcrest.MatcherAssert;
29+
import org.junit.Test;
3930
import org.reactivestreams.Publisher;
4031
import org.reactivestreams.Subscription;
4132

33+
import java.util.ArrayList;
34+
import java.util.Arrays;
35+
import java.util.List;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Consumer;
38+
39+
import static io.reactivesocket.ConnectionSetupPayload.*;
40+
import static io.reactivesocket.TestUtil.*;
41+
import static io.reactivex.Observable.*;
42+
import static org.hamcrest.Matchers.*;
43+
import static org.junit.Assert.*;
44+
4245
public class RequesterTest
4346
{
4447
final static Consumer<Throwable> ERROR_HANDLER = Throwable::printStackTrace;
@@ -79,6 +82,30 @@ public void testReqMetaPushCancelBeforeRequestN() throws InterruptedException {
7982
testCancelBeforeRequestN(p.metadataPush(utf8EncodedPayload("hello", null)));
8083
}
8184

85+
@Test()
86+
public void testReqStreamRequestLongMax() throws InterruptedException {
87+
TestConnection testConnection = establishConnection();
88+
Requester p = createClientRequester(testConnection);
89+
90+
testRequestLongMaxValue(p.requestStream(new PayloadImpl("")), testConnection);
91+
}
92+
93+
@Test()
94+
public void testReqSubscriptionRequestLongMax() throws InterruptedException {
95+
TestConnection testConnection = establishConnection();
96+
Requester p = createClientRequester(testConnection);
97+
98+
testRequestLongMaxValue(p.requestSubscription(new PayloadImpl("")), testConnection);
99+
}
100+
101+
@Test()
102+
public void testReqChannelRequestLongMax() throws InterruptedException {
103+
TestConnection testConnection = establishConnection();
104+
Requester p = createClientRequester(testConnection);
105+
106+
testRequestLongMaxValue(p.requestChannel(Publishers.just(new PayloadImpl(""))), testConnection);
107+
}
108+
82109
@Test(timeout=2000)
83110
public void testRequestResponseSuccess() throws InterruptedException {
84111
TestConnection conn = establishConnection();
@@ -306,14 +333,35 @@ private static <T> void testCancelBeforeRequestN(Publisher<T> source) {
306333
testSubscriber.assertNotComplete();
307334
}
308335

309-
private static Requester createClientRequester() throws InterruptedException {
310-
TestConnection conn = establishConnection();
336+
private static <T> void testRequestLongMaxValue(Publisher<T> source, TestConnection testConnection) {
337+
List<Integer> requestNs = new ArrayList<>();
338+
testConnection.write.add(frame -> {
339+
if (frame.getType() == FrameType.REQUEST_N) {
340+
requestNs.add(Frame.RequestN.requestN(frame));
341+
}
342+
});
343+
344+
TestSubscriber<T> testSubscriber = new TestSubscriber<T>(1L);
345+
source.subscribe(testSubscriber);
346+
347+
testSubscriber.request(Long.MAX_VALUE);
348+
testSubscriber.assertNoErrors();
349+
testSubscriber.assertNotComplete();
350+
351+
MatcherAssert.assertThat("Negative requestNs received.", requestNs, not(contains(-1)));
352+
}
353+
354+
private static Requester createClientRequester(TestConnection connection) throws InterruptedException {
311355
LatchedCompletable rc = new LatchedCompletable(1);
312-
Requester p = Requester.createClientRequester(conn, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc);
356+
Requester p = Requester.createClientRequester(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", NO_FLAGS), ERROR_HANDLER, rc);
313357
rc.await();
314358
return p;
315359
}
316360

361+
private static Requester createClientRequester() throws InterruptedException {
362+
return createClientRequester(establishConnection());
363+
}
364+
317365
private static TestConnection establishConnection() {
318366
return new TestConnection();
319367
}

0 commit comments

Comments
 (0)