Skip to content

Commit f996cff

Browse files
authored
Merge pull request #320 from weaviate/1.32/add-replica-movement-functionality
Add replica movement methods under `client.cluster`
2 parents 43679b7 + b467eb4 commit f996cff

File tree

7 files changed

+357
-11
lines changed

7 files changed

+357
-11
lines changed

src/collections/cluster/index.ts

Lines changed: 164 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1-
import { NodesStatusGetter } from '../../cluster/index.js';
2-
import Connection from '../../connection/index.js';
3-
import { BatchStats, NodeShardStatus, NodeStats } from '../../openapi/types.js';
1+
import { IConnection } from '../../connection/index.js';
2+
import {
3+
BatchStats,
4+
NodeShardStatus,
5+
NodeStats,
6+
NodesStatusResponse,
7+
WeaviateReplicateRequest,
8+
WeaviateReplicateResponse,
9+
WeaviateReplicationResponse,
10+
WeaviateReplicationType,
11+
WeaviateShardingState,
12+
} from '../../openapi/types.js';
13+
import { DeepRequired } from '../../utils/types.js';
414

515
export type Output = 'minimal' | 'verbose' | undefined;
616

@@ -11,6 +21,44 @@ export type NodesOptions<O extends Output> = {
1121
output: O;
1222
};
1323

