|
76 | 76 | import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; |
77 | 77 | import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering; |
78 | 78 | import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.sortFilteringRulesByOrder; |
| 79 | +import static org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry.MANAGED_CONNECTOR_INDEX_PREFIX; |
79 | 80 |
|
80 | 81 | /** |
81 | 82 | * A service that manages persistent {@link Connector} configurations. |
@@ -807,38 +808,71 @@ public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Requ |
807 | 808 | } |
808 | 809 |
|
809 | 810 | /** |
810 | | - * Updates the is_native property of a {@link Connector}. It always sets the {@link ConnectorStatus} to |
811 | | - * CONFIGURED. |
| 811 | + * Updates the is_native property of a {@link Connector}. It sets the {@link ConnectorStatus} to |
| 812 | + * CONFIGURED when connector is in CONNECTED state to indicate that connector needs to reconnect. |
812 | 813 | * |
813 | 814 | * @param request The request for updating the connector's is_native property. |
814 | 815 | * @param listener The listener for handling responses, including successful updates or errors. |
815 | 816 | */ |
816 | 817 | public void updateConnectorNative(UpdateConnectorNativeAction.Request request, ActionListener<UpdateResponse> listener) { |
817 | 818 | try { |
818 | 819 | String connectorId = request.getConnectorId(); |
| 820 | + boolean isNative = request.isNative(); |
819 | 821 |
|
820 | | - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( |
821 | | - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
822 | | - .id(connectorId) |
823 | | - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
824 | | - .source( |
825 | | - Map.of( |
826 | | - Connector.IS_NATIVE_FIELD.getPreferredName(), |
827 | | - request.isNative(), |
828 | | - Connector.STATUS_FIELD.getPreferredName(), |
829 | | - ConnectorStatus.CONFIGURED.toString() |
830 | | - ) |
831 | | - ) |
| 822 | + getConnector(connectorId, listener.delegateFailure((l, connector) -> { |
832 | 823 |
|
833 | | - ); |
834 | | - client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> { |
835 | | - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
836 | | - l.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 824 | + String indexName = getConnectorIndexNameFromSearchResult(connector); |
| 825 | + |
| 826 | + boolean doesNotHaveContentPrefix = indexName != null && isValidManagedConnectorIndexName(indexName) == false; |
| 827 | + // Ensure attached content index is prefixed correctly |
| 828 | + if (isNative && doesNotHaveContentPrefix) { |
| 829 | + l.onFailure( |
| 830 | + new ElasticsearchStatusException( |
| 831 | + "The index name [" |
| 832 | + + indexName |
| 833 | + + "] attached to the connector [" |
| 834 | + + connectorId |
| 835 | + + "] must start with the required prefix: [" |
| 836 | + + MANAGED_CONNECTOR_INDEX_PREFIX |
| 837 | + + "] to be Elastic-managed. Please update the attached index first to comply with this requirement.", |
| 838 | + RestStatus.BAD_REQUEST |
| 839 | + ) |
| 840 | + ); |
837 | 841 | return; |
838 | 842 | } |
839 | | - l.onResponse(updateResponse); |
840 | | - })); |
841 | 843 |
|
| 844 | + ConnectorStatus status = getConnectorStatusFromSearchResult(connector); |
| 845 | + |
| 846 | + // If connector was connected already, change its status to CONFIGURED as we need to re-connect |
| 847 | + boolean isConnected = status == ConnectorStatus.CONNECTED; |
| 848 | + boolean isValidTransitionToConfigured = ConnectorStateMachine.isValidTransition(status, ConnectorStatus.CONFIGURED); |
| 849 | + if (isConnected && isValidTransitionToConfigured) { |
| 850 | + status = ConnectorStatus.CONFIGURED; |
| 851 | + } |
| 852 | + |
| 853 | + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).setRefreshPolicy( |
| 854 | + WriteRequest.RefreshPolicy.IMMEDIATE |
| 855 | + ) |
| 856 | + .doc( |
| 857 | + new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
| 858 | + .id(connectorId) |
| 859 | + .source( |
| 860 | + Map.of( |
| 861 | + Connector.IS_NATIVE_FIELD.getPreferredName(), |
| 862 | + isNative, |
| 863 | + Connector.STATUS_FIELD.getPreferredName(), |
| 864 | + status.toString() |
| 865 | + ) |
| 866 | + ) |
| 867 | + ); |
| 868 | + client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> { |
| 869 | + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
| 870 | + ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 871 | + return; |
| 872 | + } |
| 873 | + ll.onResponse(updateResponse); |
| 874 | + })); |
| 875 | + })); |
842 | 876 | } catch (Exception e) { |
843 | 877 | listener.onFailure(e); |
844 | 878 | } |
@@ -896,22 +930,45 @@ public void updateConnectorIndexName(UpdateConnectorIndexNameAction.Request requ |
896 | 930 | return; |
897 | 931 | } |
898 | 932 |
|
899 | | - final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( |
900 | | - new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
901 | | - .id(connectorId) |
902 | | - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
903 | | - .source(new HashMap<>() { |
904 | | - { |
905 | | - put(Connector.INDEX_NAME_FIELD.getPreferredName(), request.getIndexName()); |
906 | | - } |
907 | | - }) |
908 | | - ); |
909 | | - client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> { |
910 | | - if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
911 | | - ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 933 | + getConnector(connectorId, l.delegateFailure((ll, connector) -> { |
| 934 | + |
| 935 | + Boolean isNativeConnector = getConnectorIsNativeFlagFromSearchResult(connector); |
| 936 | + Boolean doesNotHaveContentPrefix = indexName != null && isValidManagedConnectorIndexName(indexName) == false; |
| 937 | + |
| 938 | + if (isNativeConnector && doesNotHaveContentPrefix) { |
| 939 | + ll.onFailure( |
| 940 | + new ElasticsearchStatusException( |
| 941 | + "Index attached to an Elastic-managed connector must start with the prefix: [" |
| 942 | + + MANAGED_CONNECTOR_INDEX_PREFIX |
| 943 | + + "]. The index name in the payload [" |
| 944 | + + indexName |
| 945 | + + "] doesn't comply with this requirement.", |
| 946 | + RestStatus.BAD_REQUEST |
| 947 | + ) |
| 948 | + ); |
912 | 949 | return; |
913 | 950 | } |
914 | | - ll.onResponse(updateResponse); |
| 951 | + |
| 952 | + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc( |
| 953 | + new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX) |
| 954 | + .id(connectorId) |
| 955 | + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
| 956 | + .source(new HashMap<>() { |
| 957 | + { |
| 958 | + put(Connector.INDEX_NAME_FIELD.getPreferredName(), request.getIndexName()); |
| 959 | + } |
| 960 | + }) |
| 961 | + ); |
| 962 | + client.update( |
| 963 | + updateRequest, |
| 964 | + new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (lll, updateResponse) -> { |
| 965 | + if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) { |
| 966 | + lll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId))); |
| 967 | + return; |
| 968 | + } |
| 969 | + lll.onResponse(updateResponse); |
| 970 | + }) |
| 971 | + ); |
915 | 972 | })); |
916 | 973 | })); |
917 | 974 |
|
@@ -1064,6 +1121,18 @@ private ConnectorStatus getConnectorStatusFromSearchResult(ConnectorSearchResult |
1064 | 1121 | return ConnectorStatus.connectorStatus((String) searchResult.getResultMap().get(Connector.STATUS_FIELD.getPreferredName())); |
1065 | 1122 | } |
1066 | 1123 |
|
| 1124 | + private Boolean getConnectorIsNativeFlagFromSearchResult(ConnectorSearchResult searchResult) { |
| 1125 | + return (Boolean) searchResult.getResultMap().get(Connector.IS_NATIVE_FIELD.getPreferredName()); |
| 1126 | + } |
| 1127 | + |
| 1128 | + private String getConnectorIndexNameFromSearchResult(ConnectorSearchResult searchResult) { |
| 1129 | + return (String) searchResult.getResultMap().get(Connector.INDEX_NAME_FIELD.getPreferredName()); |
| 1130 | + } |
| 1131 | + |
| 1132 | + private boolean isValidManagedConnectorIndexName(String indexName) { |
| 1133 | + return indexName.startsWith(MANAGED_CONNECTOR_INDEX_PREFIX); |
| 1134 | + } |
| 1135 | + |
1067 | 1136 | @SuppressWarnings("unchecked") |
1068 | 1137 | private Map<String, Object> getConnectorConfigurationFromSearchResult(ConnectorSearchResult searchResult) { |
1069 | 1138 | return (Map<String, Object>) searchResult.getResultMap().get(Connector.CONFIGURATION_FIELD.getPreferredName()); |
|
0 commit comments