Skip to content

Commit 882afcc

Browse files
committed
Add replica movement methods under client.cluster
1 parent edb8dea commit 882afcc

File tree

7 files changed

+360
-11
lines changed

7 files changed

+360
-11
lines changed

src/collections/cluster/index.ts

Lines changed: 160 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1-
import { NodesStatusGetter } from '../../cluster/index.js';
2-
import Connection from '../../connection/index.js';
3-
import { BatchStats, NodeShardStatus, NodeStats } from '../../openapi/types.js';
1+
import { IConnection } from '../../connection/index.js';
2+
import {
3+
BatchStats,
4+
NodeShardStatus,
5+
NodeStats,
6+
NodesStatusResponse,
7+
WeaviateReplicateRequest,
8+
WeaviateReplicateResponse,
9+
WeaviateReplicationResponse,
10+
WeaviateReplicationType,
11+
WeaviateShardingState,
12+
} from '../../openapi/types.js';
13+
import { DeepRequired } from '../types/internal.js';
414

515
export type Output = 'minimal' | 'verbose' | undefined;
616

@@ -11,6 +21,41 @@ export type NodesOptions<O extends Output> = {
1121
output: O;
1222
};
1323

24+
export type QueryShardingStateOptions = {
25+
/** The name of the collection to query. */
26+
collection: string;
27+
/** The name of the shard to query. If not provided, all shards will be queried. */
28+
shard?: string;
29+
};
30+
31+
export type ReplicateArguments = {
32+
/** The name of the collection in which to replicate a shard. */
33+
collection: string;
34+
/** The name of the shard to replicate. */
35+
shard: string;
36+
/** The name of the node from which to replicate the shard. */
37+
sourceNode: string;
38+
/** The name of the node to which to replicate the shard. */
39+
targetNode: string;
40+
/** The type of replication to perform. */
41+
replicationType: WeaviateReplicationType;
42+
};
43+
44+
export type ShardingState = DeepRequired<WeaviateShardingState>;
45+
46+
export type ReplicationOperation = DeepRequired<WeaviateReplicationResponse>;
47+
48+
export type QueryReplicationOpsOptions = {
49+
/** The name of the collection to query. */
50+
collection?: string;
51+
/** The name of the shard to query. */
52+
shard?: string;
53+
/** The target node of the op to query. */
54+
targetNode?: string;
55+
/** Whether to include the status history in the response. */
56+
includeHistory?: boolean;
57+
};
58+
1459
export type Node<O extends Output> = {
1560
name: string;
1661
status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE';
@@ -21,14 +66,65 @@ export type Node<O extends Output> = {
2166
shards: O extends 'minimal' | undefined ? null : Required<NodeShardStatus>[];
2267
};
2368

24-
const cluster = (connection: Connection) => {
69+
const cluster = (connection: IConnection) => {
2570
return {
26-
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>): Promise<Node<O>[]> => {
27-
let builder = new NodesStatusGetter(connection).withOutput(opts?.output ? opts.output : 'minimal');
71+
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => {
72+
const params = new URLSearchParams();
73+
let path = '/nodes';
2874
if (opts?.collection) {
29-
builder = builder.withClassName(opts.collection);
75+
path = path.concat(`/${opts.collection}`);
3076
}
31-
return builder.do().then((res) => res.nodes) as Promise<Node<O>[]>;
77+
params.append('output', opts?.output ? opts.output : 'minimal');
78+
return connection
79+
.get<NodesStatusResponse>(`${path}?${params.toString()}`)
80+
.then((res) => res.nodes as Node<O>[]);
81+
},
82+
queryShardingState: (opts: QueryShardingStateOptions) => {
83+
const params = new URLSearchParams();
84+
params.append('collection', opts.collection);
85+
if (opts.shard) {
86+
params.append('shard', opts.shard);
87+
}
88+
return connection
89+
.get<ShardingState | undefined>(`/replication/sharding-state?${params.toString()}`)
90+
.then((res) => res as ShardingState);
91+
},
92+
replicate: (args: ReplicateArguments): Promise<string> =>
93+
connection
94+
.postReturn<WeaviateReplicateRequest, WeaviateReplicateResponse>(
95+
`/replication/replicate`,
96+
(({ replicationType, ...rest }) => ({ type: replicationType, ...rest }))(args)
97+
)
98+
.then((res) => res.id),
99+
replications: {
100+
cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}),
101+
delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false),
102+
deleteAll: () => connection.delete(`/replication/replicate`, {}, false),
103+
get: (id: string, opts?: { includeHistory?: boolean }): Promise<ReplicationOperation | null> =>
104+
connection
105+
.get<ReplicationOperation | undefined>(
106+
`/replication/replicate/${id}?includeHistory=${
107+
opts?.includeHistory ? opts?.includeHistory : 'false'
108+
}`
109+
)
110+
.then((res) => (res ? (res as ReplicationOperation) : null)),
111+
query: (opts?: QueryReplicationOpsOptions): Promise<ReplicationOperation[]> => {
112+
const { collection, shard, targetNode, includeHistory } = opts || {};
113+
const params = new URLSearchParams();
114+
if (collection) {
115+
params.append('collection', collection);
116+
}
117+
if (shard) {
118+
params.append('shard', shard);
119+
}
120+
if (targetNode) {
121+
params.append('targetNode', targetNode);
122+
}
123+
if (includeHistory) {
124+
params.append('includeHistory', includeHistory.toString());
125+
}
126+
return connection.get<ReplicationOperation[]>(`/replication/replicate?${params.toString()}`);
127+
},
32128
},
33129
};
34130
};
@@ -43,4 +139,60 @@ export interface Cluster {
43139
* @returns {Promise<Node<O>[]>} The status of all nodes in the cluster.
44140
*/
45141
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => Promise<Node<O>[]>;
142+
/**
143+
* Query the sharding state of a specific collection.
144+
*
145+
* @param {QueryShardingStateOptions} opts The options for the request.
146+
* @returns {Promise<ShardingState>} The sharding state of the collection.
147+
*/
148+
queryShardingState: (opts: QueryShardingStateOptions) => Promise<ShardingState>;
149+
/**
150+
* Replicate a shard from one node to another.
151+
*
152+
* @param {ReplicateArguments} args The arguments for the replication request.
153+
* @returns {Promise<string>} The ID of the replication request.
154+
*/
155+
replicate: (args: ReplicateArguments) => Promise<string>;
156+
/**
157+
* Access replication operations.
158+
*/
159+
replications: Replciations;
160+
}
161+
162+
export interface Replciations {
163+
/**
164+
* Cancel a replication operation.
165+
*
166+
* @param {string} id The ID of the replication operation to cancel.
167+
* @returns {Promise<void>} A promise that resolves when the operation is cancelled.
168+
*/
169+
cancel: (id: string) => Promise<void>;
170+
/**
171+
* Delete a replication operation.
172+
*
173+
* @param {string} id The ID of the replication operation to delete.
174+
* @returns {Promise<void>} A promise that resolves when the operation is deleted.
175+
*/
176+
delete: (id: string) => Promise<void>;
177+
/**
178+
* Delete all replication operations.
179+
*
180+
* @returns {Promise<void>} A promise that resolves when all operations are deleted.
181+
*/
182+
deleteAll: () => Promise<void>;
183+
/**
184+
* Get a specific replication operation by ID.
185+
*
186+
* @param {string} id The ID of the replication operation to get.
187+
* @param {boolean} [opts.includeHistory=false] Whether to include the status history in the response.
188+
* @returns {Promise<ReplicationOperation | null>} The replication operation or null if not found.
189+
*/
190+
get: (id: string, opts?: { includeHistory?: boolean }) => Promise<ReplicationOperation | null>;
191+
/**
192+
* Query all replication operations with optional filters.
193+
*
194+
* @param {QueryReplicationOpsOptions} [opts] Optional parameters for filtering the query.
195+
* @returns {Promise<ReplicationOperation[]>} A list of replication operations matching the query.
196+
*/
197+
query: (opts?: QueryReplicationOpsOptions) => Promise<ReplicationOperation[]>;
46198
}