24+
export type QueryShardingStateOptions = {
25+
/** The name of the shard to query. If not provided, all shards will be queried. */
26+
shard?: string;
27+
};
28+
29+
export type ReplicateArgs = {
30+
/** The name of the collection in which to replicate a shard. */
31+
collection: string;
32+
/** The name of the shard to replicate. */
33+
shard: string;
34+
/** The name of the node from which to replicate the shard. */
35+
sourceNode: string;
36+
/** The name of the node to which to replicate the shard. */
37+
targetNode: string;
38+
/** The type of replication to perform. */
39+
replicationType: WeaviateReplicationType;
40+
};
41+
42+
export type ShardingState = DeepRequired<WeaviateShardingState>;
43+
44+
export type ReplicationOperation = DeepRequired<WeaviateReplicationResponse>;
45+
46+
export type QueryReplicationOpsOptions = {
47+
/** The name of the collection to query. */
48+
collection?: string;
49+
/** The name of the shard to query. */
50+
shard?: string;
51+
/** The target node of the op to query. */
52+
targetNode?: string;
53+
/** Whether to include the status history in the response. */
54+
includeHistory?: boolean;
55+
};
56+
57+
export type GetReplicationOpOptions = {
58+
/** Whether to include the status history in the response. Defaults to false. */
59+
includeHistory?: boolean;
60+
};
61+
1462
export type Node<O extends Output> = {
1563
name: string;
1664
status: 'HEALTHY' | 'UNHEALTHY' | 'UNAVAILABLE';
@@ -21,14 +69,65 @@ export type Node<O extends Output> = {
2169
shards: O extends 'minimal' | undefined ? null : Required<NodeShardStatus>[];
2270
};
2371

24-
const cluster = (connection: Connection) => {
72+
const cluster = (connection: IConnection) => {
2573
return {
26-
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>): Promise<Node<O>[]> => {
27-
let builder = new NodesStatusGetter(connection).withOutput(opts?.output ? opts.output : 'minimal');
74+
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => {
75+
const params = new URLSearchParams();
76+
let path = '/nodes';
2877
if (opts?.collection) {
29-
builder = builder.withClassName(opts.collection);
78+
path = path.concat(`/${opts.collection}`);
79+
}
80+
params.append('output', opts?.output ? opts.output : 'minimal');
81+
return connection
82+
.get<NodesStatusResponse>(`${path}?${params.toString()}`)
83+
.then((res) => res.nodes as Node<O>[]);
84+
},
85+
queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => {
86+
const params = new URLSearchParams();
87+
params.append('collection', collection);
88+
if (opts?.shard) {
89+
params.append('shard', opts.shard);
3090
}
31-
return builder.do().then((res) => res.nodes) as Promise<Node<O>[]>;
91+
return connection
92+
.get<ShardingState | undefined>(`/replication/sharding-state?${params.toString()}`)
93+
.then((res) => res as ShardingState);
94+
},
95+
replicate: (args: ReplicateArgs): Promise<string> =>
96+
connection
97+
.postReturn<WeaviateReplicateRequest, WeaviateReplicateResponse>(
98+
`/replication/replicate`,
99+
(({ replicationType, ...rest }) => ({ type: replicationType, ...rest }))(args)
100+
)
101+
.then((res) => res.id),
102+
replications: {
103+
cancel: (id: string) => connection.postEmpty(`/replication/replicate/${id}/cancel`, {}),
104+
delete: (id: string) => connection.delete(`/replication/replicate/${id}`, {}, false),
105+
deleteAll: () => connection.delete(`/replication/replicate`, {}, false),
106+
get: (id: string, opts?: GetReplicationOpOptions): Promise<ReplicationOperation | null> =>
107+
connection
108+
.get<ReplicationOperation | undefined>(
109+
`/replication/replicate/${id}?includeHistory=${
110+
opts?.includeHistory ? opts?.includeHistory : 'false'
111+
}`
112+
)
113+
.then((res) => (res ? (res as ReplicationOperation) : null)),
114+
query: (opts?: QueryReplicationOpsOptions): Promise<ReplicationOperation[]> => {
115+
const { collection, shard, targetNode, includeHistory } = opts || {};
116+
const params = new URLSearchParams();
117+
if (collection) {
118+
params.append('collection', collection);
119+
}
120+
if (shard) {
121+
params.append('shard', shard);
122+
}
123+
if (targetNode) {
124+
params.append('targetNode', targetNode);
125+
}
126+
if (includeHistory) {
127+
params.append('includeHistory', includeHistory.toString());
128+
}
129+
return connection.get<ReplicationOperation[]>(`/replication/replicate?${params.toString()}`);
130+
},
32131
},
33132
};
34133
};
@@ -43,4 +142,61 @@ export interface Cluster {
43142
* @returns {Promise<Node<O>[]>} The status of all nodes in the cluster.
44143
*/
45144
nodes: <O extends Output = undefined>(opts?: NodesOptions<O>) => Promise<Node<O>[]>;
145+
/**
146+
* Query the sharding state of a specific collection.
147+
*
148+
* @param {string} collection The name of the collection to query.
149+
* @param {QueryShardingStateOptions} [opts] The options for the request.
150+
* @returns {Promise<ShardingState>} The sharding state of the collection.
151+
*/
152+
queryShardingState: (collection: string, opts?: QueryShardingStateOptions) => Promise<ShardingState>;
153+
/**
154+
* Replicate a shard from one node to another.
155+
*
156+
* @param {ReplicateArgs} args The arguments for the replication request.
157+
* @returns {Promise<string>} The ID of the replication request.
158+
*/
159+
replicate: (args: ReplicateArgs) => Promise<string>;
160+
/**
161+
* Access replication operations.
162+
*/
163+
replications: Replications;
164+
}
165+
166+
export interface Replications {
167+
/**
168+
* Cancel a replication operation.
169+
*
170+
* @param {string} id The ID of the replication operation to cancel.
171+
* @returns {Promise<void>} A promise that resolves when the operation is cancelled.
172+
*/
173+
cancel: (id: string) => Promise<void>;
174+
/**
175+
* Delete a replication operation.
176+
*
177+
* @param {string} id The ID of the replication operation to delete.
178+
* @returns {Promise<void>} A promise that resolves when the operation is deleted.
179+
*/
180+
delete: (id: string) => Promise<void>;
181+
/**
182+
* Delete all replication operations.
183+
*
184+
* @returns {Promise<void>} A promise that resolves when all operations are deleted.
185+
*/
186+
deleteAll: () => Promise<void>;
187+
/**
188+
* Get a specific replication operation by ID.
189+
*
190+
* @param {string} id The ID of the replication operation to get.
191+
* @param {boolean} [opts.includeHistory=false] Whether to include the status history in the response.
192+
* @returns {Promise<ReplicationOperation | null>} The replication operation or null if not found.
193+
*/
194+
get: (id: string, opts?: { includeHistory?: boolean }) => Promise<ReplicationOperation | null>;
195+
/**
196+
* Query all replication operations with optional filters.
197+
*
198+
* @param {QueryReplicationOpsOptions} [opts] Optional parameters for filtering the query.
199+
* @returns {Promise<ReplicationOperation[]>} A list of replication operations matching the query.
200+
*/
201+
query: (opts?: QueryReplicationOpsOptions) => Promise<ReplicationOperation[]>;
46202
}

src/collections/cluster/integration.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import weaviate, { WeaviateClient } from '../../index.js';
22

3-
describe('Testing of the client.cluster methods', () => {
3+
describe('Integration testing of the client.cluster methods', () => {
44
let client: WeaviateClient;
55

66
const one = 'TestClusterCollectionOne';
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import { IConnection } from '../../connection';
2+
import cluster from './index.js';
3+
4+
// These tests do not validate the response from Weaviate. This is because the server responses are not mapped at all by the client so are assumed to be correct.
5+
// Instead, these tests validate that the client sends the correct requests to the server and that the responses are handled correctly.
6+
describe('Unit testing of the client.cluster methods', () => {
7+
const clusterMaker = (mock: any) => cluster(mock as unknown as IConnection);
8+
const assert = (expected: any) => (actual: any) => expect(actual).toEqual(expected);
9+
10+
it('should query the nodes correctly', () => {
11+
const opts = { collection: 'Collection', output: 'minimal' as const };
12+
const mockResult = {
13+
nodes: [
14+
{
15+
name: 'node1',
16+
status: 'HEALTHY',
17+
version: '1.0.0',
18+
gitHash: 'abc123',
19+
batchStats: { queueLength: 0, ratePerSecond: 0 },
20+
stats: undefined,
21+
shards: null,
22+
},
23+
],
24+
};
25+
const mockConnection = {
26+
get: (path: string, expectReturnContent?: boolean | undefined) => {
27+
expect(path).toBe('/nodes/Collection?output=minimal');
28+
return Promise.resolve(mockResult);
29+
},
30+
};
31+
clusterMaker(mockConnection).nodes(opts).then(assert(mockResult.nodes));
32+
});
33+
34+
it('should query the sharding state correctly for a collection with a shard', () => {
35+
const opts = {
36+
shard: 'shard',
37+
};
38+
const mockResult = {
39+
collection: 'Collection',
40+
shards: [{ shard: 'shard', replicas: ['node1', 'node2'] }],
41+
};
42+
const mockConnection = {
43+
get: (path: string, expectReturnContent?: boolean | undefined) => {
44+
expect(path).toBe(`/replication/sharding-state?collection=Collection&shard=shard`);
45+
return Promise.resolve(mockResult);
46+
},
47+
};
48+
clusterMaker(mockConnection).queryShardingState('Collection', opts).then(assert(mockResult));
49+
});
50+
51+
it('should query the sharding state correctly for a collection without a specific shard', () => {
52+
const mockResult = {
53+
collection: 'Collection',
54+
shards: [
55+
{ shard: 'shard1', replicas: ['node1'] },
56+
{ shard: 'shard2', replicas: ['node2'] },
57+
],
58+
};
59+
const mockConnection = {
60+
get: (path: string, expectReturnContent?: boolean | undefined) => {
61+
expect(path).toBe('/replication/sharding-state?collection=Collection');
62+
return Promise.resolve(mockResult);
63+
},
64+
};
65+
clusterMaker(mockConnection).queryShardingState('Collection').then(assert(mockResult));
66+
});
67+
68+
it('should replicate a shard correctly', () => {
69+
const args = {
70+
collection: 'Collection',
71+
shard: 'shard',
72+
sourceNode: 'sourceNode',
73+
targetNode: 'targetNode',
74+
replicationType: 'COPY' as const,
75+
};
76+
const mockResult = { id: 'replication-id' };
77+
const mockConnection = {
78+
postReturn: (path: string, body: any): Promise<any> => {
79+
expect(path).toBe('/replication/replicate');
80+
expect(body).toEqual({
81+
collection: 'Collection',
82+
shard: 'shard',
83+
sourceNode: 'sourceNode',
84+
targetNode: 'targetNode',
85+
type: 'COPY',
86+
});
87+
return Promise.resolve(mockResult);
88+
},
89+
};
90+
clusterMaker(mockConnection).replicate(args).then(assert(mockResult.id));
91+
});
92+
93+
it('should get a replication operation by ID without status history', () => {
94+
const id = 'replication-id';
95+
const mockResult = { id };
96+
const mockConnection = {
97+
get: (path: string) => {
98+
expect(path).toBe(`/replication/replicate/${id}?includeHistory=false`);
99+
return Promise.resolve(mockResult);
100+
},
101+
};
102+
clusterMaker(mockConnection).replications.get(id).then(assert(mockResult));
103+
});
104+
105+
it('should get a replication operation by ID with status history', () => {
106+
const id = 'replication-id';
107+
const mockResult = { id };
108+
const mockConnection = {
109+
get: (path: string) => {
110+
expect(path).toBe(`/replication/replicate/${id}?includeHistory=true`);
111+
return Promise.resolve(mockResult);
112+
},
113+
};
114+
clusterMaker(mockConnection).replications.get(id, { includeHistory: true }).then(assert(mockResult));
115+
});
116+
117+
it('should cancel a replication operation', () => {
118+
const id = 'replication-id';
119+
const mockConnection = {
120+
postEmpty: (path: string): Promise<void> => {
121+
expect(path).toBe(`/replication/replicate/${id}/cancel`);
122+
return Promise.resolve();
123+
},
124+
};
125+
clusterMaker(mockConnection).replications.cancel(id).then(assert(undefined));
126+
});
127+
128+
it('should delete a replication operation', () => {
129+
const id = 'replication-id';
130+
const mockConnection = {
131+
delete: (path: string) => {
132+
expect(path).toBe(`/replication/replicate/${id}`);
133+
return Promise.resolve();
134+
},
135+
};
136+
clusterMaker(mockConnection).replications.delete(id).then(assert(undefined));
137+
});
138+
139+
it('should delete all replication operations', () => {
140+
const mockConnection = {
141+
delete: (path: string) => {
142+
expect(path).toBe(`/replication/replicate`);
143+
return Promise.resolve();
144+
},
145+
};
146+
clusterMaker(mockConnection).replications.deleteAll().then(assert(undefined));
147+
});
148+
149+
it('should query replication operations with various filters', () => {
150+
const opts = {
151+
collection: 'Collection',
152+
shard: 'shard',
153+
targetNode: 'node1',
154+
includeHistory: true,
155+
};
156+
const mockResult = [{ id: 'replication-id' }];
157+
const mockConnection = {
158+
get: (path: string) => {
159+
expect(path).toBe(
160+
`/replication/replicate?collection=Collection&shard=shard&targetNode=node1&includeHistory=true`
161+
);
162+
return Promise.resolve(mockResult);
163+
},
164+
};
165+
clusterMaker(mockConnection).replications.query(opts).then(assert(mockResult));
166+
});
167+
});

src/connection/http.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,19 @@ export interface ConnectionDetails {
5858
headers?: HeadersInit;
5959
}
6060

61-
export default class ConnectionREST {
61+
export interface IConnection {
62+
postReturn: <B, T>(path: string, payload: B) => Promise<T>;
63+
postEmpty: <B>(path: string, payload: B) => Promise<void>;
64+
put: (path: string, payload: any, expectReturnContent?: boolean) => Promise<any>;
65+
patch: (path: string, payload: any) => Promise<any>;
66+
delete: (path: string, payload: any, expectReturnContent?: boolean) => Promise<any>;
67+
head: (path: string, payload: any) => Promise<boolean>;
68+
get: <T>(path: string, expectReturnContent?: boolean) => Promise<T>;
69+
login(): Promise<string>;
70+
getDetails(): Promise<ConnectionDetails>;
71+
}
72+
73+
export default class ConnectionREST implements IConnection {
6274
private apiKey?: string;
6375
private headers?: HeadersInit;
6476
protected authEnabled: boolean;

0 commit comments

Comments
 (0)