From 882afcc8c15e90ada5caf9b0cc3c8545b076496f Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Tue, 15 Jul 2025 17:59:58 +0100 Subject: [PATCH 1/2] Add replica movement methods under `client.cluster` --- src/collections/cluster/index.ts | 168 ++++++++++++++++++- src/collections/cluster/integration.test.ts | 2 +- src/collections/cluster/unit.test.ts | 171 ++++++++++++++++++++ src/collections/types/internal.ts | 8 + src/connection/http.ts | 14 +- src/connection/index.ts | 2 +- src/openapi/types.ts | 6 + 7 files changed, 360 insertions(+), 11 deletions(-) create mode 100644 src/collections/cluster/unit.test.ts diff --git a/src/collections/cluster/index.ts b/src/collections/cluster/index.ts index 1f16a844..19cf9452 100644 --- a/src/collections/cluster/index.ts +++ b/src/collections/cluster/index.ts @@ -1,6 +1,16 @@ -import { NodesStatusGetter } from '../../cluster/index.js'; -import Connection from '../../connection/index.js'; -import { BatchStats, NodeShardStatus, NodeStats } from '../../openapi/types.js'; +import { IConnection } from '../../connection/index.js'; +import { + BatchStats, + NodeShardStatus, + NodeStats, + NodesStatusResponse, + WeaviateReplicateRequest, + WeaviateReplicateResponse, + WeaviateReplicationResponse, + WeaviateReplicationType, + WeaviateShardingState, +} from '../../openapi/types.js'; +import { DeepRequired } from '../types/internal.js'; export type Output = 'minimal' | 'verbose' | undefined; @@ -11,6 +21,41 @@ export type NodesOptions = { output: O; }; +export type QueryShardingStateOptions = { + /** The name of the collection to query. */ + collection: string; + /** The name of the shard to query. If not provided, all shards will be queried. */ + shard?: string; +}; + +export type ReplicateArguments = { + /** The name of the collection in which to replicate a shard. */ + collection: string; + /** The name of the shard to replicate. */ + shard: string; + /** The name of the node from which to replicate the shard. */ + sourceNode: string; + /** The name of the node to which to replicate the shard. */ + targetNode: string; + /** The type of replication to perform. */ + replicationType: WeaviateReplicationType; +}; + +export type ShardingState = DeepRequired; + +export type ReplicationOperation = DeepRequired; + +export type QueryReplicationOpsOptions = { + /** The name of the collection to query. */ + collection?: string; + /** The name of the shard to query. */ + shard?: string; + /** The target node of the op to query. */ + targetNode?: string; + /** Whether to include the status history in the response. */ + includeHistory?: boolean; +}; + export type Node = { name: string; status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE'; @@ -21,14 +66,65 @@ export type Node = { shards: O extends 'minimal' | undefined ? null : Required[]; }; -const cluster = (connection: Connection) => { +const cluster = (connection: IConnection) => { return { - nodes: (opts?: NodesOptions): Promise[]> => { - let builder = new NodesStatusGetter(connection).withOutput(opts?.output ? opts.output : 'minimal'); + nodes: (opts?: NodesOptions) => { + const params = new URLSearchParams(); + let path = '/nodes'; if (opts?.collection) { - builder = builder.withClassName(opts.collection); + path = path.concat(`/${opts.collection}`); } - return builder.do().then((res) => res.nodes) as Promise[]>; + params.append('output', opts?.output ? opts.output : 'minimal'); + return connection + .get(`${path}?${params.toString()}`) + .then((res) => res.nodes as Node[]); + }, + queryShardingState: (opts: QueryShardingStateOptions) => { + const params = new URLSearchParams(); + params.append('collection', opts.collection); + if (opts.shard) { + params.append('shard', opts.shard); + } + return connection + .get(`/replication/sharding-state?${params.toString()}`) + .then((res) => res as ShardingState); + }, + replicate: (args: ReplicateArguments): Promise => + connection + .postReturn( + `/replication/replicate`, + (({ replicationType, ...rest }) => ({ type: replicationType, ...rest }))(args) + ) + .then((res) => res.id), + replications: { + cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}), + delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false), + deleteAll: () => connection.delete(`/replication/replicate`, {}, false), + get: (id: string, opts?: { includeHistory?: boolean }): Promise => + connection + .get( + `/replication/replicate/${id}?includeHistory=${ + opts?.includeHistory ? opts?.includeHistory : 'false' + }` + ) + .then((res) => (res ? (res as ReplicationOperation) : null)), + query: (opts?: QueryReplicationOpsOptions): Promise => { + const { collection, shard, targetNode, includeHistory } = opts || {}; + const params = new URLSearchParams(); + if (collection) { + params.append('collection', collection); + } + if (shard) { + params.append('shard', shard); + } + if (targetNode) { + params.append('targetNode', targetNode); + } + if (includeHistory) { + params.append('includeHistory', includeHistory.toString()); + } + return connection.get(`/replication/replicate?${params.toString()}`); + }, }, }; }; @@ -43,4 +139,60 @@ export interface Cluster { * @returns {Promise[]>} The status of all nodes in the cluster. */ nodes: (opts?: NodesOptions) => Promise[]>; + /** + * Query the sharding state of a specific collection. + * + * @param {QueryShardingStateOptions} opts The options for the request. + * @returns {Promise} The sharding state of the collection. + */ + queryShardingState: (opts: QueryShardingStateOptions) => Promise; + /** + * Replicate a shard from one node to another. + * + * @param {ReplicateArguments} args The arguments for the replication request. + * @returns {Promise} The ID of the replication request. + */ + replicate: (args: ReplicateArguments) => Promise; + /** + * Access replication operations. + */ + replications: Replciations; +} + +export interface Replciations { + /** + * Cancel a replication operation. + * + * @param {string} id The ID of the replication operation to cancel. + * @returns {Promise} A promise that resolves when the operation is cancelled. + */ + cancel: (id: string) => Promise; + /** + * Delete a replication operation. + * + * @param {string} id The ID of the replication operation to delete. + * @returns {Promise} A promise that resolves when the operation is deleted. + */ + delete: (id: string) => Promise; + /** + * Delete all replication operations. + * + * @returns {Promise} A promise that resolves when all operations are deleted. + */ + deleteAll: () => Promise; + /** + * Get a specific replication operation by ID. + * + * @param {string} id The ID of the replication operation to get. + * @param {boolean} [opts.includeHistory=false] Whether to include the status history in the response. + * @returns {Promise} The replication operation or null if not found. + */ + get: (id: string, opts?: { includeHistory?: boolean }) => Promise; + /** + * Query all replication operations with optional filters. + * + * @param {QueryReplicationOpsOptions} [opts] Optional parameters for filtering the query. + * @returns {Promise} A list of replication operations matching the query. + */ + query: (opts?: QueryReplicationOpsOptions) => Promise; } diff --git a/src/collections/cluster/integration.test.ts b/src/collections/cluster/integration.test.ts index 1ec26095..2a9cc282 100644 --- a/src/collections/cluster/integration.test.ts +++ b/src/collections/cluster/integration.test.ts @@ -1,6 +1,6 @@ import weaviate, { WeaviateClient } from '../../index.js'; -describe('Testing of the client.cluster methods', () => { +describe('Integration testing of the client.cluster methods', () => { let client: WeaviateClient; const one = 'TestClusterCollectionOne'; diff --git a/src/collections/cluster/unit.test.ts b/src/collections/cluster/unit.test.ts new file mode 100644 index 00000000..01bfc16e --- /dev/null +++ b/src/collections/cluster/unit.test.ts @@ -0,0 +1,171 @@ +import { IConnection } from '../../connection'; +import cluster from './index.js'; + +// These tests do not validate the response from Weaviate. This is because the server responses are not mapped at all by the client so are assumed to be correct. +// Instead, these tests validate that the client sends the correct requests to the server and that the responses are handled correctly. +describe('Unit testing of the client.cluster methods', () => { + const clusterMaker = (mock: any) => cluster(mock as unknown as IConnection); + const assert = (expected: any) => (actual: any) => expect(actual).toEqual(expected); + + it('should query the nodes correctly', () => { + const opts = { collection: 'Collection', output: 'minimal' as const }; + const mockResult = { + nodes: [ + { + name: 'node1', + status: 'HEALTHY', + version: '1.0.0', + gitHash: 'abc123', + batchStats: { queueLength: 0, ratePerSecond: 0 }, + stats: undefined, + shards: null, + }, + ], + }; + const mockConnection = { + get: (path: string, expectReturnContent?: boolean | undefined) => { + expect(path).toBe('/nodes/Collection?output=minimal'); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).nodes(opts).then(assert(mockResult.nodes)); + }); + + it('should query the sharding state correctly for a collection with a shard', () => { + const opts = { + collection: 'Collection', + shard: 'shard', + }; + const mockResult = { + collection: 'Collection', + shards: [{ shard: 'shard', replicas: ['node1', 'node2'] }], + }; + const mockConnection = { + get: (path: string, expectReturnContent?: boolean | undefined) => { + expect(path).toBe(`/replication/sharding-state?collection=Collection&shard=shard`); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult)); + }); + + it('should query the sharding state correctly for a collection without a specific shard', () => { + const opts = { + collection: 'Collection', + }; + const mockResult = { + collection: 'Collection', + shards: [ + { shard: 'shard1', replicas: ['node1'] }, + { shard: 'shard2', replicas: ['node2'] }, + ], + }; + const mockConnection = { + get: (path: string, expectReturnContent?: boolean | undefined) => { + expect(path).toBe('/replication/sharding-state?collection=Collection'); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult)); + }); + + it('should replicate a shard correctly', () => { + const args = { + collection: 'Collection', + shard: 'shard', + sourceNode: 'sourceNode', + targetNode: 'targetNode', + replicationType: 'COPY' as const, + }; + const mockResult = { id: 'replication-id' }; + const mockConnection = { + postReturn: (path: string, body: any): Promise => { + expect(path).toBe('/replication/replicate'); + expect(body).toEqual({ + collection: 'Collection', + shard: 'shard', + sourceNode: 'sourceNode', + targetNode: 'targetNode', + type: 'COPY', + }); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replicate(args).then(assert(mockResult.id)); + }); + + it('should get a replication operation by ID without status history', () => { + const id = 'replication-id'; + const mockResult = { id }; + const mockConnection = { + get: (path: string) => { + expect(path).toBe(`/replication/replicate/${id}?includeHistory=false`); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replications.get(id).then(assert(mockResult)); + }); + + it('should get a replication operation by ID with status history', () => { + const id = 'replication-id'; + const mockResult = { id }; + const mockConnection = { + get: (path: string) => { + expect(path).toBe(`/replication/replicate/${id}?includeHistory=true`); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replications.get(id, { includeHistory: true }).then(assert(mockResult)); + }); + + it('should cancel a replication operation', () => { + const id = 'replication-id'; + const mockConnection = { + postEmpty: (path: string): Promise => { + expect(path).toBe(`/replication/replicate/${id}/cancel`); + return Promise.resolve(); + }, + }; + clusterMaker(mockConnection).replications.cancel(id).then(assert(undefined)); + }); + + it('should delete a replication operation', () => { + const id = 'replication-id'; + const mockConnection = { + delete: (path: string) => { + expect(path).toBe(`/replication/replicate/${id}`); + return Promise.resolve(); + }, + }; + clusterMaker(mockConnection).replications.delete(id).then(assert(undefined)); + }); + + it('should delete all replication operations', () => { + const mockConnection = { + delete: (path: string) => { + expect(path).toBe(`/replication/replicate`); + return Promise.resolve(); + }, + }; + clusterMaker(mockConnection).replications.deleteAll().then(assert(undefined)); + }); + + it('should query replication operations with various filters', () => { + const opts = { + collection: 'Collection', + shard: 'shard', + targetNode: 'node1', + includeHistory: true, + }; + const mockResult = [{ id: 'replication-id' }]; + const mockConnection = { + get: (path: string) => { + expect(path).toBe( + `/replication/replicate?collection=Collection&shard=shard&targetNode=node1&includeHistory=true` + ); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replications.query(opts).then(assert(mockResult)); + }); +}); diff --git a/src/collections/types/internal.ts b/src/collections/types/internal.ts index 0227d769..5b4fe0d6 100644 --- a/src/collections/types/internal.ts +++ b/src/collections/types/internal.ts @@ -134,3 +134,11 @@ type AtLeastOne = { }[keyof T]; export type NonEmpty = keyof T extends never ? never : T; + +export type DeepRequired = T extends Function + ? T + : T extends Array + ? Array> + : T extends object + ? { [K in keyof T]-?: DeepRequired> } + : T; diff --git a/src/connection/http.ts b/src/connection/http.ts index c4f5e6a9..db80b97c 100644 --- a/src/connection/http.ts +++ b/src/connection/http.ts @@ -58,7 +58,19 @@ export interface ConnectionDetails { headers?: HeadersInit; } -export default class ConnectionREST { +export interface IConnection { + postReturn: (path: string, payload: B) => Promise; + postEmpty: (path: string, payload: B) => Promise; + put: (path: string, payload: any, expectReturnContent?: boolean) => Promise; + patch: (path: string, payload: any) => Promise; + delete: (path: string, payload: any, expectReturnContent?: boolean) => Promise; + head: (path: string, payload: any) => Promise; + get: (path: string, expectReturnContent?: boolean) => Promise; + login(): Promise; + getDetails(): Promise; +} + +export default class ConnectionREST implements IConnection { private apiKey?: string; private headers?: HeadersInit; protected authEnabled: boolean; diff --git a/src/connection/index.ts b/src/connection/index.ts index 45b60544..f2440149 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -11,5 +11,5 @@ export type { ConnectToWCSOptions, ConnectToWeaviateCloudOptions, } from './helpers.js'; -export type { InternalConnectionParams } from './http.js'; +export type { IConnection, InternalConnectionParams } from './http.js'; export { ConnectionGQL, ConnectionGRPC, ConnectionREST }; diff --git a/src/openapi/types.ts b/src/openapi/types.ts index 59e6e7d1..41101239 100644 --- a/src/openapi/types.ts +++ b/src/openapi/types.ts @@ -74,3 +74,9 @@ export type WeaviateUserType = definitions['UserTypeOutput']; export type WeaviateUserTypeInternal = definitions['UserTypeInput']; export type WeaviateUserTypeDB = definitions['DBUserInfo']['dbUserType']; export type WeaviateAssignedUser = operations['getUsersForRole']['responses']['200']['schema'][0]; +// Cluster +export type WeaviateShardingState = definitions['ReplicationShardingState']; +export type WeaviateReplicationType = definitions['ReplicationReplicateDetailsReplicaResponse']['type']; +export type WeaviateReplicateRequest = definitions['ReplicationReplicateReplicaRequest']; +export type WeaviateReplicateResponse = definitions['ReplicationReplicateReplicaResponse']; +export type WeaviateReplicationResponse = definitions['ReplicationReplicateDetailsReplicaResponse']; From 4b1b20e33afd50e053bd16fa7f38353d9c60617d Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Wed, 16 Jul 2025 13:39:35 +0100 Subject: [PATCH 2/2] Make changes in response to review --- src/collections/cluster/index.ts | 34 ++++++++++++++++------------ src/collections/cluster/unit.test.ts | 8 ++----- src/collections/types/internal.ts | 8 ------- src/utils/types.ts | 5 ++++ 4 files changed, 26 insertions(+), 29 deletions(-) create mode 100644 src/utils/types.ts diff --git a/src/collections/cluster/index.ts b/src/collections/cluster/index.ts index 19cf9452..592ebc45 100644 --- a/src/collections/cluster/index.ts +++ b/src/collections/cluster/index.ts @@ -10,7 +10,7 @@ import { WeaviateReplicationType, WeaviateShardingState, } from '../../openapi/types.js'; -import { DeepRequired } from '../types/internal.js'; +import { DeepRequired } from '../../utils/types.js'; export type Output = 'minimal' | 'verbose' | undefined; @@ -22,13 +22,11 @@ export type NodesOptions = { }; export type QueryShardingStateOptions = { - /** The name of the collection to query. */ - collection: string; /** The name of the shard to query. If not provided, all shards will be queried. */ shard?: string; }; -export type ReplicateArguments = { +export type ReplicateArgs = { /** The name of the collection in which to replicate a shard. */ collection: string; /** The name of the shard to replicate. */ @@ -56,6 +54,11 @@ export type QueryReplicationOpsOptions = { includeHistory?: boolean; }; +export type GetReplicationOpOptions = { + /** Whether to include the status history in the response. Defaults to false. */ + includeHistory?: boolean; +}; + export type Node = { name: string; status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE'; @@ -79,17 +82,17 @@ const cluster = (connection: IConnection) => { .get(`${path}?${params.toString()}`) .then((res) => res.nodes as Node[]); }, - queryShardingState: (opts: QueryShardingStateOptions) => { + queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => { const params = new URLSearchParams(); - params.append('collection', opts.collection); - if (opts.shard) { + params.append('collection', collection); + if (opts?.shard) { params.append('shard', opts.shard); } return connection .get(`/replication/sharding-state?${params.toString()}`) .then((res) => res as ShardingState); }, - replicate: (args: ReplicateArguments): Promise => + replicate: (args: ReplicateArgs): Promise => connection .postReturn( `/replication/replicate`, @@ -100,7 +103,7 @@ const cluster = (connection: IConnection) => { cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}), delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false), deleteAll: () => connection.delete(`/replication/replicate`, {}, false), - get: (id: string, opts?: { includeHistory?: boolean }): Promise => + get: (id: string, opts?: GetReplicationOpOptions): Promise => connection .get( `/replication/replicate/${id}?includeHistory=${ @@ -142,24 +145,25 @@ export interface Cluster { /** * Query the sharding state of a specific collection. * - * @param {QueryShardingStateOptions} opts The options for the request. + * @param {string} collection The name of the collection to query. + * @param {QueryShardingStateOptions} [opts] The options for the request. * @returns {Promise} The sharding state of the collection. */ - queryShardingState: (opts: QueryShardingStateOptions) => Promise; + queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => Promise; /** * Replicate a shard from one node to another. * - * @param {ReplicateArguments} args The arguments for the replication request. + * @param {ReplicateArgs} args The arguments for the replication request. * @returns {Promise} The ID of the replication request. */ - replicate: (args: ReplicateArguments) => Promise; + replicate: (args: ReplicateArgs) => Promise; /** * Access replication operations. */ - replications: Replciations; + replications: Replications; } -export interface Replciations { +export interface Replications { /** * Cancel a replication operation. * diff --git a/src/collections/cluster/unit.test.ts b/src/collections/cluster/unit.test.ts index 01bfc16e..1fec6ee5 100644 --- a/src/collections/cluster/unit.test.ts +++ b/src/collections/cluster/unit.test.ts @@ -33,7 +33,6 @@ describe('Unit testing of the client.cluster methods', () => { it('should query the sharding state correctly for a collection with a shard', () => { const opts = { - collection: 'Collection', shard: 'shard', }; const mockResult = { @@ -46,13 +45,10 @@ describe('Unit testing of the client.cluster methods', () => { return Promise.resolve(mockResult); }, }; - clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult)); + clusterMaker(mockConnection).queryShardingState('Collection', opts).then(assert(mockResult)); }); it('should query the sharding state correctly for a collection without a specific shard', () => { - const opts = { - collection: 'Collection', - }; const mockResult = { collection: 'Collection', shards: [ @@ -66,7 +62,7 @@ describe('Unit testing of the client.cluster methods', () => { return Promise.resolve(mockResult); }, }; - clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult)); + clusterMaker(mockConnection).queryShardingState('Collection').then(assert(mockResult)); }); it('should replicate a shard correctly', () => { diff --git a/src/collections/types/internal.ts b/src/collections/types/internal.ts index 5b4fe0d6..0227d769 100644 --- a/src/collections/types/internal.ts +++ b/src/collections/types/internal.ts @@ -134,11 +134,3 @@ type AtLeastOne = { }[keyof T]; export type NonEmpty = keyof T extends never ? never : T; - -export type DeepRequired = T extends Function - ? T - : T extends Array - ? Array> - : T extends object - ? { [K in keyof T]-?: DeepRequired> } - : T; diff --git a/src/utils/types.ts b/src/utils/types.ts new file mode 100644 index 00000000..181decb1 --- /dev/null +++ b/src/utils/types.ts @@ -0,0 +1,5 @@ +export type DeepRequired = T extends Array + ? Array> + : T extends object + ? { [K in keyof T]-?: DeepRequired> } + : T;