|
57 | 57 | import static org.hamcrest.Matchers.empty; |
58 | 58 | import static org.hamcrest.Matchers.equalTo; |
59 | 59 | import static org.hamcrest.Matchers.hasItem; |
| 60 | +import static org.hamcrest.Matchers.in; |
60 | 61 | import static org.hamcrest.Matchers.is; |
61 | 62 | import static org.hamcrest.Matchers.iterableWithSize; |
62 | 63 | import static org.hamcrest.Matchers.notNullValue; |
@@ -2119,6 +2120,140 @@ private void expectEmptyDataStreamStats(String user, Request request) throws IOE |
2119 | 2120 | assertThat(path.evaluate("backing_indices"), equalTo(0)); |
2120 | 2121 | } |
2121 | 2122 |
|
| 2123 | + public void testWatcher() throws Exception { |
| 2124 | + setupDataStream(); |
| 2125 | + setupOtherDataStream(); |
| 2126 | + |
| 2127 | + final Tuple<String, String> backingIndices = getSingleDataAndFailureIndices("test1"); |
| 2128 | + final String dataIndexName = backingIndices.v1(); |
| 2129 | + final String failureIndexName = backingIndices.v2(); |
| 2130 | + |
| 2131 | + final String watchId = "failures-watch"; |
| 2132 | + final String username = "user"; |
| 2133 | + final String roleName = "role"; |
| 2134 | + createUser(username, PASSWORD, roleName); |
| 2135 | + |
| 2136 | + { |
| 2137 | + // grant access to the failure store |
| 2138 | + createOrUpdateRoleAndApiKey(username, roleName, """ |
| 2139 | + { |
| 2140 | + "cluster": ["manage_security", "manage_watcher"], |
| 2141 | + "indices": [ |
| 2142 | + { |
| 2143 | + "names": ["test1"], |
| 2144 | + "privileges": ["read_failure_store"] |
| 2145 | + } |
| 2146 | + ] |
| 2147 | + } |
| 2148 | + """); |
| 2149 | + |
| 2150 | + // searching the failure store should return only test1 failure indices |
| 2151 | + createOrUpdateWatcher(username, watchId, Strings.format(""" |
| 2152 | + { |
| 2153 | + "trigger": { "schedule": { "interval": "60m"}}, |
| 2154 | + "input": { |
| 2155 | + "search": { |
| 2156 | + "request": { |
| 2157 | + "indices": [ "%s" ], |
| 2158 | + "body": {"query": {"match_all": {}}} |
| 2159 | + } |
| 2160 | + } |
| 2161 | + } |
| 2162 | + }""", randomFrom("test1::failures", "test*::failures", "*::failures", failureIndexName))); |
| 2163 | + executeWatchAndAssertResults(username, watchId, failureIndexName); |
| 2164 | + |
| 2165 | + // searching the data should return empty results |
| 2166 | + createOrUpdateWatcher(username, watchId, Strings.format(""" |
| 2167 | + { |
| 2168 | + "trigger": { "schedule": { "interval": "60m"}}, |
| 2169 | + "input": { |
| 2170 | + "search": { |
| 2171 | + "request": { |
| 2172 | + "indices": [ "%s" ], |
| 2173 | + "body": {"query": {"match_all": {}}} |
| 2174 | + } |
| 2175 | + } |
| 2176 | + } |
| 2177 | + }""", randomFrom(dataIndexName, "*", "test*", "test1", "test1::data"))); |
| 2178 | + executeWatchAndAssertEmptyResults(username, watchId); |
| 2179 | + } |
| 2180 | + |
| 2181 | + { |
| 2182 | + // remove read_failure_store and add read |
| 2183 | + createOrUpdateRoleAndApiKey(username, roleName, """ |
| 2184 | + { |
| 2185 | + "cluster": ["manage_security", "manage_watcher"], |
| 2186 | + "indices": [ |
| 2187 | + { |
| 2188 | + "names": ["test1"], |
| 2189 | + "privileges": ["read"] |
| 2190 | + } |
| 2191 | + ] |
| 2192 | + } |
| 2193 | + """); |
| 2194 | + |
| 2195 | + // searching the failure store should return empty results |
| 2196 | + createOrUpdateWatcher(username, watchId, Strings.format(""" |
| 2197 | + { |
| 2198 | + "trigger": { "schedule": { "interval": "60m"}}, |
| 2199 | + "input": { |
| 2200 | + "search": { |
| 2201 | + "request": { |
| 2202 | + "indices": [ "%s" ], |
| 2203 | + "body": {"query": {"match_all": {}}} |
| 2204 | + } |
| 2205 | + } |
| 2206 | + } |
| 2207 | + }""", randomFrom("test1::failures", "test*::failures", "*::failures", failureIndexName))); |
| 2208 | + executeWatchAndAssertEmptyResults(username, watchId); |
| 2209 | + |
| 2210 | + // searching the data should return single result |
| 2211 | + createOrUpdateWatcher(username, watchId, Strings.format(""" |
| 2212 | + { |
| 2213 | + "trigger": { "schedule": { "interval": "60m"}}, |
| 2214 | + "input": { |
| 2215 | + "search": { |
| 2216 | + "request": { |
| 2217 | + "indices": [ "%s" ], |
| 2218 | + "body": {"query": {"match_all": {}}} |
| 2219 | + } |
| 2220 | + } |
| 2221 | + } |
| 2222 | + }""", randomFrom("*", "test*", "test1", "test1::data", dataIndexName))); |
| 2223 | + executeWatchAndAssertResults(username, watchId, dataIndexName); |
| 2224 | + } |
| 2225 | + } |
| 2226 | + |
| 2227 | + private void executeWatchAndAssertResults(String user, String watchId, final String... expectedIndices) throws IOException { |
| 2228 | + Request request = new Request("POST", "_watcher/watch/" + watchId + "/_execute"); |
| 2229 | + Response response = performRequest(user, request); |
| 2230 | + ObjectPath path = assertOKAndCreateObjectPath(response); |
| 2231 | + assertThat(path.evaluate("watch_record.user"), equalTo(user)); |
| 2232 | + assertThat(path.evaluate("watch_record.state"), equalTo("executed")); |
| 2233 | + assertThat(path.evaluate("watch_record.result.input.status"), equalTo("success")); |
| 2234 | + assertThat(path.evaluate("watch_record.result.input.payload.hits.total"), equalTo(expectedIndices.length)); |
| 2235 | + List<Map<String, ?>> hits = path.evaluate("watch_record.result.input.payload.hits.hits"); |
| 2236 | + hits.stream().map(hit -> hit.get("_index")).forEach(index -> { assertThat(index, is(in(expectedIndices))); }); |
| 2237 | + } |
| 2238 | + |
| 2239 | + private void executeWatchAndAssertEmptyResults(String user, String watchId) throws IOException { |
| 2240 | + Request request = new Request("POST", "_watcher/watch/" + watchId + "/_execute"); |
| 2241 | + Response response = performRequest(user, request); |
| 2242 | + ObjectPath path = assertOKAndCreateObjectPath(response); |
| 2243 | + assertThat(path.evaluate("watch_record.user"), equalTo(user)); |
| 2244 | + assertThat(path.evaluate("watch_record.state"), equalTo("executed")); |
| 2245 | + assertThat(path.evaluate("watch_record.result.input.status"), equalTo("success")); |
| 2246 | + assertThat(path.evaluate("watch_record.result.input.payload.hits.total"), equalTo(0)); |
| 2247 | + List<Map<String, ?>> hits = path.evaluate("watch_record.result.input.payload.hits.hits"); |
| 2248 | + assertThat(hits.size(), equalTo(0)); |
| 2249 | + } |
| 2250 | + |
| 2251 | + private void createOrUpdateWatcher(String user, String watchId, String watch) throws IOException { |
| 2252 | + Request request = new Request("PUT", "/_watcher/watch/" + watchId); |
| 2253 | + request.setJsonEntity(watch); |
| 2254 | + assertOK(performRequest(user, request)); |
| 2255 | + } |
| 2256 | + |
2122 | 2257 | public void testAliasBasedAccess() throws Exception { |
2123 | 2258 | List<String> docIds = setupDataStream(); |
2124 | 2259 | assertThat(docIds.size(), equalTo(2)); |
|
0 commit comments