diff --git a/src/collections/cluster/index.ts b/src/collections/cluster/index.ts index 1f16a844..592ebc45 100644 --- a/src/collections/cluster/index.ts +++ b/src/collections/cluster/index.ts @@ -1,6 +1,16 @@ -import { NodesStatusGetter } from '../../cluster/index.js'; -import Connection from '../../connection/index.js'; -import { BatchStats, NodeShardStatus, NodeStats } from '../../openapi/types.js'; +import { IConnection } from '../../connection/index.js'; +import { + BatchStats, + NodeShardStatus, + NodeStats, + NodesStatusResponse, + WeaviateReplicateRequest, + WeaviateReplicateResponse, + WeaviateReplicationResponse, + WeaviateReplicationType, + WeaviateShardingState, +} from '../../openapi/types.js'; +import { DeepRequired } from '../../utils/types.js'; export type Output = 'minimal' | 'verbose' | undefined; @@ -11,6 +21,44 @@ export type NodesOptions = { output: O; }; +export type QueryShardingStateOptions = { + /** The name of the shard to query. If not provided, all shards will be queried. */ + shard?: string; +}; + +export type ReplicateArgs = { + /** The name of the collection in which to replicate a shard. */ + collection: string; + /** The name of the shard to replicate. */ + shard: string; + /** The name of the node from which to replicate the shard. */ + sourceNode: string; + /** The name of the node to which to replicate the shard. */ + targetNode: string; + /** The type of replication to perform. */ + replicationType: WeaviateReplicationType; +}; + +export type ShardingState = DeepRequired; + +export type ReplicationOperation = DeepRequired; + +export type QueryReplicationOpsOptions = { + /** The name of the collection to query. */ + collection?: string; + /** The name of the shard to query. */ + shard?: string; + /** The target node of the op to query. */ + targetNode?: string; + /** Whether to include the status history in the response. */ + includeHistory?: boolean; +}; + +export type GetReplicationOpOptions = { + /** Whether to include the status history in the response. Defaults to false. */ + includeHistory?: boolean; +}; + export type Node = { name: string; status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE'; @@ -21,14 +69,65 @@ export type Node = { shards: O extends 'minimal' | undefined ? null : Required[]; }; -const cluster = (connection: Connection) => { +const cluster = (connection: IConnection) => { return { - nodes: (opts?: NodesOptions): Promise[]> => { - let builder = new NodesStatusGetter(connection).withOutput(opts?.output ? opts.output : 'minimal'); + nodes: (opts?: NodesOptions) => { + const params = new URLSearchParams(); + let path = '/nodes'; if (opts?.collection) { - builder = builder.withClassName(opts.collection); + path = path.concat(`/${opts.collection}`); + } + params.append('output', opts?.output ? opts.output : 'minimal'); + return connection + .get(`${path}?${params.toString()}`) + .then((res) => res.nodes as Node[]); + }, + queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => { + const params = new URLSearchParams(); + params.append('collection', collection); + if (opts?.shard) { + params.append('shard', opts.shard); } - return builder.do().then((res) => res.nodes) as Promise[]>; + return connection + .get(`/replication/sharding-state?${params.toString()}`) + .then((res) => res as ShardingState); + }, + replicate: (args: ReplicateArgs): Promise => + connection + .postReturn( + `/replication/replicate`, + (({ replicationType, ...rest }) => ({ type: replicationType, ...rest }))(args) + ) + .then((res) => res.id), + replications: { + cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}), + delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false), + deleteAll: () => connection.delete(`/replication/replicate`, {}, false), + get: (id: string, opts?: GetReplicationOpOptions): Promise => + connection + .get( + `/replication/replicate/${id}?includeHistory=${ + opts?.includeHistory ? opts?.includeHistory : 'false' + }` + ) + .then((res) => (res ? (res as ReplicationOperation) : null)), + query: (opts?: QueryReplicationOpsOptions): Promise => { + const { collection, shard, targetNode, includeHistory } = opts || {}; + const params = new URLSearchParams(); + if (collection) { + params.append('collection', collection); + } + if (shard) { + params.append('shard', shard); + } + if (targetNode) { + params.append('targetNode', targetNode); + } + if (includeHistory) { + params.append('includeHistory', includeHistory.toString()); + } + return connection.get(`/replication/replicate?${params.toString()}`); + }, }, }; }; @@ -43,4 +142,61 @@ export interface Cluster { * @returns {Promise[]>} The status of all nodes in the cluster. */ nodes: (opts?: NodesOptions) => Promise[]>; + /** + * Query the sharding state of a specific collection. + * + * @param {string} collection The name of the collection to query. + * @param {QueryShardingStateOptions} [opts] The options for the request. + * @returns {Promise} The sharding state of the collection. + */ + queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => Promise; + /** + * Replicate a shard from one node to another. + * + * @param {ReplicateArgs} args The arguments for the replication request. + * @returns {Promise} The ID of the replication request. + */ + replicate: (args: ReplicateArgs) => Promise; + /** + * Access replication operations. + */ + replications: Replications; +} + +export interface Replications { + /** + * Cancel a replication operation. + * + * @param {string} id The ID of the replication operation to cancel. + * @returns {Promise} A promise that resolves when the operation is cancelled. + */ + cancel: (id: string) => Promise; + /** + * Delete a replication operation. + * + * @param {string} id The ID of the replication operation to delete. + * @returns {Promise} A promise that resolves when the operation is deleted. + */ + delete: (id: string) => Promise; + /** + * Delete all replication operations. + * + * @returns {Promise} A promise that resolves when all operations are deleted. + */ + deleteAll: () => Promise; + /** + * Get a specific replication operation by ID. + * + * @param {string} id The ID of the replication operation to get. + * @param {boolean} [opts.includeHistory=false] Whether to include the status history in the response. + * @returns {Promise} The replication operation or null if not found. + */ + get: (id: string, opts?: { includeHistory?: boolean }) => Promise; + /** + * Query all replication operations with optional filters. + * + * @param {QueryReplicationOpsOptions} [opts] Optional parameters for filtering the query. + * @returns {Promise} A list of replication operations matching the query. + */ + query: (opts?: QueryReplicationOpsOptions) => Promise; } diff --git a/src/collections/cluster/integration.test.ts b/src/collections/cluster/integration.test.ts index 1ec26095..2a9cc282 100644 --- a/src/collections/cluster/integration.test.ts +++ b/src/collections/cluster/integration.test.ts @@ -1,6 +1,6 @@ import weaviate, { WeaviateClient } from '../../index.js'; -describe('Testing of the client.cluster methods', () => { +describe('Integration testing of the client.cluster methods', () => { let client: WeaviateClient; const one = 'TestClusterCollectionOne'; diff --git a/src/collections/cluster/unit.test.ts b/src/collections/cluster/unit.test.ts new file mode 100644 index 00000000..1fec6ee5 --- /dev/null +++ b/src/collections/cluster/unit.test.ts @@ -0,0 +1,167 @@ +import { IConnection } from '../../connection'; +import cluster from './index.js'; + +// These tests do not validate the response from Weaviate. This is because the server responses are not mapped at all by the client so are assumed to be correct. +// Instead, these tests validate that the client sends the correct requests to the server and that the responses are handled correctly. +describe('Unit testing of the client.cluster methods', () => { + const clusterMaker = (mock: any) => cluster(mock as unknown as IConnection); + const assert = (expected: any) => (actual: any) => expect(actual).toEqual(expected); + + it('should query the nodes correctly', () => { + const opts = { collection: 'Collection', output: 'minimal' as const }; + const mockResult = { + nodes: [ + { + name: 'node1', + status: 'HEALTHY', + version: '1.0.0', + gitHash: 'abc123', + batchStats: { queueLength: 0, ratePerSecond: 0 }, + stats: undefined, + shards: null, + }, + ], + }; + const mockConnection = { + get: (path: string, expectReturnContent?: boolean | undefined) => { + expect(path).toBe('/nodes/Collection?output=minimal'); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).nodes(opts).then(assert(mockResult.nodes)); + }); + + it('should query the sharding state correctly for a collection with a shard', () => { + const opts = { + shard: 'shard', + }; + const mockResult = { + collection: 'Collection', + shards: [{ shard: 'shard', replicas: ['node1', 'node2'] }], + }; + const mockConnection = { + get: (path: string, expectReturnContent?: boolean | undefined) => { + expect(path).toBe(`/replication/sharding-state?collection=Collection&shard=shard`); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).queryShardingState('Collection', opts).then(assert(mockResult)); + }); + + it('should query the sharding state correctly for a collection without a specific shard', () => { + const mockResult = { + collection: 'Collection', + shards: [ + { shard: 'shard1', replicas: ['node1'] }, + { shard: 'shard2', replicas: ['node2'] }, + ], + }; + const mockConnection = { + get: (path: string, expectReturnContent?: boolean | undefined) => { + expect(path).toBe('/replication/sharding-state?collection=Collection'); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).queryShardingState('Collection').then(assert(mockResult)); + }); + + it('should replicate a shard correctly', () => { + const args = { + collection: 'Collection', + shard: 'shard', + sourceNode: 'sourceNode', + targetNode: 'targetNode', + replicationType: 'COPY' as const, + }; + const mockResult = { id: 'replication-id' }; + const mockConnection = { + postReturn: (path: string, body: any): Promise => { + expect(path).toBe('/replication/replicate'); + expect(body).toEqual({ + collection: 'Collection', + shard: 'shard', + sourceNode: 'sourceNode', + targetNode: 'targetNode', + type: 'COPY', + }); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replicate(args).then(assert(mockResult.id)); + }); + + it('should get a replication operation by ID without status history', () => { + const id = 'replication-id'; + const mockResult = { id }; + const mockConnection = { + get: (path: string) => { + expect(path).toBe(`/replication/replicate/${id}?includeHistory=false`); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replications.get(id).then(assert(mockResult)); + }); + + it('should get a replication operation by ID with status history', () => { + const id = 'replication-id'; + const mockResult = { id }; + const mockConnection = { + get: (path: string) => { + expect(path).toBe(`/replication/replicate/${id}?includeHistory=true`); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replications.get(id, { includeHistory: true }).then(assert(mockResult)); + }); + + it('should cancel a replication operation', () => { + const id = 'replication-id'; + const mockConnection = { + postEmpty: (path: string): Promise => { + expect(path).toBe(`/replication/replicate/${id}/cancel`); + return Promise.resolve(); + }, + }; + clusterMaker(mockConnection).replications.cancel(id).then(assert(undefined)); + }); + + it('should delete a replication operation', () => { + const id = 'replication-id'; + const mockConnection = { + delete: (path: string) => { + expect(path).toBe(`/replication/replicate/${id}`); + return Promise.resolve(); + }, + }; + clusterMaker(mockConnection).replications.delete(id).then(assert(undefined)); + }); + + it('should delete all replication operations', () => { + const mockConnection = { + delete: (path: string) => { + expect(path).toBe(`/replication/replicate`); + return Promise.resolve(); + }, + }; + clusterMaker(mockConnection).replications.deleteAll().then(assert(undefined)); + }); + + it('should query replication operations with various filters', () => { + const opts = { + collection: 'Collection', + shard: 'shard', + targetNode: 'node1', + includeHistory: true, + }; + const mockResult = [{ id: 'replication-id' }]; + const mockConnection = { + get: (path: string) => { + expect(path).toBe( + `/replication/replicate?collection=Collection&shard=shard&targetNode=node1&includeHistory=true` + ); + return Promise.resolve(mockResult); + }, + }; + clusterMaker(mockConnection).replications.query(opts).then(assert(mockResult)); + }); +}); diff --git a/src/connection/http.ts b/src/connection/http.ts index c4f5e6a9..db80b97c 100644 --- a/src/connection/http.ts +++ b/src/connection/http.ts @@ -58,7 +58,19 @@ export interface ConnectionDetails { headers?: HeadersInit; } -export default class ConnectionREST { +export interface IConnection { + postReturn: (path: string, payload: B) => Promise; + postEmpty: (path: string, payload: B) => Promise; + put: (path: string, payload: any, expectReturnContent?: boolean) => Promise; + patch: (path: string, payload: any) => Promise; + delete: (path: string, payload: any, expectReturnContent?: boolean) => Promise; + head: (path: string, payload: any) => Promise; + get: (path: string, expectReturnContent?: boolean) => Promise; + login(): Promise; + getDetails(): Promise; +} + +export default class ConnectionREST implements IConnection { private apiKey?: string; private headers?: HeadersInit; protected authEnabled: boolean; diff --git a/src/connection/index.ts b/src/connection/index.ts index 45b60544..f2440149 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -11,5 +11,5 @@ export type { ConnectToWCSOptions, ConnectToWeaviateCloudOptions, } from './helpers.js'; -export type { InternalConnectionParams } from './http.js'; +export type { IConnection, InternalConnectionParams } from './http.js'; export { ConnectionGQL, ConnectionGRPC, ConnectionREST }; diff --git a/src/openapi/types.ts b/src/openapi/types.ts index b935baa9..ea746b9a 100644 --- a/src/openapi/types.ts +++ b/src/openapi/types.ts @@ -74,6 +74,12 @@ export type WeaviateUserType = definitions['UserTypeOutput']; export type WeaviateUserTypeInternal = definitions['UserTypeInput']; export type WeaviateUserTypeDB = definitions['DBUserInfo']['dbUserType']; export type WeaviateAssignedUser = operations['getUsersForRole']['responses']['200']['schema'][0]; +// Cluster +export type WeaviateShardingState = definitions['ReplicationShardingState']; +export type WeaviateReplicationType = definitions['ReplicationReplicateDetailsReplicaResponse']['type']; +export type WeaviateReplicateRequest = definitions['ReplicationReplicateReplicaRequest']; +export type WeaviateReplicateResponse = definitions['ReplicationReplicateReplicaResponse']; +export type WeaviateReplicationResponse = definitions['ReplicationReplicateDetailsReplicaResponse']; // Alias export type WeaviateAlias = definitions['Alias']; export type WeaviateAliasResponse = { diff --git a/src/utils/types.ts b/src/utils/types.ts new file mode 100644 index 00000000..181decb1 --- /dev/null +++ b/src/utils/types.ts @@ -0,0 +1,5 @@ +export type DeepRequired = T extends Array + ? Array> + : T extends object + ? { [K in keyof T]-?: DeepRequired> } + : T;