diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index caaa7df88..1c7fc259e 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -197,6 +197,11 @@ public Builder withApiKeys(String... apiKeys) { return this; } + public Builder withGrpcMaxMessageSize(int bytes) { + environment.put("GRPC_MAX_MESSAGE_SIZE", String.valueOf(bytes)); + return this; + } + public Builder enableTelemetry(boolean enable) { environment.put("DISABLE_TELEMETRY", Boolean.toString(!enable)); return this; diff --git a/src/it/java/io/weaviate/integration/SearchITest.java b/src/it/java/io/weaviate/integration/SearchITest.java index 346a8b2a3..1908eab96 100644 --- a/src/it/java/io/weaviate/integration/SearchITest.java +++ b/src/it/java/io/weaviate/integration/SearchITest.java @@ -13,6 +13,7 @@ import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TestRule; @@ -667,4 +668,32 @@ public void testGenerative_bm25_groupBy() throws IOException { .extracting(TaskOutput::text, InstanceOfAssertFactories.STRING) .isNotBlank(); } + + /** + * Ensure the client respects server's configuration for max gRPC size: + * we create a server with 1-byte message size and try to send a large payload + * there. If the channel is configured correctly, it will refuse to send it. + */ + @Test + @Ignore("Exception thrown by gRPC transport causes a deadlock") + public void test_maxGrpcMessageSize() throws Exception { + var w = Weaviate.custom().withGrpcMaxMessageSize(1).build(); + var nsHugeVectors = ns("HugeVectors"); + + try (final var _client = w.getClient()) { + var huge = _client.collections.create(nsHugeVectors, c -> c + .vectorConfig(VectorConfig.selfProvided())); + + final var vector = randomVector(5000, -.01f, .01f); + final WeaviateObject, Reference, ObjectMetadata> hugeObject = WeaviateObject.of(obj -> obj + .metadata(ObjectMetadata.of(m -> m + .vectors(Vectors.of(vector))))); + + Assertions.assertThatThrownBy(() -> { + // insertMany to route this request through gRPC. + huge.data.insertMany(hugeObject); + }).isInstanceOf(io.grpc.StatusRuntimeException.class); + } + System.out.println("here?"); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/data/WeaviateDataClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/collections/data/WeaviateDataClientAsync.java index 6aae3bb41..91ed0da39 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/data/WeaviateDataClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/data/WeaviateDataClientAsync.java @@ -66,6 +66,12 @@ public final CompletableFuture insertMany(PropertiesT... obj return insertMany(InsertManyRequest.of(objects)); } + @SafeVarargs + public final CompletableFuture insertMany( + WeaviateObject... objects) { + return insertMany(Arrays.asList(objects)); + } + public CompletableFuture insertMany( List> objects) { return insertMany(new InsertManyRequest<>(objects)); diff --git a/src/main/java/io/weaviate/client6/v1/internal/grpc/DefaultGrpcTransport.java b/src/main/java/io/weaviate/client6/v1/internal/grpc/DefaultGrpcTransport.java index 9f8fc8036..d12255d22 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/grpc/DefaultGrpcTransport.java +++ b/src/main/java/io/weaviate/client6/v1/internal/grpc/DefaultGrpcTransport.java @@ -43,8 +43,8 @@ public DefaultGrpcTransport(GrpcChannelOptions transportOptions) { if (transportOptions.maxMessageSize() != null) { var max = transportOptions.maxMessageSize(); - blockingStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max); - futureStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max); + blockingStub = blockingStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max); + futureStub = futureStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max); } if (transportOptions.tokenProvider() != null) {