Skip to content

Commit ec6f419

Browse files
authored
Merge pull request #31760 from alesj/i31442
2 parents 4859ae3 + 13ce8e3 commit ec6f419

File tree

8 files changed

+161
-1
lines changed

8 files changed

+161
-1
lines changed

integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package io.quarkus.grpc.example.streaming;
22

33
import java.time.Duration;
4+
import java.util.concurrent.atomic.AtomicInteger;
45

56
import io.grpc.examples.streaming.Empty;
67
import io.grpc.examples.streaming.Item;
78
import io.grpc.examples.streaming.MutinyStreamingGrpc;
9+
import io.grpc.examples.streaming.StringReply;
10+
import io.grpc.examples.streaming.StringRequest;
811
import io.quarkus.grpc.GrpcService;
912
import io.smallrye.mutiny.Multi;
1013
import io.smallrye.mutiny.Uni;
@@ -36,4 +39,43 @@ public Multi<Item> pipe(Multi<Item> request) {
3639
.onItem().scan(() -> 0L, Long::sum)
3740
.onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
3841
}
42+
43+
@Override
44+
public Uni<StringReply> quickStringStream(Multi<StringRequest> request) {
45+
return request
46+
.call(() -> {
47+
throw new RuntimeException("Any error");
48+
})
49+
.map(x -> {
50+
return StringReply.newBuilder()
51+
.setMessage(x.toString())
52+
.build();
53+
})
54+
.collect().asList()
55+
.replaceWith(StringReply.newBuilder()
56+
.setMessage("DONE")
57+
.build())
58+
.onFailure()
59+
.invoke(th -> System.err.println("Quick: " + th.getMessage()));
60+
}
61+
62+
@Override
63+
public Uni<StringReply> midStringStream(Multi<StringRequest> request) {
64+
AtomicInteger atomicInteger = new AtomicInteger(0);
65+
return request
66+
.map(x -> {
67+
if (atomicInteger.getAndIncrement() == 5) {
68+
throw new RuntimeException("We reached 5, error here");
69+
}
70+
return StringReply.newBuilder()
71+
.setMessage(x.toString())
72+
.build();
73+
})
74+
.collect().asList()
75+
.replaceWith(StringReply.newBuilder()
76+
.setMessage("DONE")
77+
.build())
78+
.onFailure()
79+
.invoke(th -> System.err.println("Mid: " + th.getMessage()));
80+
}
3981
}

integration-tests/grpc-streaming/src/main/proto/streaming.proto

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,21 @@ service Streaming {
1212
rpc Source(Empty) returns (stream Item) {}
1313
rpc Sink(stream Item) returns (Empty) {}
1414
rpc Pipe(stream Item) returns (stream Item) {}
15+
rpc QuickStringStream (stream StringRequest) returns (StringReply) {}
16+
rpc MidStringStream (stream StringRequest) returns (StringReply) {}
1517
}
1618

1719
message Item {
1820
string value = 1;
1921
}
2022

2123
message Empty {
22-
}
24+
}
25+
26+
message StringRequest {
27+
string anyValue = 1;
28+
}
29+
30+
message StringReply {
31+
string message = 1;
32+
}

integration-tests/grpc-streaming/src/main/resources/application.properties

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,12 @@ quarkus.grpc.clients.streaming.port=9001
44
%vertx.quarkus.grpc.clients.streaming.port=8081
55
%vertx.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true
66
%vertx.quarkus.grpc.server.use-separate-server=false
7+
8+
%n2o.quarkus.grpc.server.use-separate-server=true
9+
%o2n.quarkus.grpc.server.use-separate-server=false
10+
11+
%n2o.quarkus.grpc.clients.streaming.port=9001
12+
%n2o.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true
13+
14+
%o2n.quarkus.grpc.clients.streaming.port=8081
15+
%o2n.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=false
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.quarkus.grpc.example.streaming;
2+
3+
import io.quarkus.test.junit.QuarkusTest;
4+
5+
@QuarkusTest
6+
public class LongStreamTest extends LongStreamTestBase {
7+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.quarkus.grpc.example.streaming;
2+
3+
import java.time.Duration;
4+
import java.time.temporal.ChronoUnit;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import io.grpc.StatusRuntimeException;
11+
import io.grpc.examples.streaming.Streaming;
12+
import io.grpc.examples.streaming.StringReply;
13+
import io.grpc.examples.streaming.StringRequest;
14+
import io.quarkus.grpc.GrpcClient;
15+
import io.smallrye.mutiny.Multi;
16+
import io.smallrye.mutiny.Uni;
17+
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
18+
19+
@SuppressWarnings("NewClassNamingConvention")
20+
public class LongStreamTestBase {
21+
private final Logger log = LoggerFactory.getLogger(getClass());
22+
23+
@GrpcClient("streaming")
24+
Streaming streamSvc;
25+
26+
@Test
27+
public void testQuickFailure() {
28+
Multi<StringRequest> multi = Multi.createFrom().range(1, 10)
29+
// delaying stream to make it a bit longer
30+
.call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS)))
31+
.map(x -> StringRequest.newBuilder()
32+
.setAnyValue(x.toString())
33+
.build());
34+
// .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue()));
35+
36+
UniAssertSubscriber<StringReply> subscriber = streamSvc.quickStringStream(multi)
37+
.subscribe().withSubscriber(UniAssertSubscriber.create());
38+
39+
subscriber
40+
.awaitFailure()
41+
.assertFailedWith(StatusRuntimeException.class);
42+
}
43+
44+
@Test
45+
public void testMidFailure() {
46+
Multi<StringRequest> multi = Multi.createFrom().range(1, 10)
47+
// delaying stream to make it a bit longer
48+
.call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS)))
49+
.map(x -> StringRequest.newBuilder()
50+
.setAnyValue(x.toString())
51+
.build());
52+
// .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue()));
53+
54+
UniAssertSubscriber<StringReply> subscriber = streamSvc.midStringStream(multi)
55+
.subscribe().withSubscriber(UniAssertSubscriber.create());
56+
57+
subscriber
58+
.awaitFailure()
59+
.assertFailedWith(StatusRuntimeException.class);
60+
}
61+
62+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.quarkus.grpc.example.streaming;
2+
3+
import io.quarkus.grpc.test.utils.N2OGRPCTestProfile;
4+
import io.quarkus.test.junit.QuarkusTest;
5+
import io.quarkus.test.junit.TestProfile;
6+
7+
@QuarkusTest
8+
@TestProfile(N2OGRPCTestProfile.class)
9+
public class N2OLongStreamTest extends LongStreamTestBase {
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.quarkus.grpc.example.streaming;
2+
3+
import io.quarkus.grpc.test.utils.O2NGRPCTestProfile;
4+
import io.quarkus.test.junit.QuarkusTest;
5+
import io.quarkus.test.junit.TestProfile;
6+
7+
@QuarkusTest
8+
@TestProfile(O2NGRPCTestProfile.class)
9+
public class O2NLongStreamTest extends LongStreamTestBase {
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.quarkus.grpc.example.streaming;
2+
3+
import io.quarkus.grpc.test.utils.VertxGRPCTestProfile;
4+
import io.quarkus.test.junit.QuarkusTest;
5+
import io.quarkus.test.junit.TestProfile;
6+
7+
@QuarkusTest
8+
@TestProfile(VertxGRPCTestProfile.class)
9+
public class VertxLongStreamTest extends LongStreamTestBase {
10+
}

0 commit comments

Comments
 (0)