Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 160 additions & 8 deletions src/collections/cluster/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import { NodesStatusGetter } from '../../cluster/index.js';
import Connection from '../../connection/index.js';
import { BatchStats, NodeShardStatus, NodeStats } from '../../openapi/types.js';
import { IConnection } from '../../connection/index.js';
import {
BatchStats,
NodeShardStatus,
NodeStats,
NodesStatusResponse,
WeaviateReplicateRequest,
WeaviateReplicateResponse,
WeaviateReplicationResponse,
WeaviateReplicationType,
WeaviateShardingState,
} from '../../openapi/types.js';
import { DeepRequired } from '../types/internal.js';

export type Output = 'minimal' | 'verbose' | undefined;

Expand All @@ -11,6 +21,41 @@ export type NodesOptions<O extends Output> = {
output: O;
};

export type QueryShardingStateOptions = {
/** The name of the collection to query. */
collection: string;
/** The name of the shard to query. If not provided, all shards will be queried. */
shard?: string;
};

export type ReplicateArguments = {
/** The name of the collection in which to replicate a shard. */
collection: string;
/** The name of the shard to replicate. */
shard: string;
/** The name of the node from which to replicate the shard. */
sourceNode: string;
/** The name of the node to which to replicate the shard. */
targetNode: string;
/** The type of replication to perform. */
replicationType: WeaviateReplicationType;
};

export type ShardingState = DeepRequired<WeaviateShardingState>;

export type ReplicationOperation = DeepRequired<WeaviateReplicationResponse>;

export type QueryReplicationOpsOptions = {
/** The name of the collection to query. */
collection?: string;
/** The name of the shard to query. */
shard?: string;
/** The target node of the op to query. */
targetNode?: string;
/** Whether to include the status history in the response. */
includeHistory?: boolean;
};

export type Node<O extends Output> = {
name: string;
status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE';
Expand All @@ -21,14 +66,65 @@ export type Node<O extends Output> = {
shards: O extends 'minimal' | undefined ? null : Required<NodeShardStatus>[];
};

const cluster = (connection: Connection) => {
const cluster = (connection: IConnection) => {
return {
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>): Promise<Node<O>[]> => {
let builder = new NodesStatusGetter(connection).withOutput(opts?.output ? opts.output : 'minimal');
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => {
const params = new URLSearchParams();
let path = '/nodes';
if (opts?.collection) {
builder = builder.withClassName(opts.collection);
path = path.concat(`/${opts.collection}`);
}
return builder.do().then((res) => res.nodes) as Promise<Node<O>[]>;
params.append('output', opts?.output ? opts.output : 'minimal');
return connection
.get<NodesStatusResponse>(`${path}?${params.toString()}`)
.then((res) => res.nodes as Node<O>[]);
},
queryShardingState: (opts: QueryShardingStateOptions) => {
const params = new URLSearchParams();
params.append('collection', opts.collection);
if (opts.shard) {
params.append('shard', opts.shard);
}
return connection
.get<ShardingState | undefined>(`/replication/sharding-state?${params.toString()}`)
.then((res) => res as ShardingState);
},
replicate: (args: ReplicateArguments): Promise<string> =>
connection
.postReturn<WeaviateReplicateRequest, WeaviateReplicateResponse>(
`/replication/replicate`,
(({ replicationType, ...rest }) => ({ type: replicationType, ...rest }))(args)
)
.then((res) => res.id),
replications: {
cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}),
delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false),
deleteAll: () => connection.delete(`/replication/replicate`, {}, false),
get: (id: string, opts?: { includeHistory?: boolean }): Promise<ReplicationOperation | null> =>
connection
.get<ReplicationOperation | undefined>(
`/replication/replicate/${id}?includeHistory=${
opts?.includeHistory ? opts?.includeHistory : 'false'
}`
)
.then((res) => (res ? (res as ReplicationOperation) : null)),
query: (opts?: QueryReplicationOpsOptions): Promise<ReplicationOperation[]> => {
const { collection, shard, targetNode, includeHistory } = opts || {};
const params = new URLSearchParams();
if (collection) {
params.append('collection', collection);
}
if (shard) {
params.append('shard', shard);
}
if (targetNode) {
params.append('targetNode', targetNode);
}
if (includeHistory) {
params.append('includeHistory', includeHistory.toString());
}
return connection.get<ReplicationOperation[]>(`/replication/replicate?${params.toString()}`);
},
},
};
};
Expand All @@ -43,4 +139,60 @@ export interface Cluster {
* @returns {Promise<Node<O>[]>} The status of all nodes in the cluster.
*/
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => Promise<Node<O>[]>;
/**
* Query the sharding state of a specific collection.
*
* @param {QueryShardingStateOptions} opts The options for the request.
* @returns {Promise<ShardingState>} The sharding state of the collection.
*/
queryShardingState: (opts: QueryShardingStateOptions) => Promise<ShardingState>;
/**
* Replicate a shard from one node to another.
*
* @param {ReplicateArguments} args The arguments for the replication request.
* @returns {Promise<string>} The ID of the replication request.
*/
replicate: (args: ReplicateArguments) => Promise<string>;
/**
* Access replication operations.
*/
replications: Replciations;
}

