Skip to content

Commit 4e549cb

Browse files
authored
[Transform] Reset health status on successful noop (#135653)
When there are no results for the current checkpoint, the checkpoint is considered successful, and the transform will reset its failure count. This fixes a bug where transform will continue to report unhealthy if it previously failed to search the source index. Resolve #135650
1 parent daf5ddc commit 4e549cb

File tree

4 files changed

+92
-40
lines changed

4 files changed

+92
-40
lines changed

docs/changelog/135653.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135653
2+
summary: Reset health status on successful empty checkpoint
3+
area: Machine Learning
4+
type: bug
5+
issues:
6+
- 135650

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,6 @@ private void createContinuousTransform(String indexName, String transformId, Str
217217
* Verify the basic stats API, which includes state, health, and optionally progress (if it exists).
218218
* These are required for Kibana 8.13+.
219219
*/
220-
@SuppressWarnings("unchecked")
221220
public void testBasicContinuousTransformStats() throws Exception {
222221
var transformId = "transform-continuous-basic-stats";
223222
createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
@@ -234,6 +233,50 @@ public void testBasicContinuousTransformStats() throws Exception {
234233
deleteTransform(transformId);
235234
}
236235

236+
public void testEmptySourceIndexClearsErrors() throws Exception {
237+
var sourceIndexName = "source-empty-reviews";
238+
var destIndexName = "destination-empty-reviews";
239+
var transformId = "transform-empty-source-index";
240+
241+
createReviewsIndexMappings(sourceIndexName, null);
242+
243+
var config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), sourceIndexName).setPivotConfig(
244+
createPivotConfig(groupByUserOnly(), aggregateScoresAndTimes())
245+
)
246+
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
247+
.setSettings(new SettingsConfig.Builder().setUnattended(true).build())
248+
.build();
249+
250+
putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
251+
startTransform(config.getId(), RequestOptions.DEFAULT);
252+
253+
waitUntilCheckpoint(config.getId(), 1L);
254+
assertEquals("green", getTransformHealthStatus(transformId));
255+
256+
// this will cause the transform to fail to search
257+
assertAcknowledged(adminClient().performRequest(new Request("PUT", sourceIndexName + "/_block/read")));
258+
assertBusy(() -> assertThat(getTransformHealthStatus(transformId), oneOf("yellow", "red")), 30, TimeUnit.SECONDS);
259+
260+
// unblock reads on the search index and the transform should recover
261+
assertAcknowledged(adminClient().performRequest(new Request("DELETE", sourceIndexName + "/_block/read")));
262+
assertBusy(() -> assertEquals("green", getTransformHealthStatus(transformId)), 30, TimeUnit.SECONDS);
263+
264+
stopTransform(transformId);
265+
deleteTransform(transformId);
266+
deleteIndex(sourceIndexName);
267+
deleteIndex(destIndexName);
268+
}
269+
270+
private Map<String, SingleGroupSource> groupByUserOnly() {
271+
return Map.of("by-user", new TermsGroupSource("user_id", null, false));
272+
}
273+
274+
private AggregatorFactories.Builder aggregateScoresAndTimes() {
275+
return AggregatorFactories.builder()
276+
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
277+
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
278+
}
279+
237280
public void testDestinationIndexBlocked() throws Exception {
238281
var transformId = "transform-continuous-blocked-destination";
239282
var sourceIndexName = "source-reviews";
@@ -385,12 +428,8 @@ public void testContinuousTransformUpdate() throws Exception {
385428
String indexName = "continuous-reviews-update";
386429
createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
387430

388-
Map<String, SingleGroupSource> groups = new HashMap<>();
389-
groups.put("by-user", new TermsGroupSource("user_id", null, false));
390-
391-
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
392-
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
393-
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
431+
var groups = groupByUserOnly();
432+
var aggs = aggregateScoresAndTimes();
394433

395434
String id = "transform-to-update";
396435
String dest = "reviews-by-user-business-day-to-update";
@@ -481,8 +520,7 @@ public void testRetentionPolicyDelete() throws Exception {
481520
String dest = "retention-policy-dest";
482521
createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
483522

484-
Map<String, SingleGroupSource> groups = new HashMap<>();
485-
groups.put("by-user", new TermsGroupSource("user_id", null, false));
523+
var groups = groupByUserOnly();
486524

487525
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
488526
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ protected String getTransformState(String id) throws IOException {
223223
return (String) getBasicTransformStats(id).get("state");
224224
}
225225

226+
protected String getTransformHealthStatus(String id) throws IOException {
227+
return (String) XContentMapValues.extractValue("health.status", getBasicTransformStats(id));
228+
}
229+
226230
@SuppressWarnings("unchecked")
227231
protected Map<String, Object> getTransform(String id) throws IOException {
228232
var request = new Request("GET", TRANSFORM_ENDPOINT + id);
@@ -390,7 +394,40 @@ protected void createReviewsIndex(
390394
) throws Exception {
391395
assert numUsers > 0;
392396

393-
// create mapping
397+
createReviewsIndexMappings(indexName, defaultPipeline);
398+
399+
// create index
400+
StringBuilder sourceBuilder = new StringBuilder();
401+
for (int i = 0; i < numDocs; i++) {
402+
Integer user = userIdProvider.apply(i);
403+
int stars = i % 5;
404+
long business = i % 50;
405+
String dateString = dateStringProvider.apply(i);
406+
407+
sourceBuilder.append(Strings.format("""
408+
{"create":{"_index":"%s"}}
409+
""", indexName));
410+
411+
sourceBuilder.append("{");
412+
if (user != null) {
413+
sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
414+
}
415+
sourceBuilder.append(Strings.format("""
416+
"count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\
417+
{"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"}
418+
""", i, business, stars, stars, dateString));
419+
420+
if (i % 100 == 0) {
421+
sourceBuilder.append("\r\n");
422+
doBulk(sourceBuilder.toString(), false);
423+
sourceBuilder.setLength(0);
424+
}
425+
}
426+
sourceBuilder.append("\r\n");
427+
doBulk(sourceBuilder.toString(), true);
428+
}
429+
430+
protected void createReviewsIndexMappings(String indexName, String defaultPipeline) throws IOException {
394431
try (XContentBuilder builder = jsonBuilder()) {
395432
builder.startObject();
396433
{
@@ -439,36 +476,6 @@ protected void createReviewsIndex(
439476
req.setOptions(RequestOptions.DEFAULT);
440477
assertOKAndConsume(adminClient().performRequest(req));
441478
}
442-
443-
// create index
444-
StringBuilder sourceBuilder = new StringBuilder();
445-
for (int i = 0; i < numDocs; i++) {
446-
Integer user = userIdProvider.apply(i);
447-
int stars = i % 5;
448-
long business = i % 50;
449-
String dateString = dateStringProvider.apply(i);
450-
451-
sourceBuilder.append(Strings.format("""
452-
{"create":{"_index":"%s"}}
453-
""", indexName));
454-
455-
sourceBuilder.append("{");
456-
if (user != null) {
457-
sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
458-
}
459-
sourceBuilder.append(Strings.format("""
460-
"count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\
461-
{"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"}
462-
""", i, business, stars, stars, dateString));
463-
464-
if (i % 100 == 0) {
465-
sourceBuilder.append("\r\n");
466-
doBulk(sourceBuilder.toString(), false);
467-
sourceBuilder.setLength(0);
468-
}
469-
}
470-
sourceBuilder.append("\r\n");
471-
doBulk(sourceBuilder.toString(), true);
472479
}
473480

474481
protected void doBulk(String bulkDocuments, boolean refresh) throws IOException {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ protected void onFinish(ActionListener<Void> listener) {
483483
if (context.shouldStopAtCheckpoint()) {
484484
stop();
485485
}
486+
context.resetReasonAndFailureCounter();
486487
listener.onResponse(null);
487488
return;
488489
}

0 commit comments

Comments
 (0)