diff --git a/docs/changelog/135653.yaml b/docs/changelog/135653.yaml new file mode 100644 index 0000000000000..922442f983a12 --- /dev/null +++ b/docs/changelog/135653.yaml @@ -0,0 +1,6 @@ +pr: 135653 +summary: Reset health status on successful empty checkpoint +area: Machine Learning +type: bug +issues: + - 135650 diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index dc8dfe377a844..5e5e11f30be0f 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -217,7 +217,6 @@ private void createContinuousTransform(String indexName, String transformId, Str * Verify the basic stats API, which includes state, health, and optionally progress (if it exists). * These are required for Kibana 8.13+. */ - @SuppressWarnings("unchecked") public void testBasicContinuousTransformStats() throws Exception { var transformId = "transform-continuous-basic-stats"; createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day"); @@ -234,6 +233,54 @@ public void testBasicContinuousTransformStats() throws Exception { deleteTransform(transformId); } + public void testEmptySourceIndexClearsErrors() throws Exception { + var sourceIndexName = "source-empty-reviews"; + var destIndexName = "destination-empty-reviews"; + var transformId = "transform-empty-source-index"; + + createReviewsIndexMappings(sourceIndexName, null); + + var config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), sourceIndexName).setPivotConfig( + createPivotConfig(groupByUserOnly(), aggregateScoresAndTimes()) + ) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .setSettings(new SettingsConfig.Builder().setUnattended(true).build()) + .build(); + + putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); + startTransform(config.getId(), RequestOptions.DEFAULT); + + waitUntilCheckpoint(config.getId(), 1L); + assertEquals("green", getTransformHealthStatus(transformId)); + + // this will cause the transform to fail to search + assertAcknowledged(adminClient().performRequest(new Request("PUT", sourceIndexName + "/_block/read"))); + assertBusy(() -> assertThat(getTransformHealthStatus(transformId), oneOf("yellow", "red")), 30, TimeUnit.SECONDS); + + // unblock reads on the search index and the transform should recover + var request = new Request("PUT", sourceIndexName + "/_settings"); + request.setJsonEntity(""" + { "blocks.read": false } + """); + assertAcknowledged(adminClient().performRequest(request)); + assertBusy(() -> assertEquals("green", getTransformHealthStatus(transformId)), 30, TimeUnit.SECONDS); + + stopTransform(transformId); + deleteTransform(transformId); + deleteIndex(sourceIndexName); + deleteIndex(destIndexName); + } + + private Map groupByUserOnly() { + return Map.of("by-user", new TermsGroupSource("user_id", null, false)); + } + + private AggregatorFactories.Builder aggregateScoresAndTimes() { + return AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + } + public void testDestinationIndexBlocked() throws Exception { var transformId = "transform-continuous-blocked-destination"; var sourceIndexName = "source-reviews"; @@ -385,12 +432,8 @@ public void testContinuousTransformUpdate() throws Exception { String indexName = "continuous-reviews-update"; createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); - Map groups = new HashMap<>(); - groups.put("by-user", new TermsGroupSource("user_id", null, false)); - - AggregatorFactories.Builder aggs = AggregatorFactories.builder() - .addAggregator(AggregationBuilders.avg("review_score").field("stars")) - .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + var groups = groupByUserOnly(); + var aggs = aggregateScoresAndTimes(); String id = "transform-to-update"; String dest = "reviews-by-user-business-day-to-update"; @@ -481,8 +524,7 @@ public void testRetentionPolicyDelete() throws Exception { String dest = "retention-policy-dest"; createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); - Map groups = new HashMap<>(); - groups.put("by-user", new TermsGroupSource("user_id", null, false)); + var groups = groupByUserOnly(); AggregatorFactories.Builder aggs = AggregatorFactories.builder() .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 3ee46e0ff087f..6c608380dafba 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -223,6 +223,10 @@ protected String getTransformState(String id) throws IOException { return (String) getBasicTransformStats(id).get("state"); } + protected String getTransformHealthStatus(String id) throws IOException { + return (String) XContentMapValues.extractValue("health.status", getBasicTransformStats(id)); + } + @SuppressWarnings("unchecked") protected Map getTransform(String id) throws IOException { var request = new Request("GET", TRANSFORM_ENDPOINT + id); @@ -390,7 +394,40 @@ protected void createReviewsIndex( ) throws Exception { assert numUsers > 0; - // create mapping + createReviewsIndexMappings(indexName, defaultPipeline); + + // create index + StringBuilder sourceBuilder = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + Integer user = userIdProvider.apply(i); + int stars = i % 5; + long business = i % 50; + String dateString = dateStringProvider.apply(i); + + sourceBuilder.append(Strings.format(""" + {"create":{"_index":"%s"}} + """, indexName)); + + sourceBuilder.append("{"); + if (user != null) { + sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\","); + } + sourceBuilder.append(Strings.format(""" + "count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\ + {"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"} + """, i, business, stars, stars, dateString)); + + if (i % 100 == 0) { + sourceBuilder.append("\r\n"); + doBulk(sourceBuilder.toString(), false); + sourceBuilder.setLength(0); + } + } + sourceBuilder.append("\r\n"); + doBulk(sourceBuilder.toString(), true); + } + + protected void createReviewsIndexMappings(String indexName, String defaultPipeline) throws IOException { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); { @@ -439,36 +476,6 @@ protected void createReviewsIndex( req.setOptions(RequestOptions.DEFAULT); assertOKAndConsume(adminClient().performRequest(req)); } - - // create index - StringBuilder sourceBuilder = new StringBuilder(); - for (int i = 0; i < numDocs; i++) { - Integer user = userIdProvider.apply(i); - int stars = i % 5; - long business = i % 50; - String dateString = dateStringProvider.apply(i); - - sourceBuilder.append(Strings.format(""" - {"create":{"_index":"%s"}} - """, indexName)); - - sourceBuilder.append("{"); - if (user != null) { - sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\","); - } - sourceBuilder.append(Strings.format(""" - "count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\ - {"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"} - """, i, business, stars, stars, dateString)); - - if (i % 100 == 0) { - sourceBuilder.append("\r\n"); - doBulk(sourceBuilder.toString(), false); - sourceBuilder.setLength(0); - } - } - sourceBuilder.append("\r\n"); - doBulk(sourceBuilder.toString(), true); } protected void doBulk(String bulkDocuments, boolean refresh) throws IOException { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 4af18941fb20d..714f531ef8349 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -483,6 +483,7 @@ protected void onFinish(ActionListener listener) { if (context.shouldStopAtCheckpoint()) { stop(); } + context.resetReasonAndFailureCounter(); listener.onResponse(null); return; }