Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 164 additions & 8 deletions src/collections/cluster/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import { NodesStatusGetter } from '../../cluster/index.js';
import Connection from '../../connection/index.js';
import { BatchStats, NodeShardStatus, NodeStats } from '../../openapi/types.js';
import { IConnection } from '../../connection/index.js';
import {
BatchStats,
NodeShardStatus,
NodeStats,
NodesStatusResponse,
WeaviateReplicateRequest,
WeaviateReplicateResponse,
WeaviateReplicationResponse,
WeaviateReplicationType,
WeaviateShardingState,
} from '../../openapi/types.js';
import { DeepRequired } from '../../utils/types.js';

export type Output = 'minimal' | 'verbose' | undefined;

Expand All @@ -11,6 +21,44 @@ export type NodesOptions<O extends Output> = {
output: O;
};

export type QueryShardingStateOptions = {
/** The name of the shard to query. If not provided, all shards will be queried. */
shard?: string;
};

export type ReplicateArgs = {
/** The name of the collection in which to replicate a shard. */
collection: string;
/** The name of the shard to replicate. */
shard: string;
/** The name of the node from which to replicate the shard. */
sourceNode: string;
/** The name of the node to which to replicate the shard. */
targetNode: string;
/** The type of replication to perform. */
replicationType: WeaviateReplicationType;
};

export type ShardingState = DeepRequired<WeaviateShardingState>;

export type ReplicationOperation = DeepRequired<WeaviateReplicationResponse>;

export type QueryReplicationOpsOptions = {
/** The name of the collection to query. */
collection?: string;
/** The name of the shard to query. */
shard?: string;
/** The target node of the op to query. */
targetNode?: string;
/** Whether to include the status history in the response. */
includeHistory?: boolean;
};

export type GetReplicationOpOptions = {
/** Whether to include the status history in the response. Defaults to false. */
includeHistory?: boolean;
};

export type Node<O extends Output> = {
name: string;
status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE';
Expand All @@ -21,14 +69,65 @@ export type Node<O extends Output> = {
shards: O extends 'minimal' | undefined ? null : Required<NodeShardStatus>[];
};

const cluster = (connection: Connection) => {
const cluster = (connection: IConnection) => {
return {
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>): Promise<Node<O>[]> => {
let builder = new NodesStatusGetter(connection).withOutput(opts?.output ? opts.output : 'minimal');
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => {
const params = new URLSearchParams();
let path = '/nodes';
if (opts?.collection) {
builder = builder.withClassName(opts.collection);
path = path.concat(`/${opts.collection}`);
}
params.append('output', opts?.output ? opts.output : 'minimal');
return connection
.get<NodesStatusResponse>(`${path}?${params.toString()}`)
.then((res) => res.nodes as Node<O>[]);
},
queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => {
const params = new URLSearchParams();
params.append('collection', collection);
if (opts?.shard) {
params.append('shard', opts.shard);
}
return builder.do().then((res) => res.nodes) as Promise<Node<O>[]>;
return connection
.get<ShardingState | undefined>(`/replication/sharding-state?${params.toString()}`)
.then((res) => res as ShardingState);
},
replicate: (args: ReplicateArgs): Promise<string> =>
connection
.postReturn<WeaviateReplicateRequest, WeaviateReplicateResponse>(
`/replication/replicate`,
(({ replicationType, ...rest }) => ({ type: replicationType, ...rest }))(args)
)
.then((res) => res.id),
replications: {
cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}),
delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false),
deleteAll: () => connection.delete(`/replication/replicate`, {}, false),
get: (id: string, opts?: GetReplicationOpOptions): Promise<ReplicationOperation | null> =>
connection
.get<ReplicationOperation | undefined>(
`/replication/replicate/${id}?includeHistory=${
opts?.includeHistory ? opts?.includeHistory : 'false'
}`
)
.then((res) => (res ? (res as ReplicationOperation) : null)),
query: (opts?: QueryReplicationOpsOptions): Promise<ReplicationOperation[]> => {
const { collection, shard, targetNode, includeHistory } = opts || {};
const params = new URLSearchParams();
if (collection) {
params.append('collection', collection);
}
if (shard) {
params.append('shard', shard);
}
if (targetNode) {
params.append('targetNode', targetNode);
}
if (includeHistory) {
params.append('includeHistory', includeHistory.toString());
}
return connection.get<ReplicationOperation[]>(`/replication/replicate?${params.toString()}`);
},
},
};
};
Expand All @@ -43,4 +142,61 @@ export interface Cluster {
* @returns {Promise<Node<O>[]>} The status of all nodes in the cluster.
*/
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => Promise<Node<O>[]>;
/**
* Query the sharding state of a specific collection.
*
* @param {string} collection The name of the collection to query.
* @param {QueryShardingStateOptions} [opts] The options for the request.
* @returns {Promise<ShardingState>} The sharding state of the collection.
*/
queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => Promise<ShardingState>;
/**
* Replicate a shard from one node to another.
*
* @param {ReplicateArgs} args The arguments for the replication request.
* @returns {Promise<string>} The ID of the replication request.
*/
replicate: (args: ReplicateArgs) => Promise<string>;
/**
* Access replication operations.
*/
replications: Replications;
}