export interface Replciations {
/**
* Cancel a replication operation.
*
* @param {string} id The ID of the replication operation to cancel.
* @returns {Promise<void>} A promise that resolves when the operation is cancelled.
*/
cancel: (id: string) => Promise<void>;
/**
* Delete a replication operation.
*
* @param {string} id The ID of the replication operation to delete.
* @returns {Promise<void>} A promise that resolves when the operation is deleted.
*/
delete: (id: string) => Promise<void>;
/**
* Delete all replication operations.
*
* @returns {Promise<void>} A promise that resolves when all operations are deleted.
*/
deleteAll: () => Promise<void>;
/**
* Get a specific replication operation by ID.
*
* @param {string} id The ID of the replication operation to get.
* @param {boolean} [opts.includeHistory=false] Whether to include the status history in the response.
* @returns {Promise<ReplicationOperation | null>} The replication operation or null if not found.
*/
get: (id: string, opts?: { includeHistory?: boolean }) => Promise<ReplicationOperation | null>;
/**
* Query all replication operations with optional filters.
*
* @param {QueryReplicationOpsOptions} [opts] Optional parameters for filtering the query.
* @returns {Promise<ReplicationOperation[]>} A list of replication operations matching the query.
*/
query: (opts?: QueryReplicationOpsOptions) => Promise<ReplicationOperation[]>;
}
2 changes: 1 addition & 1 deletion src/collections/cluster/integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import weaviate, { WeaviateClient } from '../../index.js';