src/collections/cluster/integration.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import weaviate, { WeaviateClient } from '../../index.js';
22

3-
describe('Testing of the client.cluster methods', () => {
3+
describe('Integration testing of the client.cluster methods', () => {
44
let client: WeaviateClient;
55

66
const one = 'TestClusterCollectionOne';
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import { IConnection } from '../../connection';
2+
import cluster from './index.js';
3+
4+
// These tests do not validate the response from Weaviate. This is because the server responses are not mapped at all by the client so are assumed to be correct.
5+
// Instead, these tests validate that the client sends the correct requests to the server and that the responses are handled correctly.
6+
describe('Unit testing of the client.cluster methods', () => {
7+
const clusterMaker = (mock: any) => cluster(mock as unknown as IConnection);
8+
const assert = (expected: any) => (actual: any) => expect(actual).toEqual(expected);
9+
10+
it('should query the nodes correctly', () => {
11+
const opts = { collection: 'Collection', output: 'minimal' as const };
12+
const mockResult = {
13+
nodes: [
14+
{
15+
name: 'node1',
16+
status: 'HEALTHY',
17+
version: '1.0.0',
18+
gitHash: 'abc123',
19+
batchStats: { queueLength: 0, ratePerSecond: 0 },
20+
stats: undefined,
21+
shards: null,
22+
},
23+
],
24+
};
25+
const mockConnection = {
26+
get: (path: string, expectReturnContent?: boolean | undefined) => {
27+
expect(path).toBe('/nodes/Collection?output=minimal');
28+
return Promise.resolve(mockResult);
29+
},
30+
};
31+
clusterMaker(mockConnection).nodes(opts).then(assert(mockResult.nodes));
32+
});
33+
34+
it('should query the sharding state correctly for a collection with a shard', () => {
35+
const opts = {
36+
collection: 'Collection',
37+
shard: 'shard',
38+
};
39+
const mockResult = {
40+
collection: 'Collection',
41+
shards: [{ shard: 'shard', replicas: ['node1', 'node2'] }],
42+
};
43+
const mockConnection = {
44+
get: (path: string, expectReturnContent?: boolean | undefined) => {
45+
expect(path).toBe(`/replication/sharding-state?collection=Collection&shard=shard`);
46+
return Promise.resolve(mockResult);
47+
},
48+
};
49+
clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult));
50+
});
51+
52+
it('should query the sharding state correctly for a collection without a specific shard', () => {
53+
const opts = {
54+
collection: 'Collection',
55+
};
56+
const mockResult = {
57+
collection: 'Collection',
58+
shards: [
59+
{ shard: 'shard1', replicas: ['node1'] },
60+
{ shard: 'shard2', replicas: ['node2'] },
61+
],
62+
};
63+
const mockConnection = {
64+
get: (path: string, expectReturnContent?: boolean | undefined) => {
65+
expect(path).toBe('/replication/sharding-state?collection=Collection');
66+
return Promise.resolve(mockResult);
67+
},
68+
};
69+
clusterMaker(mockConnection).queryShardingState(opts).then(assert(mockResult));
70+
});
71+
72+
it('should replicate a shard correctly', () => {
73+
const args = {
74+
collection: 'Collection',
75+
shard: 'shard',
76+
sourceNode: 'sourceNode',
77+
targetNode: 'targetNode',
78+
replicationType: 'COPY' as const,
79+
};
80+
const mockResult = { id: 'replication-id' };
81+
const mockConnection = {
82+
postReturn: (path: string, body: any): Promise<any> => {
83+
expect(path).toBe('/replication/replicate');
84+
expect(body).toEqual({
85+
collection: 'Collection',
86+
shard: 'shard',
87+
sourceNode: 'sourceNode',
88+
targetNode: 'targetNode',
89+
type: 'COPY',
90+
});
91+
return Promise.resolve(mockResult);
92+
},
93+
};
94+
clusterMaker(mockConnection).replicate(args).then(assert(mockResult.id));
95+
});
96+
97+
it('should get a replication operation by ID without status history', () => {
98+
const id = 'replication-id';
99+
const mockResult = { id };
100+
const mockConnection = {
101+
get: (path: string) => {
102+
expect(path).toBe(`/replication/replicate/${id}?includeHistory=false`);
103+
return Promise.resolve(mockResult);
104+
},
105+
};
106+
clusterMaker(mockConnection).replications.get(id).then(assert(mockResult));
107+
});
108+
109+
it('should get a replication operation by ID with status history', () => {
110+
const id = 'replication-id';
111+
const mockResult = { id };
112+
const mockConnection = {
113+
get: (path: string) => {
114+
expect(path).toBe(`/replication/replicate/${id}?includeHistory=true`);
115+
return Promise.resolve(mockResult);
116+
},
117+
};
118+
clusterMaker(mockConnection).replications.get(id, { includeHistory: true }).then(assert(mockResult));
119+
});
120+
121+
it('should cancel a replication operation', () => {
122+
const id = 'replication-id';
123+
const mockConnection = {
124+
postEmpty: (path: string): Promise<void> => {
125+
expect(path).toBe(`/replication/replicate/${id}/cancel`);
126+
return Promise.resolve();
127+
},
128+
};
129+
clusterMaker(mockConnection).replications.cancel(id).then(assert(undefined));
130+
});
131+
132+
it('should delete a replication operation', () => {
133+
const id = 'replication-id';
134+
const mockConnection = {
135+
delete: (path: string) => {
136+
expect(path).toBe(`/replication/replicate/${id}`);
137+
return Promise.resolve();
138+
},
139+
};
140+
clusterMaker(mockConnection).replications.delete(id).then(assert(undefined));
141+
});
142+
143+
it('should delete all replication operations', () => {
144+
const mockConnection = {
145+
delete: (path: string) => {
146+
expect(path).toBe(`/replication/replicate`);
147+
return Promise.resolve();
148+
},
149+
};
150+
clusterMaker(mockConnection).replications.deleteAll().then(assert(undefined));
151+
});
152+
153+
it('should query replication operations with various filters', () => {
154+
const opts = {
155+
collection: 'Collection',
156+
shard: 'shard',
157+
targetNode: 'node1',
158+
includeHistory: true,
159+
};
160+
const mockResult = [{ id: 'replication-id' }];
161+
const mockConnection = {
162+
get: (path: string) => {
163+
expect(path).toBe(
164+
`/replication/replicate?collection=Collection&shard=shard&targetNode=node1&includeHistory=true`
165+
);
166+
return Promise.resolve(mockResult);
167+
},
168+
};
169+
clusterMaker(mockConnection).replications.query(opts).then(assert(mockResult));
170+
});
171+
});

src/collections/types/internal.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,11 @@ type AtLeastOne<T> = {
134134
}[keyof T];
135135

136136
export type NonEmpty<T> = keyof T extends never ? never : T;
137+
138+
export type DeepRequired<T> = T extends Function
139+
? T
140+
: T extends Array<infer U>
141+
? Array<DeepRequired<U>>
142+
: T extends object
143+
? { [K in keyof T]-?: DeepRequired<NonNullable<T[K]>> }
144+
: T;

0 commit comments

Comments
 (0)