Skip to content

Commit 08bd1ac

Browse files
authored
Merge pull request #495 from weaviate/v6-replication
v6: Replication API
2 parents d4b8aac + 09d6aee commit 08bd1ac

18 files changed

+646
-4
lines changed

src/it/java/io/weaviate/integration/ClusterITest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77

88
import io.weaviate.ConcurrentTest;
99
import io.weaviate.client6.v1.api.WeaviateClient;
10+
import io.weaviate.client6.v1.api.cluster.Node;
11+
import io.weaviate.client6.v1.api.cluster.NodeVerbosity;
1012
import io.weaviate.client6.v1.api.cluster.ShardingState;
13+
import io.weaviate.client6.v1.api.cluster.replication.Replication;
14+
import io.weaviate.client6.v1.api.cluster.replication.ReplicationState;
15+
import io.weaviate.client6.v1.api.cluster.replication.ReplicationType;
1116
import io.weaviate.containers.Weaviate;
1217

1318
public class ClusterITest extends ConcurrentTest {
@@ -50,4 +55,80 @@ public void test_listNodes() throws IOException {
5055
// Assert
5156
Assertions.assertThat(allNodes).as("total no. nodes").hasSize(3);
5257
}
58+
59+
@Test
60+
public void test_replicateLifecycle() throws IOException {
61+
// Arrange
62+
63+
// We must create the collection first before any shards exist on the nodes.
64+
var nsThings = ns("Things");
65+
client.collections.create(nsThings);
66+
67+
var nodes = client.cluster.listNodes(opt -> opt.verbosity(NodeVerbosity.VERBOSE));
68+
Assertions.assertThat(nodes)
69+
.as("cluster at least 2 nodes").hasSizeGreaterThanOrEqualTo(2);
70+
71+
Node source = null;
72+
Node target = null;
73+
for (var node : nodes) {
74+
if (source == null && !node.shards().isEmpty()) {
75+
source = node;
76+
} else if (target == null) {
77+
target = node;
78+
}
79+
}
80+
81+
var wantShard = source.shards().get(0).name();
82+
var srcNode = source.name();
83+
var tgtNode = target.name();
84+
85+
// Act: start replication
86+
var replication = client.cluster.replicate(
87+
nsThings,
88+
wantShard,
89+
srcNode,
90+
tgtNode,
91+
ReplicationType.MOVE);
92+
93+
var got = client.cluster.replication.get(replication.uuid());
94+
Assertions.assertThat(got).get()
95+
.as("expected replication status")
96+
.returns(nsThings, Replication::collection)
97+
.returns(wantShard, Replication::shard)
98+
.returns(srcNode, Replication::sourceNode)
99+
.returns(tgtNode, Replication::targetNode)
100+
.returns(ReplicationType.MOVE, Replication::type)
101+
.returns(null, Replication::history)
102+
.extracting(Replication::status).isNotNull();
103+
104+
var withHistory = client.cluster.replication.get(
105+
replication.uuid(),
106+
repl -> repl.includeHistory(true));
107+
Assertions.assertThat(withHistory).get()
108+
.as("includes history")
109+
.extracting(Replication::history).isNotNull();
110+
111+
// Act: query replications
112+
var filtered = client.cluster.replication.list(
113+
repl -> repl
114+
.collection(nsThings)
115+
.shard(wantShard)
116+
.targetNode(tgtNode));
117+
118+
Assertions.assertThat(filtered)
119+
.as("existing replications for %s-%s -> %s", nsThings, wantShard, tgtNode)
120+
.hasSize(1);
121+
122+
// Act: cancel
123+
client.cluster.replication.cancel(replication.uuid());
124+
125+
eventually(() -> client.cluster.replication.get(replication.uuid())
126+
.orElseThrow()
127+
.status().state() == ReplicationState.CANCELED, 1000, 25, "replication must be canceled");
128+
129+
// Act: delete replication
130+
client.cluster.replication.delete(replication.uuid());
131+
132+
eventually(() -> client.cluster.replication.list().isEmpty(), 1000, 15, "replication must be deleted");
133+
}
53134
}

src/main/java/io/weaviate/client6/v1/api/cluster/NodeVerbosity.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,27 @@
22

33
import com.google.gson.annotations.SerializedName;
44

