Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/135653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 135653
summary: Reset health status on successful empty checkpoint
area: Machine Learning
type: bug
issues:
- 135650
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ private void createContinuousTransform(String indexName, String transformId, Str
* Verify the basic stats API, which includes state, health, and optionally progress (if it exists).
* These are required for Kibana 8.13+.
*/
@SuppressWarnings("unchecked")
public void testBasicContinuousTransformStats() throws Exception {
var transformId = "transform-continuous-basic-stats";
createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
Expand All @@ -234,6 +233,50 @@ public void testBasicContinuousTransformStats() throws Exception {
deleteTransform(transformId);
}

public void testEmptySourceIndexClearsErrors() throws Exception {
var sourceIndexName = "source-empty-reviews";
var destIndexName = "destination-empty-reviews";
var transformId = "transform-empty-source-index";

createReviewsIndexMappings(sourceIndexName, null);

var config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), sourceIndexName).setPivotConfig(
createPivotConfig(groupByUserOnly(), aggregateScoresAndTimes())
)
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.setSettings(new SettingsConfig.Builder().setUnattended(true).build())
.build();

putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
startTransform(config.getId(), RequestOptions.DEFAULT);

waitUntilCheckpoint(config.getId(), 1L);
assertEquals("green", getTransformHealthStatus(transformId));

// this will cause the transform to fail to search
assertAcknowledged(adminClient().performRequest(new Request("PUT", sourceIndexName + "/_block/read")));
assertBusy(() -> assertThat(getTransformHealthStatus(transformId), oneOf("yellow", "red")), 30, TimeUnit.SECONDS);

// unblock reads on the search index and the transform should recover
assertAcknowledged(adminClient().performRequest(new Request("DELETE", sourceIndexName + "/_block/read")));
assertBusy(() -> assertEquals("green", getTransformHealthStatus(transformId)), 30, TimeUnit.SECONDS);

stopTransform(transformId);
deleteTransform(transformId);
deleteIndex(sourceIndexName);
deleteIndex(destIndexName);
}

private Map<String, SingleGroupSource> groupByUserOnly() {
return Map.of("by-user", new TermsGroupSource("user_id", null, false));
}

private AggregatorFactories.Builder aggregateScoresAndTimes() {
return AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
}

public void testDestinationIndexBlocked() throws Exception {
var transformId = "transform-continuous-blocked-destination";
var sourceIndexName = "source-reviews";
Expand Down Expand Up @@ -385,12 +428,8 @@ public void testContinuousTransformUpdate() throws Exception {
String indexName = "continuous-reviews-update";
createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);

Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-user", new TermsGroupSource("user_id", null, false));

AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
var groups = groupByUserOnly();
var aggs = aggregateScoresAndTimes();

String id = "transform-to-update";
String dest = "reviews-by-user-business-day-to-update";
Expand Down Expand Up @@ -481,8 +520,7 @@ public void testRetentionPolicyDelete() throws Exception {
String dest = "retention-policy-dest";
createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);

Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-user", new TermsGroupSource("user_id", null, false));
var groups = groupByUserOnly();

AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ protected String getTransformState(String id) throws IOException {
return (String) getBasicTransformStats(id).get("state");
}

protected String getTransformHealthStatus(String id) throws IOException {
return (String) XContentMapValues.extractValue("health.status", getBasicTransformStats(id));
}

@SuppressWarnings("unchecked")
protected Map<String, Object> getTransform(String id) throws IOException {
var request = new Request("GET", TRANSFORM_ENDPOINT + id);
Expand Down Expand Up @@ -390,7 +394,40 @@ protected void createReviewsIndex(
) throws Exception {
assert numUsers > 0;

// create mapping
createReviewsIndexMappings(indexName, defaultPipeline);

// create index
StringBuilder sourceBuilder = new StringBuilder();
for (int i = 0; i < numDocs; i++) {
Integer user = userIdProvider.apply(i);
int stars = i % 5;
long business = i % 50;
String dateString = dateStringProvider.apply(i);

sourceBuilder.append(Strings.format("""
{"create":{"_index":"%s"}}
""", indexName));

sourceBuilder.append("{");
if (user != null) {
sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
}
sourceBuilder.append(Strings.format("""
"count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\
{"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"}
""", i, business, stars, stars, dateString));

if (i % 100 == 0) {
sourceBuilder.append("\r\n");
doBulk(sourceBuilder.toString(), false);
sourceBuilder.setLength(0);
}
}
sourceBuilder.append("\r\n");
doBulk(sourceBuilder.toString(), true);
}

protected void createReviewsIndexMappings(String indexName, String defaultPipeline) throws IOException {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
Expand Down Expand Up @@ -439,36 +476,6 @@ protected void createReviewsIndex(
req.setOptions(RequestOptions.DEFAULT);
assertOKAndConsume(adminClient().performRequest(req));
}

// create index
StringBuilder sourceBuilder = new StringBuilder();
for (int i = 0; i < numDocs; i++) {
Integer user = userIdProvider.apply(i);
int stars = i % 5;
long business = i % 50;
String dateString = dateStringProvider.apply(i);

sourceBuilder.append(Strings.format("""
{"create":{"_index":"%s"}}
""", indexName));

sourceBuilder.append("{");
if (user != null) {
sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
}
sourceBuilder.append(Strings.format("""
"count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\
{"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"}
""", i, business, stars, stars, dateString));

if (i % 100 == 0) {
sourceBuilder.append("\r\n");
doBulk(sourceBuilder.toString(), false);
sourceBuilder.setLength(0);
}
}
sourceBuilder.append("\r\n");
doBulk(sourceBuilder.toString(), true);
}

protected void doBulk(String bulkDocuments, boolean refresh) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ protected void onFinish(ActionListener<Void> listener) {
if (context.shouldStopAtCheckpoint()) {
stop();
}
context.resetReasonAndFailureCounter();
listener.onResponse(null);
return;
}
Expand Down