|
25 | 25 | import java.nio.charset.StandardCharsets; |
26 | 26 | import java.util.ArrayList; |
27 | 27 | import java.util.List; |
28 | | -import java.util.concurrent.TimeUnit; |
29 | 28 | import java.util.stream.Stream; |
30 | 29 | import javax.annotation.Nonnull; |
31 | 30 |
|
| 31 | +import io.grpc.ManagedChannel; |
| 32 | +import io.grpc.ManagedChannelBuilder; |
32 | 33 | import org.apache.arrow.flight.CallOption; |
33 | 34 | import org.apache.arrow.flight.CallOptions; |
34 | 35 | import org.apache.arrow.flight.CallStatus; |
|
37 | 38 | import org.apache.arrow.flight.Location; |
38 | 39 | import org.apache.arrow.flight.NoOpFlightProducer; |
39 | 40 | import org.apache.arrow.flight.Ticket; |
| 41 | +import org.apache.arrow.flight.impl.FlightServiceGrpc; |
40 | 42 | import org.apache.arrow.memory.BufferAllocator; |
41 | 43 | import org.apache.arrow.memory.RootAllocator; |
42 | 44 | import org.apache.arrow.vector.VarCharVector; |
@@ -176,28 +178,25 @@ void setInboundMessageSizeLarge() throws Exception { |
176 | 178 |
|
177 | 179 | @Test |
178 | 180 | void grpcCallOption() { |
179 | | - GrpcCallOption.Builder builder = new GrpcCallOption.Builder(); |
180 | | - builder.withMaxInboundMessageSize(1024); |
181 | | - builder.withMaxOutboundMessageSize(1024); |
182 | | - builder.withCompressorName("my-compressor"); |
183 | | - builder.withDeadlineAfter(2, TimeUnit.HOURS); |
184 | | - builder.withExecutor(Runnable::run); |
185 | | - builder.withWaitForReady(); |
186 | | - |
187 | | - GrpcCallOption callOption = builder.build(); |
188 | | - Assertions.assertThat(callOption.getMaxInboundMessageSize()).isEqualTo(1024); |
189 | | - Assertions.assertThat(callOption.getMaxOutboundMessageSize()).isEqualTo(1024); |
190 | | - Assertions.assertThat(callOption.getCompressorName()).isEqualTo("my-compressor"); |
191 | | - Assertions.assertThat(callOption.getDeadlineAfter()).isNotNull(); |
192 | | - Assertions.assertThat(callOption.getExecutor()).isNotNull(); |
193 | | - Assertions.assertThat(callOption.getWaitForReady()).isTrue(); |
194 | | - |
195 | | - CallOption[] callBackArray = callOption.getCallOptionCallback(); |
196 | | - Assertions.assertThat(callBackArray).isNotNull(); |
197 | | - Assertions.assertThat(callBackArray.length).isEqualTo(6); |
198 | | - for (CallOption option : callBackArray) { |
199 | | - Assertions.assertThat(option).isInstanceOf(CallOptions.GrpcCallOption.class); |
| 181 | + GrpcCallOption grpcCallOption = new GrpcCallOption.Builder() |
| 182 | + .withMaxInboundMessageSize(1024) |
| 183 | + .withMaxOutboundMessageSize(1024) |
| 184 | + .withCompressorName("my-compressor") |
| 185 | + .withWaitForReady() |
| 186 | + .build(); |
| 187 | + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 3333) |
| 188 | + .usePlaintext() |
| 189 | + .build(); |
| 190 | + FlightServiceGrpc.FlightServiceStub stub = FlightServiceGrpc.newStub(channel); |
| 191 | + for (CallOption option : grpcCallOption.getCallOptionCallback()) { |
| 192 | + stub = ((CallOptions.GrpcCallOption) option).wrapStub(stub); |
200 | 193 | } |
| 194 | + |
| 195 | + io.grpc.CallOptions stubCallOptions = stub.getCallOptions(); |
| 196 | + Assertions.assertThat(stubCallOptions.getMaxInboundMessageSize()).isEqualTo(grpcCallOption.getMaxInboundMessageSize()); |
| 197 | + Assertions.assertThat(stubCallOptions.getMaxOutboundMessageSize()).isEqualTo(grpcCallOption.getMaxOutboundMessageSize()); |
| 198 | + Assertions.assertThat(stubCallOptions.getCompressor()).isEqualTo(grpcCallOption.getCompressorName()); |
| 199 | + Assertions.assertThat(stubCallOptions.isWaitForReady()).isEqualTo(grpcCallOption.getWaitForReady()); |
201 | 200 | } |
202 | 201 |
|
203 | 202 | private FlightServer simpleFlightServer(@Nonnull final URI uri, |
|
0 commit comments