describe('Testing of the client.cluster methods', () => {
describe('Integration testing of the client.cluster methods', () => {
let client: WeaviateClient;

const one = 'TestClusterCollectionOne';
Expand Down
171 changes: 171 additions & 0 deletions src/collections/cluster/unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import { IConnection } from '../../connection';
import cluster from './index.js';

// These tests do not validate the response from Weaviate. This is because the server responses are not mapped at all by the client so are assumed to be correct.
// Instead, these tests validate that the client sends the correct requests to the server and that the responses are handled correctly.
describe('Unit testing of the client.cluster methods', () => {
const clusterMaker = (mock: any) => cluster(mock as unknown as IConnection);
const assert = (expected: any) => (actual: any) => expect(actual).toEqual(expected);

it('should query the nodes correctly', () => {
const opts = { collection: 'Collection', output: 'minimal' as const };
const mockResult = {
nodes: [
{
name: 'node1',
status: 'HEALTHY',
version: '1.0.0',
gitHash: 'abc123',
batchStats: { queueLength: 0, ratePerSecond: 0 },
stats: undefined,
shards: null,
},
],
};
const mockConnection = {
get: (path: string, expectReturnContent?: boolean | undefined) => {
expect(path).toBe('/nodes/Collection?output=minimal');
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).nodes(opts).then(assert(mockResult.nodes));
});

it('should query the sharding state correctly for a collection with a shard', () => {
const opts = {
collection: 'Collection',
shard: 'shard',
};
const mockResult = {
collection: 'Collection',
shards: [{ shard: 'shard', replicas: ['node1', 'node2'] }],
};
const mockConnection = {
get: (path: string, expectReturnContent?: boolean | undefined) => {
expect(path).toBe(`/replication/sharding-state?collection=Collection&shard=shard`);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult));
});

it('should query the sharding state correctly for a collection without a specific shard', () => {
const opts = {
collection: 'Collection',
};
const mockResult = {
collection: 'Collection',
shards: [
{ shard: 'shard1', replicas: ['node1'] },
{ shard: 'shard2', replicas: ['node2'] },
],
};
const mockConnection = {
get: (path: string, expectReturnContent?: boolean | undefined) => {
expect(path).toBe('/replication/sharding-state?collection=Collection');
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult));
});

it('should replicate a shard correctly', () => {
const args = {
collection: 'Collection',
shard: 'shard',
sourceNode: 'sourceNode',
targetNode: 'targetNode',
replicationType: 'COPY' as const,
};
const mockResult = { id: 'replication-id' };
const mockConnection = {
postReturn: (path: string, body: any): Promise<any> => {
expect(path).toBe('/replication/replicate');
expect(body).toEqual({
collection: 'Collection',
shard: 'shard',
sourceNode: 'sourceNode',
targetNode: 'targetNode',
type: 'COPY',
});
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replicate(args).then(assert(mockResult.id));
});

it('should get a replication operation by ID without status history', () => {
const id = 'replication-id';
const mockResult = { id };
const mockConnection = {
get: (path: string) => {
expect(path).toBe(`/replication/replicate/${id}?includeHistory=false`);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replications.get(id).then(assert(mockResult));
});

it('should get a replication operation by ID with status history', () => {
const id = 'replication-id';
const mockResult = { id };
const mockConnection = {
get: (path: string) => {
expect(path).toBe(`/replication/replicate/${id}?includeHistory=true`);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replications.get(id, { includeHistory: true }).then(assert(mockResult));
});

it('should cancel a replication operation', () => {
const id = 'replication-id';
const mockConnection = {
postEmpty: (path: string): Promise<void> => {
expect(path).toBe(`/replication/replicate/${id}/cancel`);
return Promise.resolve();
},
};
clusterMaker(mockConnection).replications.cancel(id).then(assert(undefined));
});

it('should delete a replication operation', () => {
const id = 'replication-id';
const mockConnection = {
delete: (path: string) => {
expect(path).toBe(`/replication/replicate/${id}`);
return Promise.resolve();
},
};
clusterMaker(mockConnection).replications.delete(id).then(assert(undefined));
});

it('should delete all replication operations', () => {
const mockConnection = {
delete: (path: string) => {
expect(path).toBe(`/replication/replicate`);
return Promise.resolve();
},
};
clusterMaker(mockConnection).replications.deleteAll().then(assert(undefined));
});

it('should query replication operations with various filters', () => {
const opts = {
collection: 'Collection',
shard: 'shard',
targetNode: 'node1',
includeHistory: true,
};
const mockResult = [{ id: 'replication-id' }];
const mockConnection = {
get: (path: string) => {
expect(path).toBe(
`/replication/replicate?collection=Collection&shard=shard&targetNode=node1&includeHistory=true`
);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replications.query(opts).then(assert(mockResult));
});
});
8 changes: 8 additions & 0 deletions src/collections/types/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,11 @@ type AtLeastOne<T> = {
}[keyof T];

export type NonEmpty<T> = keyof T extends never ? never : T;

export type DeepRequired<T> = T extends Function
? T
: T extends Array<infer U>
? Array<DeepRequired<U>>
: T extends object
? { [K in keyof T]-?: DeepRequired<NonNullable<T[K]>> }
: T;
Loading
Loading