Skip to content

Commit 4dafb72

Browse files
committed
Add init check to gRPC connection to retrieve max message size limit if it exists from v1/meta
1 parent 196b074 commit 4dafb72

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

src/connection/grpc.ts

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import TenantsManager, { Tenants } from '../grpc/tenantsManager.js';
1717
import { DbVersionSupport, initDbVersionProvider } from '../utils/dbVersion.js';
1818

1919
import { WeaviateGRPCUnavailableError, WeaviateUnsupportedFeatureError } from '../errors.js';
20+
import { Meta } from '../openapi/types.js';
2021

2122
export interface GrpcConnectionParams extends InternalConnectionParams {
2223
grpcAddress: string;
@@ -31,40 +32,62 @@ const MAX_GRPC_MESSAGE_LENGTH = 104858000; // 10mb, needs to be synchronized wit
3132
// which are tightly coupled to ConnectionGQL
3233
export default class ConnectionGRPC extends ConnectionGQL {
3334
private grpc: GrpcClient;
34-
private grpcAddress: string;
3535

36-
private constructor(params: GrpcConnectionParams) {
36+
private constructor(params: GrpcConnectionParams & { grpcMaxMessageLength: number }) {
3737
super(params);
3838
this.grpc = grpcClient(params);
39-
this.grpcAddress = params.grpcAddress;
4039
}
4140

42-
static use = async (params: GrpcConnectionParams) => {
43-
const connection = new ConnectionGRPC(params);
44-
const dbVersionProvider = initDbVersionProvider(connection);
41+
static use = (params: GrpcConnectionParams) => {
42+
const rest = new ConnectionGQL(params);
43+
const dbVersionProvider = initDbVersionProvider(rest);
4544
const dbVersionSupport = new DbVersionSupport(dbVersionProvider);
4645
if (params.skipInitChecks) {
47-
return { connection, dbVersionProvider, dbVersionSupport };
46+
return {
47+
connection: new ConnectionGRPC({
48+
...params,
49+
grpcMaxMessageLength: MAX_GRPC_MESSAGE_LENGTH,
50+
}),
51+
dbVersionProvider,
52+
dbVersionSupport,
53+
};
4854
}
49-
await Promise.all([
55+
return Promise.all([
56+
ConnectionGRPC.connect(
57+
params,
58+
(rest.get('v1/meta', true) as Promise<Meta>).then(
59+
(res: Meta) => res.grpcMaxMessageSize || MAX_GRPC_MESSAGE_LENGTH
60+
)
61+
),
5062
dbVersionSupport.supportsCompatibleGrpcService().then((check) => {
5163
if (!check.supports) {
5264
throw new WeaviateUnsupportedFeatureError(
5365
`Checking for gRPC compatibility failed with message: ${check.message}`
5466
);
5567
}
5668
}),
57-
connection.connect(),
58-
]);
59-
return { connection, dbVersionProvider, dbVersionSupport };
69+
]).then(([connection]) => {
70+
return { connection, dbVersionProvider, dbVersionSupport };
71+
});
6072
};
6173

62-
private async connect() {
63-
const isHealthy = await this.grpc.health();
74+
private static async connect(
75+
params: GrpcConnectionParams,
76+
grpcMaxLengthPromise: Promise<number>
77+
): Promise<ConnectionGRPC> {
78+
const connection = await grpcMaxLengthPromise.then(
79+
(grpcMaxMessageLength) =>
80+
new ConnectionGRPC({
81+
...params,
82+
grpcMaxMessageLength,
83+
})
84+
);
85+
const isHealthy = await connection.grpc.health();
6486
if (!isHealthy) {
65-
await this.close();
66-
throw new WeaviateGRPCUnavailableError(this.grpcAddress);
87+
await connection.close();
88+
throw new WeaviateGRPCUnavailableError(params.grpcAddress);
6789
}
90+
return connection;
6891
}
6992

7093
search = (collection: string, consistencyLevel?: ConsistencyLevel, tenant?: string) => {
@@ -116,10 +139,10 @@ export interface GrpcClient {
116139
tenants: (collection: string, bearerToken?: string) => Tenants;
117140
}
118141

119-
export const grpcClient = (config: GrpcConnectionParams): GrpcClient => {
142+
export const grpcClient = (config: GrpcConnectionParams & { grpcMaxMessageLength: number }): GrpcClient => {
120143
const channelOptions: ChannelOptions = {
121-
'grpc.max_send_message_length': MAX_GRPC_MESSAGE_LENGTH,
122-
'grpc.max_receive_message_length': MAX_GRPC_MESSAGE_LENGTH,
144+
'grpc.max_send_message_length': config.grpcMaxMessageLength,
145+
'grpc.max_receive_message_length': config.grpcMaxMessageLength,
123146
};
124147
if (config.grpcProxyUrl) {
125148
// grpc.http_proxy is not used by grpc.js under-the-hood

src/openapi/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ export interface definitions {
398398
version?: string;
399399
/** @description Module-specific meta information */
400400
modules?: { [key: string]: unknown };
401+
/** @description Max message size for GRPC connection in bytes */
402+
grpcMaxMessageSize?: number;
401403
};
402404
/** @description Multiple instances of references to other objects. */
403405
MultipleRef: definitions['SingleRef'][];

src/utils/dbVersion.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import ConnectionGRPC from '../connection/grpc.js';
1+
import { ConnectionGQL } from '../index.js';
22
import MetaGetter from '../misc/metaGetter.js';
33

44
export class DbVersionSupport {
@@ -239,7 +239,7 @@ export class DbVersionProvider implements VersionProvider {
239239
}
240240
}
241241

242-
export function initDbVersionProvider(conn: ConnectionGRPC) {
242+
export function initDbVersionProvider(conn: ConnectionGQL) {
243243
const metaGetter = new MetaGetter(conn);
244244
const versionGetter = () => {
245245
return metaGetter.do().then((result) => (result.version ? result.version : ''));

0 commit comments

Comments
 (0)