Skip to content

Commit 07aa9cd

Browse files
authored
[Connector API] Support cleaning up sync jobs when deleting a connector (#107253)
1 parent 4707dee commit 07aa9cd

File tree

13 files changed

+289
-18
lines changed

13 files changed

+289
-18
lines changed

docs/changelog/107253.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 107253
2+
summary: "[Connector API] Support cleaning up sync jobs when deleting a connector"
3+
area: Application
4+
type: feature
5+
issues: []

rest-api-spec/src/main/resources/rest-api-spec/api/connector.delete.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
}
2727
}
2828
]
29+
},
30+
"params": {
31+
"delete_sync_jobs": {
32+
"type": "boolean",
33+
"default": false,
34+
"description": "Determines whether associated sync jobs are also deleted."
35+
}
2936
}
3037
}
3138
}

x-pack/plugin/ent-search/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencies {
3030
testImplementation(testArtifact(project(xpackModule('core'))))
3131
testImplementation project(":test:framework")
3232
testImplementation(project(':modules:lang-mustache'))
33+
testImplementation(project(':modules:reindex'))
3334

3435
javaRestTestImplementation(project(path: xpackModule('core')))
3536
javaRestTestImplementation(testArtifact(project(xpackModule('core'))))

x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/320_connector_delete.yml

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,80 @@ setup:
2626
connector.get:
2727
connector_id: test-connector-to-delete
2828

29+
30+
---
31+
"Delete Connector - deletes associated sync jobs":
32+
33+
- do:
34+
connector_sync_job.post:
35+
body:
36+
id: test-connector-to-delete
37+
job_type: full
38+
trigger_method: on_demand
39+
- do:
40+
connector_sync_job.post:
41+
body:
42+
id: test-connector-to-delete
43+
job_type: full
44+
trigger_method: on_demand
45+
- do:
46+
connector_sync_job.post:
47+
body:
48+
id: test-connector-to-delete
49+
job_type: full
50+
trigger_method: on_demand
51+
52+
- do:
53+
connector_sync_job.list:
54+
connector_id: test-connector-to-delete
55+
56+
- match: { count: 3 }
57+
58+
- do:
59+
connector.delete:
60+
connector_id: test-connector-to-delete
61+
delete_sync_jobs: true
62+
63+
- match: { acknowledged: true }
64+
65+
66+
- do:
67+
connector_sync_job.list:
68+
connector_id: test-connector-to-delete
69+
70+
- match: { count: 0 }
71+
72+
73+
---
74+
"Delete Connector - doesn't associated sync jobs when delete_sync_jobs is false":
75+
76+
- do:
77+
connector_sync_job.post:
78+
body:
79+
id: test-connector-to-delete
80+
job_type: full
81+
trigger_method: on_demand
82+
83+
- do:
84+
connector_sync_job.list:
85+
connector_id: test-connector-to-delete
86+
87+
- match: { count: 1 }
88+
89+
- do:
90+
connector.delete:
91+
connector_id: test-connector-to-delete
92+
delete_sync_jobs: false
93+
94+
- match: { acknowledged: true }
95+
96+
97+
- do:
98+
connector_sync_job.list:
99+
connector_id: test-connector-to-delete
100+
101+
- match: { count: 1 }
102+
29103
---
30104
"Delete Connector - Connector does not exist":
31105
- do:

x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/ConnectorIndexService.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
5555
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorServiceTypeAction;
5656
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorStatusAction;
57+
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
58+
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService;
5759

5860
import java.time.Instant;
5961
import java.util.Arrays;
@@ -253,12 +255,13 @@ public void getConnector(String connectorId, ActionListener<ConnectorSearchResul
253255
}
254256

255257
/**
256-
* Deletes the {@link Connector} in the underlying index.
258+
* Deletes the {@link Connector} and the related instances of {@link ConnectorSyncJob} in the underlying index.
257259
*
258-
* @param connectorId The id of the connector object.
259-
* @param listener The action listener to invoke on response/failure.
260+
* @param connectorId The id of the {@link Connector}.
261+
* @param shouldDeleteSyncJobs The flag indicating if {@link ConnectorSyncJob} should also be deleted.
262+
* @param listener The action listener to invoke on response/failure.
260263
*/
261-
public void deleteConnector(String connectorId, ActionListener<DeleteResponse> listener) {
264+
public void deleteConnector(String connectorId, boolean shouldDeleteSyncJobs, ActionListener<DeleteResponse> listener) {
262265

263266
final DeleteRequest deleteRequest = new DeleteRequest(CONNECTOR_INDEX_NAME).id(connectorId)
264267
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -269,7 +272,11 @@ public void deleteConnector(String connectorId, ActionListener<DeleteResponse> l
269272
l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
270273
return;
271274
}
272-
l.onResponse(deleteResponse);
275+
if (shouldDeleteSyncJobs) {
276+
new ConnectorSyncJobIndexService(client).deleteAllSyncJobsByConnectorId(connectorId, l.map(r -> deleteResponse));
277+
} else {
278+
l.onResponse(deleteResponse);
279+
}
273280
}));
274281
} catch (Exception e) {
275282
listener.onFailure(e);

x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/DeleteConnectorAction.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.xcontent.ToXContentObject;
1919
import org.elasticsearch.xcontent.XContentBuilder;
2020
import org.elasticsearch.xcontent.XContentParser;
21+
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
2122

2223
import java.io.IOException;
2324
import java.util.Objects;
@@ -35,16 +36,20 @@ private DeleteConnectorAction() {/* no instances */}
3536
public static class Request extends ConnectorActionRequest implements ToXContentObject {
3637

3738
private final String connectorId;
39+
private final boolean deleteSyncJobs;
3840

3941
private static final ParseField CONNECTOR_ID_FIELD = new ParseField("connector_id");
42+
private static final ParseField DELETE_SYNC_JOB_FIELD = new ParseField("delete_sync_jobs");
4043

4144
public Request(StreamInput in) throws IOException {
4245
super(in);
4346
this.connectorId = in.readString();
47+
this.deleteSyncJobs = in.readBoolean();
4448
}
4549

46-
public Request(String connectorId) {
50+
public Request(String connectorId, boolean deleteSyncJobs) {
4751
this.connectorId = connectorId;
52+
this.deleteSyncJobs = deleteSyncJobs;
4853
}
4954

5055
@Override
@@ -62,40 +67,55 @@ public String getConnectorId() {
6267
return connectorId;
6368
}
6469

70+
public boolean shouldDeleteSyncJobs() {
71+
return deleteSyncJobs;
72+
}
73+
74+
@Override
75+
public String[] indices() {
76+
// When deleting a connector, corresponding sync jobs can also be deleted
77+
return new String[] {
78+
ConnectorTemplateRegistry.CONNECTOR_SYNC_JOBS_INDEX_NAME_PATTERN,
79+
ConnectorTemplateRegistry.CONNECTOR_INDEX_NAME_PATTERN };
80+
}
81+
6582
@Override
6683
public void writeTo(StreamOutput out) throws IOException {
6784
super.writeTo(out);
6885
out.writeString(connectorId);
86+
out.writeBoolean(deleteSyncJobs);
6987
}
7088

7189
@Override
7290
public boolean equals(Object o) {
7391
if (this == o) return true;
7492
if (o == null || getClass() != o.getClass()) return false;
7593
Request request = (Request) o;
76-
return Objects.equals(connectorId, request.connectorId);
94+
return deleteSyncJobs == request.deleteSyncJobs && Objects.equals(connectorId, request.connectorId);
7795
}
7896

7997
@Override
8098
public int hashCode() {
81-
return Objects.hash(connectorId);
99+
return Objects.hash(connectorId, deleteSyncJobs);
82100
}
83101

84102
@Override
85103
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
86104
builder.startObject();
87105
builder.field(CONNECTOR_ID_FIELD.getPreferredName(), connectorId);
106+
builder.field(DELETE_SYNC_JOB_FIELD.getPreferredName(), deleteSyncJobs);
88107
builder.endObject();
89108
return builder;
90109
}
91110

92111
private static final ConstructingObjectParser<DeleteConnectorAction.Request, Void> PARSER = new ConstructingObjectParser<>(
93112
"delete_connector_request",
94113
false,
95-
(p) -> new Request((String) p[0])
114+
(p) -> new Request((String) p[0], (boolean) p[1])
96115
);
97116
static {
98117
PARSER.declareString(constructorArg(), CONNECTOR_ID_FIELD);
118+
PARSER.declareBoolean(constructorArg(), DELETE_SYNC_JOB_FIELD);
99119
}
100120

101121
public static DeleteConnectorAction.Request parse(XContentParser parser) {

x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/RestDeleteConnectorAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ public List<Route> routes() {
3535

3636
@Override
3737
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
38-
DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(restRequest.param("connector_id"));
38+
39+
String connectorId = restRequest.param("connector_id");
40+
boolean shouldDeleteSyncJobs = restRequest.paramAsBoolean("delete_sync_jobs", false);
41+
42+
DeleteConnectorAction.Request request = new DeleteConnectorAction.Request(connectorId, shouldDeleteSyncJobs);
3943
return channel -> client.execute(DeleteConnectorAction.INSTANCE, request, new RestToXContentListener<>(channel));
4044
}
4145
}

x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/action/TransportDeleteConnectorAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public TransportDeleteConnectorAction(
4343
@Override
4444
protected void doExecute(Task task, DeleteConnectorAction.Request request, ActionListener<AcknowledgedResponse> listener) {
4545
String connectorId = request.getConnectorId();
46-
connectorIndexService.deleteConnector(connectorId, listener.map(v -> AcknowledgedResponse.TRUE));
46+
boolean shouldDeleteSyncJobs = request.shouldDeleteSyncJobs();
47+
connectorIndexService.deleteConnector(connectorId, shouldDeleteSyncJobs, listener.map(v -> AcknowledgedResponse.TRUE));
4748
}
4849
}

x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@
77

88
package org.elasticsearch.xpack.application.connector.syncjob;
99

10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.ElasticsearchStatusException;
1112
import org.elasticsearch.ExceptionsHelper;
1213
import org.elasticsearch.ResourceNotFoundException;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.DelegatingActionListener;
1516
import org.elasticsearch.action.DocWriteRequest;
1617
import org.elasticsearch.action.DocWriteResponse;
18+
import org.elasticsearch.action.bulk.BulkItemResponse;
1719
import org.elasticsearch.action.delete.DeleteRequest;
1820
import org.elasticsearch.action.delete.DeleteResponse;
1921
import org.elasticsearch.action.get.GetRequest;
2022
import org.elasticsearch.action.get.GetResponse;
2123
import org.elasticsearch.action.index.IndexRequest;
2224
import org.elasticsearch.action.search.SearchRequest;
2325
import org.elasticsearch.action.search.SearchResponse;
26+
import org.elasticsearch.action.support.IndicesOptions;
2427
import org.elasticsearch.action.support.WriteRequest;
2528
import org.elasticsearch.action.update.UpdateRequest;
2629
import org.elasticsearch.action.update.UpdateResponse;
@@ -33,6 +36,9 @@
3336
import org.elasticsearch.index.query.QueryBuilder;
3437
import org.elasticsearch.index.query.TermQueryBuilder;
3538
import org.elasticsearch.index.query.TermsQueryBuilder;
39+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
40+
import org.elasticsearch.index.reindex.DeleteByQueryAction;
41+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
3642
import org.elasticsearch.rest.RestStatus;
3743
import org.elasticsearch.search.SearchHit;
3844
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -58,6 +64,7 @@
5864
import java.util.Objects;
5965
import java.util.Optional;
6066
import java.util.function.BiConsumer;
67+
import java.util.stream.Collectors;
6168
import java.util.stream.Stream;
6269

6370
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
@@ -586,6 +593,37 @@ public void updateConnectorSyncJobError(String connectorSyncJobId, String error,
586593
}
587594
}
588595

596+
/**
597+
* Deletes all {@link ConnectorSyncJob} documents that match a specific {@link Connector} id in the underlying index.
598+
* Gracefully handles non-existent sync job index.
599+
*
600+
* @param connectorId The id of the {@link Connector} to match in the sync job documents.
601+
* @param listener The action listener to invoke on response/failure.
602+
*/
603+
public void deleteAllSyncJobsByConnectorId(String connectorId, ActionListener<BulkByScrollResponse> listener) {
604+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(CONNECTOR_SYNC_JOB_INDEX_NAME).setQuery(
605+
new TermQueryBuilder(
606+
ConnectorSyncJob.CONNECTOR_FIELD.getPreferredName() + "." + Connector.ID_FIELD.getPreferredName(),
607+
connectorId
608+
)
609+
).setRefresh(true).setIndicesOptions(IndicesOptions.fromOptions(true, true, false, false));
610+
611+
client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, r) -> {
612+
final List<BulkItemResponse.Failure> bulkDeleteFailures = r.getBulkFailures();
613+
if (bulkDeleteFailures.isEmpty() == false) {
614+
l.onFailure(
615+
new ElasticsearchException(
616+
"Error deleting sync jobs associated with connector ["
617+
+ connectorId
618+
+ "] "
619+
+ bulkDeleteFailures.stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.joining("\n"))
620+
)
621+
);
622+
}
623+
l.onResponse(r);
624+
}));
625+
}
626+
589627
/**
590628
* Listeners that checks failures for IndexNotFoundException and DocumentMissingException,
591629
* and transforms them in ResourceNotFoundException, invoking onFailure on the delegate listener.

x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/ConnectorIndexServiceTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.stream.IntStream;
5656

5757
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.getRandomCronExpression;
58+
import static org.elasticsearch.xpack.application.connector.ConnectorTestUtils.registerSimplifiedConnectorIndexTemplates;
5859
import static org.hamcrest.CoreMatchers.anyOf;
5960
import static org.hamcrest.CoreMatchers.equalTo;
6061

@@ -66,6 +67,7 @@ public class ConnectorIndexServiceTests extends ESSingleNodeTestCase {
6667

6768
@Before
6869
public void setup() {
70+
registerSimplifiedConnectorIndexTemplates(indicesAdmin());
6971
this.connectorIndexService = new ConnectorIndexService(client());
7072
}
7173

@@ -104,11 +106,11 @@ public void testDeleteConnector() throws Exception {
104106
}
105107

106108
String connectorIdToDelete = connectorIds.get(0);
107-
DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete);
109+
DeleteResponse resp = awaitDeleteConnector(connectorIdToDelete, false);
108110
assertThat(resp.status(), equalTo(RestStatus.OK));
109111
expectThrows(ResourceNotFoundException.class, () -> awaitGetConnector(connectorIdToDelete));
110112

111-
expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete));
113+
expectThrows(ResourceNotFoundException.class, () -> awaitDeleteConnector(connectorIdToDelete, false));
112114
}
113115

114116
public void testUpdateConnectorConfiguration_FullConfiguration() throws Exception {
@@ -526,11 +528,11 @@ public void testUpdateConnectorApiKeyIdOrApiKeySecretId() throws Exception {
526528
assertThat(updateApiKeyIdRequest.getApiKeySecretId(), equalTo(indexedConnector.getApiKeySecretId()));
527529
}
528530

529-
private DeleteResponse awaitDeleteConnector(String connectorId) throws Exception {
531+
private DeleteResponse awaitDeleteConnector(String connectorId, boolean deleteConnectorSyncJobs) throws Exception {
530532
CountDownLatch latch = new CountDownLatch(1);
531533
final AtomicReference<DeleteResponse> resp = new AtomicReference<>(null);
532534
final AtomicReference<Exception> exc = new AtomicReference<>(null);
533-
connectorIndexService.deleteConnector(connectorId, new ActionListener<>() {
535+
connectorIndexService.deleteConnector(connectorId, deleteConnectorSyncJobs, new ActionListener<>() {
534536
@Override
535537
public void onResponse(DeleteResponse deleteResponse) {
536538
resp.set(deleteResponse);

0 commit comments

Comments
 (0)