Skip to content

Commit 3fc51bc

Browse files
committed
progress with #326
1 parent c9f80c9 commit 3fc51bc

File tree

1 file changed

+14
-5
lines changed
  • grpc-spring-boot-starter-demo/src/kafkaStreamTest/java/org/lognet/springboot/grpc/kafka

1 file changed

+14
-5
lines changed

grpc-spring-boot-starter-demo/src/kafkaStreamTest/java/org/lognet/springboot/grpc/kafka/GrpcKafkaTest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.test.context.junit4.SpringRunner;
2424

2525
import java.time.Duration;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.function.Consumer;
2728

2829
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
@@ -44,11 +45,19 @@ public static class TestConfig {
4445
class MyCustomService extends CustomServiceGrpc.CustomServiceImplBase {
4546
@Override
4647
public void custom(Custom.CustomRequest request, StreamObserver<Custom.CustomReply> responseObserver) {
47-
kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), request.getName().getBytes())
48-
.addCallback(e -> {
49-
responseObserver.onNext(Custom.CustomReply.newBuilder().setMessage(request.getName()).build());
50-
responseObserver.onCompleted();
51-
}, responseObserver::onError);
48+
kafkaTemplate.send(kafkaTemplate.getDefaultTopic(), request.getName().getBytes())
49+
.handle((r, e) -> {
50+
if (null == e) {
51+
responseObserver.onNext(Custom.CustomReply.newBuilder().setMessage(request.getName()).build());
52+
responseObserver.onCompleted();
53+
} else {
54+
responseObserver.onError(e);
55+
}
56+
57+
return true;
58+
}
59+
60+
).join();
5261
}
5362
}
5463

0 commit comments

Comments
 (0)