export interface Replications {
/**
* Cancel a replication operation.
*
* @param {string} id The ID of the replication operation to cancel.
* @returns {Promise<void>} A promise that resolves when the operation is cancelled.
*/
cancel: (id: string) => Promise<void>;
/**
* Delete a replication operation.
*
* @param {string} id The ID of the replication operation to delete.
* @returns {Promise<void>} A promise that resolves when the operation is deleted.
*/
delete: (id: string) => Promise<void>;
/**
* Delete all replication operations.
*
* @returns {Promise<void>} A promise that resolves when all operations are deleted.
*/
deleteAll: () => Promise<void>;
/**
* Get a specific replication operation by ID.
*
* @param {string} id The ID of the replication operation to get.
* @param {boolean} [opts.includeHistory=false] Whether to include the status history in the response.
* @returns {Promise<ReplicationOperation | null>} The replication operation or null if not found.
*/
get: (id: string, opts?: { includeHistory?: boolean }) => Promise<ReplicationOperation | null>;
/**
* Query all replication operations with optional filters.
*
* @param {QueryReplicationOpsOptions} [opts] Optional parameters for filtering the query.
* @returns {Promise<ReplicationOperation[]>} A list of replication operations matching the query.
*/
query: (opts?: QueryReplicationOpsOptions) => Promise<ReplicationOperation[]>;
}
2 changes: 1 addition & 1 deletion src/collections/cluster/integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import weaviate, { WeaviateClient } from '../../index.js';

