|
8 | 8 | import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT; |
9 | 9 | import static org.opensearch.ml.common.utils.StringUtils.gson; |
10 | 10 |
|
| 11 | +import java.util.ArrayList; |
11 | 12 | import java.util.Collection; |
12 | 13 | import java.util.Collections; |
13 | 14 | import java.util.HashMap; |
|
23 | 24 | import java.util.stream.Collectors; |
24 | 25 | import java.util.stream.StreamSupport; |
25 | 26 |
|
| 27 | +import org.apache.commons.lang3.StringUtils; |
26 | 28 | import org.apache.commons.lang3.math.NumberUtils; |
27 | 29 | import org.apache.logging.log4j.util.Strings; |
28 | 30 | import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; |
|
65 | 67 | @ToolAnnotation(ListIndexTool.TYPE) |
66 | 68 | public class ListIndexTool implements Tool { |
67 | 69 | public static final String TYPE = "ListIndexTool"; |
| 70 | + public static final String INPUT_SCHEMA_FIELD = "input_schema"; |
| 71 | + public static final String STRICT_FIELD = "strict"; |
68 | 72 | // This needs to be changed once it's changed in opensearch core in RestIndicesListAction. |
69 | 73 | private static final int MAX_SUPPORTED_LIST_INDICES_PAGE_SIZE = 5000; |
70 | 74 | public static final int DEFAULT_PAGE_SIZE = 100; |
71 | | - private static final String DEFAULT_DESCRIPTION = String |
| 75 | + public static final String DEFAULT_DESCRIPTION = String |
72 | 76 | .join( |
73 | 77 | " ", |
74 | 78 | "This tool gets index information from the OpenSearch cluster.", |
75 | | - "It takes 2 optional arguments named `index` which is a comma-delimited list of one or more indices to get information from (default is an empty list meaning all indices),", |
| 79 | + "It takes 2 optional arguments named `indices` which is a comma-delimited list of one or more indices to get information from (default is an empty list meaning all indices),", |
76 | 80 | "and `local` which means whether to return information from the local node only instead of the cluster manager node (default is false).", |
77 | 81 | "The tool returns the indices information, including `health`, `status`, `index`, `uuid`, `pri`, `rep`, `docs.count`, `docs.deleted`, `store.size`, `pri.store. size `, `pri.store.size`, `pri.store`." |
78 | 82 | ); |
| 83 | + public static final String DEFAULT_INPUT_SCHEMA = "{\"type\":\"object\"," |
| 84 | + + "\"properties\":{\"indices\":{\"type\":\"array\",\"items\": {\"type\": \"string\"}," |
| 85 | + + "\"description\":\"OpenSearch index name list, separated by comma. " |
| 86 | + + "for example: [\\\"index1\\\", \\\"index2\\\"], use empty array [] to list all indices in the cluster\"}}," |
| 87 | + + "\"additionalProperties\":false}"; |
79 | 88 |
|
80 | 89 | @Setter |
81 | 90 | @Getter |
@@ -111,62 +120,67 @@ public Object parse(Object o) { |
111 | 120 | }; |
112 | 121 |
|
113 | 122 | this.attributes = new HashMap<>(); |
114 | | - attributes |
115 | | - .put( |
116 | | - "input_schema", |
117 | | - "{\"type\":\"object\",\"properties\":{\"indices\":{\"type\":\"string\",\"description\":\"OpenSearch index name list, separated by comma. for example: index1, index2\"}},\"additionalProperties\":false}" |
118 | | - ); |
119 | | - attributes.put("strict", false); |
| 123 | + attributes.put(INPUT_SCHEMA_FIELD, DEFAULT_INPUT_SCHEMA); |
| 124 | + attributes.put(STRICT_FIELD, false); |
120 | 125 | } |
121 | 126 |
|
122 | 127 | @Override |
123 | 128 | public <T> void run(Map<String, String> parameters, ActionListener<T> listener) { |
124 | 129 | // TODO: This logic exactly matches the OpenSearch _list/indices REST action. If code at |
125 | 130 | // o.o.rest/action/list/RestIndicesListAction.java changes those changes need to be reflected here |
126 | | - @SuppressWarnings("unchecked") |
127 | | - List<String> indexList = parameters.containsKey("indices") |
128 | | - ? gson.fromJson(parameters.get("indices"), List.class) |
129 | | - : Collections.emptyList(); |
130 | | - final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY); |
131 | | - |
132 | | - final IndicesOptions indicesOptions = IndicesOptions.strictExpand(); |
133 | | - final boolean local = parameters.containsKey("local") && Boolean.parseBoolean(parameters.get("local")); |
134 | | - final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments")); |
135 | | - final int pageSize = parameters.containsKey("page_size") |
136 | | - ? NumberUtils.toInt(parameters.get("page_size"), DEFAULT_PAGE_SIZE) |
137 | | - : DEFAULT_PAGE_SIZE; |
138 | | - final PageParams pageParams = new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize); |
139 | | - |
140 | | - final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> { |
141 | | - // Handle empty table |
142 | | - if (table == null || table.getRows().isEmpty()) { |
143 | | - @SuppressWarnings("unchecked") |
144 | | - T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "]."); |
145 | | - listener.onResponse(empty); |
146 | | - return; |
| 131 | + try { |
| 132 | + List<String> indexList = new ArrayList<>(); |
| 133 | + if (StringUtils.isNotBlank(parameters.get("indices"))) { |
| 134 | + indexList = parameters.containsKey("indices") |
| 135 | + ? gson.fromJson(parameters.get("indices"), List.class) |
| 136 | + : Collections.emptyList(); |
147 | 137 | } |
148 | | - StringBuilder sb = new StringBuilder( |
149 | | - // Currently using c.value which is short header matching _cat/indices |
150 | | - // May prefer to use c.attr.get("desc") for full description |
151 | | - table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining(",", "", "\n")) |
| 138 | + final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY); |
| 139 | + |
| 140 | + final IndicesOptions indicesOptions = IndicesOptions.strictExpand(); |
| 141 | + final boolean local = parameters.containsKey("local") && Boolean.parseBoolean(parameters.get("local")); |
| 142 | + final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments")); |
| 143 | + final int pageSize = parameters.containsKey("page_size") |
| 144 | + ? NumberUtils.toInt(parameters.get("page_size"), DEFAULT_PAGE_SIZE) |
| 145 | + : DEFAULT_PAGE_SIZE; |
| 146 | + final PageParams pageParams = new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize); |
| 147 | + |
| 148 | + final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> { |
| 149 | + // Handle empty table |
| 150 | + if (table == null || table.getRows().isEmpty()) { |
| 151 | + @SuppressWarnings("unchecked") |
| 152 | + T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "]."); |
| 153 | + listener.onResponse(empty); |
| 154 | + return; |
| 155 | + } |
| 156 | + StringBuilder sb = new StringBuilder( |
| 157 | + // Currently using c.value which is short header matching _cat/indices |
| 158 | + // May prefer to use c.attr.get("desc") for full description |
| 159 | + table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining(",", "", "\n")) |
| 160 | + ); |
| 161 | + for (List<Cell> row : table.getRows()) { |
| 162 | + sb |
| 163 | + .append( |
| 164 | + row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining(",", "", "\n")) |
| 165 | + ); |
| 166 | + } |
| 167 | + @SuppressWarnings("unchecked") |
| 168 | + T response = (T) sb.toString(); |
| 169 | + listener.onResponse(response); |
| 170 | + }, listener::onFailure)); |
| 171 | + |
| 172 | + fetchClusterInfoAndPages( |
| 173 | + indices, |
| 174 | + local, |
| 175 | + includeUnloadedSegments, |
| 176 | + pageParams, |
| 177 | + indicesOptions, |
| 178 | + new ConcurrentLinkedQueue<>(), |
| 179 | + internalListener |
152 | 180 | ); |
153 | | - for (List<Cell> row : table.getRows()) { |
154 | | - sb.append(row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining(",", "", "\n"))); |
155 | | - } |
156 | | - @SuppressWarnings("unchecked") |
157 | | - T response = (T) sb.toString(); |
158 | | - listener.onResponse(response); |
159 | | - }, listener::onFailure)); |
160 | | - |
161 | | - fetchClusterInfoAndPages( |
162 | | - indices, |
163 | | - local, |
164 | | - includeUnloadedSegments, |
165 | | - pageParams, |
166 | | - indicesOptions, |
167 | | - new ConcurrentLinkedQueue<>(), |
168 | | - internalListener |
169 | | - ); |
| 181 | + } catch (Exception e) { |
| 182 | + listener.onFailure(e); |
| 183 | + } |
170 | 184 | } |
171 | 185 |
|
172 | 186 | private void fetchClusterInfoAndPages( |
|
0 commit comments