11// SPDX-License-Identifier: Apache-2.0
22package com .hedera .pbj .integration .jmh .grpc ;
33
4+ import com .hedera .pbj .grpc .helidon .PbjGrpcServiceConfig ;
45import com .hedera .pbj .grpc .helidon .PbjRouting ;
56import com .hedera .pbj .integration .grpc .GrpcTestUtils ;
67import com .hedera .pbj .integration .grpc .PortsAllocator ;
78import com .hedera .pbj .runtime .grpc .GrpcClient ;
89import com .hedera .pbj .runtime .grpc .Pipeline ;
910import com .hedera .pbj .runtime .grpc .ServiceInterface ;
1011import io .helidon .webserver .WebServer ;
12+ import java .util .Set ;
1113import java .util .concurrent .CountDownLatch ;
1214import java .util .concurrent .Flow ;
1315import java .util .concurrent .TimeUnit ;
3941 * Global parameters:
4042 * - the warmup and measurement iterations in annotations below
4143 * - INVOCATIONS constant at the top of the class below
42- * Benchmark parameters:
44+ * Benchmark state parameters:
4345 * - PayloadWeight enum
46+ * - encodings (e.g. "gzip", or "gzip,identity")
4447 * - streamCount in StreamingState below
4548 */
4649@ SuppressWarnings ("unused" )
5053@ Measurement (iterations = 5 )
5154@ OutputTimeUnit (TimeUnit .SECONDS )
5255@ BenchmarkMode (Mode .Throughput )
53- public class GrpcBench {
56+ public class PbjGrpcBench {
5457 private static final int INVOCATIONS = 20_000 ;
5558
5659 private record ServerHandle (WebServer server ) implements AutoCloseable {
@@ -59,18 +62,26 @@ public void close() {
5962 server .stop ();
6063 }
6164
62- static ServerHandle start (final int port , final ServiceInterface service ) {
65+ static ServerHandle start (
66+ final int port , final ServiceInterface service , final PbjGrpcServiceConfig serviceConfig ) {
6367 return new ServerHandle (WebServer .builder ()
6468 .port (port )
65- .addRouting (PbjRouting .builder ().service (service ))
69+ .addRouting (PbjRouting .builder ().service (service , serviceConfig ))
6670 .maxPayloadSize (10000 )
6771 .build ()
6872 .start ());
6973 }
7074 }
7175
72- static GreeterInterface .GreeterClient createClient (final int port ) {
73- final GrpcClient grpcClient = GrpcTestUtils .createGrpcClient (port , GrpcTestUtils .PROTO_OPTIONS );
76+ static GreeterInterface .GreeterClient createClient (final int port , final String [] encodings ) {
77+ final GrpcClient grpcClient ;
78+ if (encodings == null || encodings .length == 0 ) {
79+ grpcClient = GrpcTestUtils .createGrpcClient (port , GrpcTestUtils .PROTO_OPTIONS );
80+ } else {
81+ grpcClient =
82+ GrpcTestUtils .createGrpcClient (port , GrpcTestUtils .PROTO_OPTIONS , encodings [0 ], Set .of (encodings ));
83+ }
84+
7485 return new GreeterInterface .GreeterClient (grpcClient , GrpcTestUtils .PROTO_OPTIONS );
7586 }
7687
@@ -79,14 +90,20 @@ public static class UnaryState {
7990 @ Param
8091 PayloadWeight weight ;
8192
93+ @ Param ({"identity" , "gzip" })
94+ String encodings ;
95+
8296 PortsAllocator .Port port ;
8397 ServerHandle server ;
8498 GreeterInterface .GreeterClient client ;
8599
86100 void setup (int streamCount ) {
101+ final String [] splitEncodings = encodings .split ("," );
102+ final PbjGrpcServiceConfig serviceConfig =
103+ new PbjGrpcServiceConfig (splitEncodings [0 ], Set .of (splitEncodings ));
87104 port = GrpcTestUtils .PORTS .acquire ();
88- server = ServerHandle .start (port .port (), new GreeterService (weight , streamCount ));
89- client = createClient (port .port ());
105+ server = ServerHandle .start (port .port (), new GreeterService (weight , streamCount ), serviceConfig );
106+ client = createClient (port .port (), splitEncodings );
90107 }
91108
92109 @ Setup (Level .Invocation )
@@ -96,20 +113,36 @@ public void setup() {
96113
97114 @ TearDown (Level .Invocation )
98115 public void tearDown () {
99- client .close ();
116+ try {
117+ client .close ();
118+ } catch (Exception ex ) {
119+ ex .printStackTrace ();
120+ }
100121 client = null ;
101- server .close ();
122+ try {
123+ server .close ();
124+ } catch (Exception ex ) {
125+ ex .printStackTrace ();
126+ }
102127 server = null ;
103- port .close ();
128+ try {
129+ port .close ();
130+ } catch (Exception ex ) {
131+ ex .printStackTrace ();
132+ }
104133 port = null ;
105134 }
106135 }
107136
137+ // There's code duplicated from UnaryState above. It's because JMH is having troubles when states use inheritance.
108138 @ State (Scope .Thread )
109139 public static class StreamingState {
110140 @ Param
111141 PayloadWeight weight ;
112142
143+ @ Param ({"identity" , "gzip" })
144+ String encodings ;
145+
113146 @ Param ({"1" , "10" })
114147 int streamCount ;
115148
@@ -118,9 +151,12 @@ public static class StreamingState {
118151 GreeterInterface .GreeterClient client ;
119152
120153 void setup (int streamCount ) {
154+ final String [] splitEncodings = encodings .split ("," );
155+ final PbjGrpcServiceConfig serviceConfig =
156+ new PbjGrpcServiceConfig (splitEncodings [0 ], Set .of (splitEncodings ));
121157 port = GrpcTestUtils .PORTS .acquire ();
122- server = ServerHandle .start (port .port (), new GreeterService (weight , streamCount ));
123- client = createClient (port .port ());
158+ server = ServerHandle .start (port .port (), new GreeterService (weight , streamCount ), serviceConfig );
159+ client = createClient (port .port (), splitEncodings );
124160 }
125161
126162 @ Setup (Level .Invocation )
@@ -130,33 +166,45 @@ public void setup() {
130166
131167 @ TearDown (Level .Invocation )
132168 public void tearDown () {
133- client .close ();
169+ try {
170+ client .close ();
171+ } catch (Exception ex ) {
172+ ex .printStackTrace ();
173+ }
134174 client = null ;
135- server .close ();
175+ try {
176+ server .close ();
177+ } catch (Exception ex ) {
178+ ex .printStackTrace ();
179+ }
136180 server = null ;
137- port .close ();
181+ try {
182+ port .close ();
183+ } catch (Exception ex ) {
184+ ex .printStackTrace ();
185+ }
138186 port = null ;
139187 }
140188 }
141189
142190 @ Benchmark
143191 @ OperationsPerInvocation (INVOCATIONS )
144192 public void benchUnary (final UnaryState state , final Blackhole blackhole ) {
145- for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
146- try {
193+ try {
194+ for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
147195 blackhole .consume (state .client .sayHello (state .weight .requestSupplier .get ()));
148- } catch (Exception e ) {
149- // Keep running because network may fail sometimes.
150- new RuntimeException (e ).printStackTrace ();
151196 }
197+ } catch (Exception e ) {
198+ // Keep running because network may fail sometimes.
199+ e .printStackTrace ();
152200 }
153201 }
154202
155203 @ Benchmark
156204 @ OperationsPerInvocation (INVOCATIONS )
157205 public void benchServerStreaming (final StreamingState state , final Blackhole blackhole ) {
158- for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
159- try {
206+ try {
207+ for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
160208 final CountDownLatch latch = new CountDownLatch (1 );
161209 state .client .sayHelloStreamReply (state .weight .requestSupplier .get (), new Pipeline <>() {
162210 @ Override
@@ -168,7 +216,9 @@ public void onNext(HelloReply item) throws RuntimeException {
168216 public void onSubscribe (Flow .Subscription subscription ) {}
169217
170218 @ Override
171- public void onError (Throwable throwable ) {}
219+ public void onError (Throwable throwable ) {
220+ new RuntimeException (throwable ).printStackTrace ();
221+ }
172222
173223 @ Override
174224 public void onComplete () {
@@ -178,20 +228,20 @@ public void onComplete() {
178228 try {
179229 latch .await ();
180230 } catch (InterruptedException e ) {
181- throw new RuntimeException ( e );
231+ Thread . currentThread (). interrupt ( );
182232 }
183- } catch (Exception e ) {
184- // Keep running because network may fail sometimes.
185- new RuntimeException (e ).printStackTrace ();
186233 }
234+ } catch (Exception e ) {
235+ // Keep running because network may fail sometimes.
236+ e .printStackTrace ();
187237 }
188238 }
189239
190240 @ Benchmark
191241 @ OperationsPerInvocation (INVOCATIONS )
192242 public void benchClientStreaming (final StreamingState state , final Blackhole blackhole ) {
193- for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
194- try {
243+ try {
244+ for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
195245 final CountDownLatch latch = new CountDownLatch (1 );
196246 final Pipeline <? super HelloRequest > requests = state .client .sayHelloStreamRequest (new Pipeline <>() {
197247 @ Override
@@ -221,20 +271,20 @@ public void onComplete() {
221271 try {
222272 latch .await ();
223273 } catch (InterruptedException e ) {
224- throw new RuntimeException ( e );
274+ Thread . currentThread (). interrupt ( );
225275 }
226- } catch (Exception e ) {
227- // Keep running because network may fail sometimes.
228- new RuntimeException (e ).printStackTrace ();
229276 }
277+ } catch (Exception e ) {
278+ // Keep running because network may fail sometimes.
279+ e .printStackTrace ();
230280 }
231281 }
232282
233283 @ Benchmark
234284 @ OperationsPerInvocation (INVOCATIONS )
235285 public void benchBidiStreaming (final StreamingState state , final Blackhole blackhole ) {
236- for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
237- try {
286+ try {
287+ for ( int i = 1 ; i <= INVOCATIONS ; i ++) {
238288 final CountDownLatch latch = new CountDownLatch (1 );
239289 final Pipeline <? super HelloRequest > requests = state .client .sayHelloStreamBidi (new Pipeline <>() {
240290 @ Override
@@ -264,18 +314,18 @@ public void onComplete() {
264314 try {
265315 latch .await ();
266316 } catch (InterruptedException e ) {
267- throw new RuntimeException ( e );
317+ Thread . currentThread (). interrupt ( );
268318 }
269- } catch (Exception e ) {
270- // Keep running because network may fail sometimes.
271- new RuntimeException (e ).printStackTrace ();
272319 }
320+ } catch (Exception e ) {
321+ // Keep running because network may fail sometimes.
322+ e .printStackTrace ();
273323 }
274324 }
275325
276326 public static void main (String [] args ) throws Exception {
277327 Options opt =
278- new OptionsBuilder ().include (GrpcBench .class .getSimpleName ()).build ();
328+ new OptionsBuilder ().include (PbjGrpcBench .class .getSimpleName ()).build ();
279329
280330 new Runner (opt ).run ();
281331 }
0 commit comments