5-
public enum NodeVerbosity {
5+
import io.weaviate.client6.v1.internal.json.JsonEnum;
6+
7+
public enum NodeVerbosity implements JsonEnum<NodeVerbosity> {
68
@SerializedName("minimal")
7-
MINIMAL,
9+
MINIMAL("minimal"),
810
@SerializedName("verbose")
9-
VERBOSE;
11+
VERBOSE("verbose");
12+
13+
private final String jsonValue;
14+
15+
private NodeVerbosity(String jsonValue) {
16+
this.jsonValue = jsonValue;
17+
}
18+
19+
@Override
20+
public String jsonValue() {
21+
return jsonValue;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return jsonValue();
27+
}
1028
}

src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClient.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,25 @@
66
import java.util.function.Function;
77

88
import io.weaviate.client6.v1.api.WeaviateApiException;
9+
import io.weaviate.client6.v1.api.cluster.replication.CreateReplicationRequest;
10+
import io.weaviate.client6.v1.api.cluster.replication.Replication;
11+
import io.weaviate.client6.v1.api.cluster.replication.ReplicationType;
12+
import io.weaviate.client6.v1.api.cluster.replication.WeaviateReplicationClient;
913
import io.weaviate.client6.v1.internal.ObjectBuilder;
1014
import io.weaviate.client6.v1.internal.rest.RestTransport;
1115

1216
public class WeaviateClusterClient {
1317
private final RestTransport restTransport;
1418

19+
/**
20+
* Client for {@code /replication/replicate} endpoints for managing
21+
* replications.
22+
*/
23+
public final WeaviateReplicationClient replication;
24+
1525
public WeaviateClusterClient(RestTransport restTransport) {
1626
this.restTransport = restTransport;
27+
this.replication = new WeaviateReplicationClient(restTransport);
1728
}
1829

1930
/**
@@ -75,4 +86,29 @@ public List<Node> listNodes(Function<ListNodesRequest.Builder, ObjectBuilder<Lis
7586
throws IOException {
7687
return this.restTransport.performRequest(ListNodesRequest.of(fn), ListNodesRequest._ENDPOINT);
7788
}
89+
90+
/**
91+
* Start a replication operation for a collection's shard.
92+
*
93+
* @param collection Collection name.
94+
* @param shard Shard name.
95+
* @param sourceNode Node on which the shard currently resides.
96+
* @param targetNode Node onto which the files will be replicated.
97+
* @throws WeaviateApiException in case the server returned with an
98+
* error status code.
99+
* @throws IOException in case the request was not sent successfully
100+
* due to a malformed request, a networking error
101+
* or the server being unavailable.
102+
*/
103+
public Replication replicate(
104+
String collection,
105+
String shard,
106+
String sourceNode,
107+
String targetNode,
108+
ReplicationType type)
109+
throws IOException {
110+
return this.restTransport.performRequest(
111+
new CreateReplicationRequest(collection, shard, sourceNode, targetNode, type),
112+
CreateReplicationRequest._ENDPOINT);
113+
}
78114
}

src/main/java/io/weaviate/client6/v1/api/cluster/WeaviateClusterClientAsync.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,21 @@
66
import java.util.concurrent.CompletableFuture;
77
import java.util.function.Function;
88

9+
import io.weaviate.client6.v1.api.cluster.replication.CreateReplicationRequest;
10+
import io.weaviate.client6.v1.api.cluster.replication.Replication;
11+
import io.weaviate.client6.v1.api.cluster.replication.ReplicationType;
12+
import io.weaviate.client6.v1.api.cluster.replication.WeaviateReplicationClientAsync;
913
import io.weaviate.client6.v1.internal.ObjectBuilder;
1014
import io.weaviate.client6.v1.internal.rest.RestTransport;
1115

1216
public class WeaviateClusterClientAsync {
1317
private final RestTransport restTransport;
1418

19+
public final WeaviateReplicationClientAsync replication;
20+
1521
public WeaviateClusterClientAsync(RestTransport restTransport) {
1622
this.restTransport = restTransport;
23+
this.replication = new WeaviateReplicationClientAsync(restTransport);
1724
}
1825

1926
/**
@@ -54,4 +61,23 @@ public CompletableFuture<List<Node>> listNodes(Function<ListNodesRequest.Builder
5461
throws IOException {
5562
return this.restTransport.performRequestAsync(ListNodesRequest.of(fn), ListNodesRequest._ENDPOINT);
5663
}
64+
65+
/**
66+
* Start a replication operation for a collection's shard.
67+
*
68+
* @param collection Collection name.
69+
* @param shard Shard name.
70+
* @param sourceNode Node on which the shard currently resides.
71+
* @param targetNode Node onto which the files will be replicated.
72+
*/
73+
public CompletableFuture<Replication> replicate(
74+
String collection,
75+
String shard,
76+
String sourceNode,
77+
String targetNode,
78+
ReplicationType type) {
79+
return this.restTransport.performRequestAsync(
80+
new CreateReplicationRequest(collection, shard, sourceNode, targetNode, type),
81+
CreateReplicationRequest._ENDPOINT);
82+
}
5783
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.weaviate.client6.v1.api.cluster.replication;
2+
3+
import java.util.Collections;
4+
import java.util.UUID;
5+
6+
import io.weaviate.client6.v1.internal.rest.Endpoint;
7+
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;
8+
9+
public record CancelReplicationRequest(UUID uuid) {
10+
11+
static final Endpoint<CancelReplicationRequest, Void> _ENDPOINT = SimpleEndpoint.sideEffect(
12+
request -> "POST",
13+
request -> "/replication/replicate/" + request.uuid() + "/cancel",
14+
__ -> Collections.emptyMap());
15+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.weaviate.client6.v1.api.cluster.replication;
2+
3+
import java.util.Collections;
4+
5+
import com.google.gson.annotations.SerializedName;
6+
7+
import io.weaviate.client6.v1.internal.json.JSON;
8+
import io.weaviate.client6.v1.internal.rest.Endpoint;
9+
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;
10+
11+
public record CreateReplicationRequest(
12+
@SerializedName("collection") String collection,
13+
@SerializedName("shard") String shard,
14+
@SerializedName("sourceNode") String sourceNode,
15+
@SerializedName("targetNode") String targetNode,
16+
@SerializedName("type") ReplicationType type) {
17+
18+
public static final Endpoint<CreateReplicationRequest, Replication> _ENDPOINT = new SimpleEndpoint<>(
19+
request -> "POST",
20+
request -> "/replication/replicate",
21+
request -> Collections.emptyMap(),
22+
request -> JSON.serialize(request),
23+
(__, response) -> JSON.deserialize(response, Replication.class));
24+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.weaviate.client6.v1.api.cluster.replication;
2+
3+
import java.util.Collections;
4+
5+
import io.weaviate.client6.v1.internal.rest.Endpoint;
6+
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;
7+
8+
public record DeleteAllReplicationsRequest() {
9+
10+
static final Endpoint<Void, Void> _ENDPOINT = SimpleEndpoint.sideEffect(
11+
request -> "DELETE",
12+
request -> "/replication/replicate",
13+
__ -> Collections.emptyMap());
14+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.weaviate.client6.v1.api.cluster.replication;
2+
3+
import java.util.Collections;
4+
import java.util.UUID;
5+
6+
import io.weaviate.client6.v1.internal.rest.Endpoint;
7+
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;
8+
9+
public record DeleteReplicationRequest(UUID uuid) {
10+
11+
static final Endpoint<DeleteReplicationRequest, Void> _ENDPOINT = SimpleEndpoint.sideEffect(
12+
request -> "DELETE",
13+
request -> "/replication/replicate/" + request.uuid(),
14+
__ -> Collections.emptyMap());
15+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.weaviate.client6.v1.api.cluster.replication;
2+
3+
import java.util.Collections;
4+
import java.util.Optional;
5+
import java.util.UUID;
6+
import java.util.function.Function;
7+
8+
import io.weaviate.client6.v1.internal.ObjectBuilder;
9+
import io.weaviate.client6.v1.internal.rest.Endpoint;
10+
import io.weaviate.client6.v1.internal.rest.OptionalEndpoint;
11+
12+
public record GetReplicationRequest(UUID uuid, boolean includeHistory) {
13+
14+
static final Endpoint<GetReplicationRequest, Optional<Replication>> _ENDPOINT = OptionalEndpoint.noBodyOptional(
15+
request -> "GET",
16+
request -> "/replication/replicate/" + request.uuid(),
17+
request -> Collections.singletonMap("includeHistory", request.includeHistory()),
18+
Replication.class);
19+
20+
public static GetReplicationRequest of(UUID uuid) {
21+
return of(uuid, ObjectBuilder.identity());
22+
}
23+
24+
public static GetReplicationRequest of(UUID uuid, Function<Builder, ObjectBuilder<GetReplicationRequest>> fn) {
25+
return fn.apply(new Builder(uuid)).build();
26+
}
27+
28+
public GetReplicationRequest(Builder builder) {
29+
this(builder.uuid, builder.includeHistory);
30+
}
31+
32+
public static class Builder implements ObjectBuilder<GetReplicationRequest> {
33+
private final UUID uuid;
34+
private boolean includeHistory = false;
35+
36+
public Builder(UUID uuid) {
37+
this.uuid = uuid;
38+
}
39+
40+
/**
41+
* Include history of statuses for this replication.
42+
*
43+
* @see Replication#history
44+
*/
45+
public Builder includeHistory(boolean includeHistory) {
46+
this.includeHistory = includeHistory;
47+
return this;
48+
}
49+
50+
@Override
51+
public GetReplicationRequest build() {
52+
return new GetReplicationRequest(this);
53+
}
54+
}
55+
56+
}

0 commit comments

Comments
 (0)