Skip to content

Commit 701259d

Browse files
jonathan-buttnerelasticsearchmachine
authored andcommitted
[ML] Transition EIS auth polling to persistent task on a single node (#136713)
* Creating new cluster state listener to kick off polling logic * Update docs/changelog/136713.yaml * [CI] Auto commit changes from spotless * Starting persistent tasks * Switching to a persistent task, need to create the action though * Adding master action * Successful task creation * Starting tests * More tests * Even more tests * [CI] Auto commit changes from spotless * Starting integration tests * Adding test stub * [CI] Auto commit changes from spotless * Adding integration test * Fixing relocation test * [CI] Auto commit changes from spotless * working test * Some clean up * Removing unneeded tests * [CI] Auto commit changes from spotless * Refactoring tests * updating transport version * [CI] Auto commit changes from spotless * Fixing transport version * Fixing check for preconfigured endpoints * [CI] Auto commit changes from spotless * Fixing tests * Fixing text embedding test * Addressing feedback * Marking task as failed * Fixing flaky test --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 03a514c commit 701259d

File tree

44 files changed

+3076
-1606
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3076
-1606
lines changed

docs/changelog/136713.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136713
2+
summary: Transition EIS auth polling to persistent task on a single node
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/inference/Model.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@
99

1010
package org.elasticsearch.inference;
1111

12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.io.stream.Writeable;
15+
16+
import java.io.IOException;
1217
import java.util.Objects;
1318

14-
public class Model {
19+
public class Model implements Writeable {
1520
public static String documentId(String modelId) {
1621
return "model_" + modelId;
1722
}
@@ -42,6 +47,11 @@ public Model(ModelConfigurations configurations) {
4247
this(configurations, new ModelSecrets());
4348
}
4449

50+
public Model(StreamInput in) throws IOException {
51+
this.configurations = new ModelConfigurations(in);
52+
this.secrets = new ModelSecrets(in);
53+
}
54+
4555
public String getInferenceEntityId() {
4656
return configurations.getInferenceEntityId();
4757
}
@@ -111,4 +121,10 @@ public boolean equals(Object o) {
111121
public int hashCode() {
112122
return Objects.hash(configurations, secrets);
113123
}
124+
125+
@Override
126+
public void writeTo(StreamOutput out) throws IOException {
127+
configurations.writeTo(out);
128+
secrets.writeTo(out);
129+
}
114130
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9215000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
transform_preview_as_index_request,9214000
1+
inference_api_eis_authorization_persistent_task,9215000
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference.action;
9+
10+
import org.elasticsearch.action.ActionResponse;
11+
import org.elasticsearch.action.ActionType;
12+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.inference.Model;
17+
import org.elasticsearch.xpack.core.inference.results.ModelStoreResponse;
18+
19+
import java.io.IOException;
20+
import java.util.List;
21+
import java.util.Objects;
22+
23+
/**
24+
* Internal action to store inference endpoints and return the results of the store operation. This should only be used internally and not
25+
* exposed via a REST API.
26+
* For the exposed REST API action see {@link PutInferenceModelAction}.
27+
*/
28+
public class StoreInferenceEndpointsAction extends ActionType<StoreInferenceEndpointsAction.Response> {
29+
30+
public static final StoreInferenceEndpointsAction INSTANCE = new StoreInferenceEndpointsAction();
31+
public static final String NAME = "cluster:internal/xpack/inference/create_endpoints";
32+
33+
public StoreInferenceEndpointsAction() {
34+
super(NAME);
35+
}
36+
37+
public static class Request extends AcknowledgedRequest<Request> {
38+
private final List<Model> models;
39+
40+
public Request(List<Model> models, TimeValue timeout) {
41+
super(timeout, DEFAULT_ACK_TIMEOUT);
42+
this.models = Objects.requireNonNull(models);
43+
}
44+
45+
public Request(StreamInput in) throws IOException {
46+
super(in);
47+
models = in.readCollectionAsImmutableList(Model::new);
48+
}
49+
50+
@Override
51+
public void writeTo(StreamOutput out) throws IOException {
52+
super.writeTo(out);
53+
out.writeCollection(models);
54+
}
55+
56+
public List<Model> getModels() {
57+
return models;
58+
}
59+
60+
@Override
61+
public boolean equals(Object o) {
62+
if (o == null || getClass() != o.getClass()) return false;
63+
Request request = (Request) o;
64+
return Objects.equals(models, request.models);
65+
}
66+
67+
@Override
68+
public int hashCode() {
69+
return Objects.hashCode(models);
70+
}
71+
}
72+
73+
public static class Response extends ActionResponse {
74+
private final List<ModelStoreResponse> results;
75+
76+
public Response(List<ModelStoreResponse> results) {
77+
this.results = results;
78+
}
79+
80+
public Response(StreamInput in) throws IOException {
81+
results = in.readCollectionAsImmutableList(ModelStoreResponse::new);
82+
}
83+
84+
public List<ModelStoreResponse> getResults() {
85+
return results;
86+
}
87+
88+
@Override
89+
public void writeTo(StreamOutput out) throws IOException {
90+
out.writeCollection(results);
91+
}
92+
93+
@Override
94+
public boolean equals(Object o) {
95+
if (o == null || getClass() != o.getClass()) return false;
96+
Response response = (Response) o;
97+
return Objects.equals(results, response.results);
98+
}
99+
100+
@Override
101+
public int hashCode() {
102+
return Objects.hash(results);
103+
}
104+
}
105+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference.results;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.rest.RestStatus;
15+
16+
import java.io.IOException;
17+
import java.util.Objects;
18+
19+
/**
20+
* Response for storing a model in the model registry using the bulk API.
21+
*/
22+
public record ModelStoreResponse(String inferenceId, RestStatus status, @Nullable Exception failureCause) implements Writeable {
23+
24+
public ModelStoreResponse(StreamInput in) throws IOException {
25+
this(in.readString(), RestStatus.readFrom(in), in.readException());
26+
}
27+
28+
public boolean failed() {
29+
return failureCause != null;
30+
}
31+
32+
@Override
33+
public void writeTo(StreamOutput out) throws IOException {
34+
out.writeString(inferenceId);
35+
RestStatus.writeTo(out, status);
36+
out.writeException(failureCause);
37+
}
38+
39+
@Override
40+
public boolean equals(Object o) {
41+
if (o == null || getClass() != o.getClass()) return false;
42+
ModelStoreResponse that = (ModelStoreResponse) o;
43+
return status == that.status && Objects.equals(inferenceId, that.inferenceId)
44+
// Exception does not have hashCode() or equals() so assume errors are equal iff class and message are equal
45+
&& Objects.equals(
46+
failureCause == null ? null : failureCause.getMessage(),
47+
that.failureCause == null ? null : that.failureCause.getMessage()
48+
)
49+
&& Objects.equals(
50+
failureCause == null ? null : failureCause.getClass(),
51+
that.failureCause == null ? null : that.failureCause.getClass()
52+
);
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
return Objects.hash(
58+
inferenceId,
59+
status,
60+
// Exception does not have hashCode() or equals() so assume errors are equal iff class and message are equal
61+
failureCause == null ? null : failureCause.getMessage(),
62+
failureCause == null ? null : failureCause.getClass()
63+
);
64+
}
65+
}

0 commit comments

Comments
 (0)