describe('Testing of the client.cluster methods', () => {
describe('Integration testing of the client.cluster methods', () => {
let client: WeaviateClient;

const one = 'TestClusterCollectionOne';
Expand Down
167 changes: 167 additions & 0 deletions src/collections/cluster/unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { IConnection } from '../../connection';
import cluster from './index.js';

// These tests do not validate the response from Weaviate. This is because the server responses are not mapped at all by the client so are assumed to be correct.
// Instead, these tests validate that the client sends the correct requests to the server and that the responses are handled correctly.
describe('Unit testing of the client.cluster methods', () => {
const clusterMaker = (mock: any) => cluster(mock as unknown as IConnection);
const assert = (expected: any) => (actual: any) => expect(actual).toEqual(expected);

it('should query the nodes correctly', () => {
const opts = { collection: 'Collection', output: 'minimal' as const };
const mockResult = {
nodes: [
{
name: 'node1',
status: 'HEALTHY',
version: '1.0.0',
gitHash: 'abc123',
batchStats: { queueLength: 0, ratePerSecond: 0 },
stats: undefined,
shards: null,
},
],
};
const mockConnection = {
get: (path: string, expectReturnContent?: boolean | undefined) => {
expect(path).toBe('/nodes/Collection?output=minimal');
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).nodes(opts).then(assert(mockResult.nodes));
});

it('should query the sharding state correctly for a collection with a shard', () => {
const opts = {
shard: 'shard',
};
const mockResult = {
collection: 'Collection',
shards: [{ shard: 'shard', replicas: ['node1', 'node2'] }],
};
const mockConnection = {
get: (path: string, expectReturnContent?: boolean | undefined) => {
expect(path).toBe(`/replication/sharding-state?collection=Collection&shard=shard`);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).queryShardingState('Collection', opts).then(assert(mockResult));
});

it('should query the sharding state correctly for a collection without a specific shard', () => {
const mockResult = {
collection: 'Collection',
shards: [
{ shard: 'shard1', replicas: ['node1'] },
{ shard: 'shard2', replicas: ['node2'] },
],
};
const mockConnection = {
get: (path: string, expectReturnContent?: boolean | undefined) => {
expect(path).toBe('/replication/sharding-state?collection=Collection');
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).queryShardingState('Collection').then(assert(mockResult));
});

it('should replicate a shard correctly', () => {
const args = {
collection: 'Collection',
shard: 'shard',
sourceNode: 'sourceNode',
targetNode: 'targetNode',
replicationType: 'COPY' as const,
};
const mockResult = { id: 'replication-id' };
const mockConnection = {
postReturn: (path: string, body: any): Promise<any> => {
expect(path).toBe('/replication/replicate');
expect(body).toEqual({
collection: 'Collection',
shard: 'shard',
sourceNode: 'sourceNode',
targetNode: 'targetNode',
type: 'COPY',
});
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replicate(args).then(assert(mockResult.id));
});

it('should get a replication operation by ID without status history', () => {
const id = 'replication-id';
const mockResult = { id };
const mockConnection = {
get: (path: string) => {
expect(path).toBe(`/replication/replicate/${id}?includeHistory=false`);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replications.get(id).then(assert(mockResult));
});

it('should get a replication operation by ID with status history', () => {
const id = 'replication-id';
const mockResult = { id };
const mockConnection = {
get: (path: string) => {
expect(path).toBe(`/replication/replicate/${id}?includeHistory=true`);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replications.get(id, { includeHistory: true }).then(assert(mockResult));
});

it('should cancel a replication operation', () => {
const id = 'replication-id';
const mockConnection = {
postEmpty: (path: string): Promise<void> => {
expect(path).toBe(`/replication/replicate/${id}/cancel`);
return Promise.resolve();
},
};
clusterMaker(mockConnection).replications.cancel(id).then(assert(undefined));
});

it('should delete a replication operation', () => {
const id = 'replication-id';
const mockConnection = {
delete: (path: string) => {
expect(path).toBe(`/replication/replicate/${id}`);
return Promise.resolve();
},
};
clusterMaker(mockConnection).replications.delete(id).then(assert(undefined));
});

it('should delete all replication operations', () => {
const mockConnection = {
delete: (path: string) => {
expect(path).toBe(`/replication/replicate`);
return Promise.resolve();
},
};
clusterMaker(mockConnection).replications.deleteAll().then(assert(undefined));
});

it('should query replication operations with various filters', () => {
const opts = {
collection: 'Collection',
shard: 'shard',
targetNode: 'node1',
includeHistory: true,
};
const mockResult = [{ id: 'replication-id' }];
const mockConnection = {
get: (path: string) => {
expect(path).toBe(
`/replication/replicate?collection=Collection&shard=shard&targetNode=node1&includeHistory=true`
);
return Promise.resolve(mockResult);
},
};
clusterMaker(mockConnection).replications.query(opts).then(assert(mockResult));
});
});
14 changes: 13 additions & 1 deletion src/connection/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,19 @@ export interface ConnectionDetails {
headers?: HeadersInit;
}

export default class ConnectionREST {
export interface IConnection {
postReturn: <B, T>(path: string, payload: B) => Promise<T>;
postEmpty: <B>(path: string, payload: B) => Promise<void>;
put: (path: string, payload: any, expectReturnContent?: boolean) => Promise<any>;
patch: (path: string, payload: any) => Promise<any>;
delete: (path: string, payload: any, expectReturnContent?: boolean) => Promise<any>;
head: (path: string, payload: any) => Promise<boolean>;
get: <T>(path: string, expectReturnContent?: boolean) => Promise<T>;
login(): Promise<string>;
getDetails(): Promise<ConnectionDetails>;
}

export default class ConnectionREST implements IConnection {
private apiKey?: string;
private headers?: HeadersInit;
protected authEnabled: boolean;
Expand Down
Loading