Skip to content

Commit 244507f

Browse files
authored
[Transform] Allow dynamic updates to frequency (#136757) (#136879)
Updates to a continuous transform's frequency should go into effect for the next transform's checkpoint without needed to stop and restart the transform. Fix #133321
1 parent b6c32dc commit 244507f

File tree

8 files changed

+155
-23
lines changed

8 files changed

+155
-23
lines changed

docs/changelog/136757.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 136757
2+
summary: Allow dynamic updates to frequency
3+
area: Transform
4+
type: bug
5+
issues:
6+
- 133321

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,10 @@ public boolean changesDestIndex(TransformConfig config) {
244244
return isNullOrEqual(updatedIndex, config.getDestination().getIndex()) == false;
245245
}
246246

247+
public boolean changesFrequency(TransformConfig config) {
248+
return isNullOrEqual(frequency, config.getFrequency()) == false;
249+
}
250+
247251
private static boolean isNullOrEqual(Object lft, Object rgt) {
248252
return lft == null || lft.equals(rgt);
249253
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ protected void createReviewsIndexNano() throws IOException {
251251
protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {
252252

253253
// Set frequency high for testing
254+
createContinuousPivotReviewsTransform(transformId, transformIndex, authHeader, "1s");
255+
}
256+
257+
protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader, String frequency)
258+
throws IOException {
254259
String config = Strings.format("""
255260
{
256261
"dest": {
@@ -265,7 +270,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
265270
"delay": "15m"
266271
}
267272
},
268-
"frequency": "1s",
273+
"frequency": "%s",
269274
"pivot": {
270275
"group_by": {
271276
"reviewer": {
@@ -282,7 +287,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
282287
}
283288
}
284289
}
285-
}""", transformIndex, REVIEWS_INDEX_NAME);
290+
}""", transformIndex, REVIEWS_INDEX_NAME, frequency);
286291

287292
createReviewsTransform(transformId, authHeader, null, config);
288293
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.hamcrest.Matchers.both;
3333
import static org.hamcrest.Matchers.containsString;
3434
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.greaterThan;
3536
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3637
import static org.hamcrest.Matchers.instanceOf;
3738
import static org.hamcrest.Matchers.is;
@@ -109,21 +110,54 @@ public void testUpdateTransferRightsSecondaryAuthHeaders() throws Exception {
109110
}
110111

111112
public void testUpdateThatChangesSettingsButNotHeaders() throws Exception {
112-
String transformId = "test_update_that_changes_settings";
113-
String destIndex = transformId + "-dest";
113+
var transformId = "test_update_that_changes_settings";
114+
var destIndex = transformId + "-dest";
114115

115116
// Create the transform
116117
createPivotReviewsTransform(transformId, destIndex, null, null, null);
117118

118-
Request updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null);
119-
updateTransformRequest.setJsonEntity("""
119+
// Update the transform's settings
120+
var updatedConfig = updateTransform(transformId, """
120121
{ "settings": { "max_page_search_size": 123 } }""");
121122

123+
// Verify that the settings got updated
124+
assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123))));
125+
}
126+
127+
private Map<String, Object> updateTransform(String transformId, String jsonPayload) throws Exception {
128+
var updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null);
129+
updateTransformRequest.setJsonEntity(jsonPayload);
130+
return entityAsMap(client().performRequest(updateTransformRequest));
131+
}
132+
133+
public void testUpdateFrequency() throws Exception {
134+
var transformId = "test_update_frequency";
135+
var destIndex = transformId + "-dest";
136+
137+
// Create the transform
138+
createContinuousPivotReviewsTransform(transformId, destIndex, null, "1h");
139+
startTransform(transformId);
140+
141+
// wait until it finishes the first checkpoint and check that it hasn't triggered again
142+
assertBusy(() -> {
143+
var statsAndState = getTransformStateAndStats(transformId);
144+
assertThat(XContentMapValues.extractValue("checkpointing.last.checkpoint", statsAndState), equalTo(1));
145+
assertThat(XContentMapValues.extractValue("stats.trigger_count", statsAndState), equalTo(1));
146+
}, 10, TimeUnit.SECONDS);
147+
122148
// Update the transform's settings
123-
Map<String, Object> updatedConfig = entityAsMap(client().performRequest(updateTransformRequest));
149+
var updatedConfig = updateTransform(transformId, """
150+
{ "frequency": "1s" }""");
124151

125152
// Verify that the settings got updated
126-
assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123))));
153+
assertThat(updatedConfig.get("frequency"), is(equalTo("1s")));
154+
assertBusy(() -> {
155+
var triggerCount = (Integer) XContentMapValues.extractValue("stats.trigger_count", getTransformStateAndStats(transformId));
156+
assertThat(triggerCount, is(greaterThan(1)));
157+
}, 10, TimeUnit.SECONDS);
158+
159+
stopTransform(transformId, true);
160+
deleteTransform(transformId);
127161
}
128162

129163
public void testConcurrentUpdates() throws Exception {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
172172
boolean updateChangesSettings = update.changesSettings(originalConfig);
173173
boolean updateChangesHeaders = update.changesHeaders(originalConfig);
174174
boolean updateChangesDestIndex = update.changesDestIndex(originalConfig);
175-
if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex) {
175+
boolean updateFrequency = update.changesFrequency(originalConfig);
176+
if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex || updateFrequency) {
176177
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
177178
request.getId(),
178179
clusterState
@@ -258,6 +259,7 @@ protected void taskOperation(
258259
transformTask.applyNewSettings(request.getConfig().getSettings());
259260
transformTask.applyNewAuthState(request.getAuthState());
260261
transformTask.checkAndResetDestinationIndexBlock(request.getConfig());
262+
transformTask.applyNewFrequency(request.getConfig());
261263
listener.onResponse(new Response(request.getConfig()));
262264
}
263265

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,13 @@ public void checkAndResetDestinationIndexBlock(TransformConfig config) {
421421
}
422422
}
423423

424+
public void applyNewFrequency(TransformConfig config) {
425+
var frequency = config != null ? config.getFrequency() : null;
426+
if (frequency != null) {
427+
transformScheduler.updateFrequency(config.getId(), frequency);
428+
}
429+
}
430+
424431
@Override
425432
protected void init(
426433
PersistentTasksService persistentTasksService,

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.Optional;
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727

28-
import static org.elasticsearch.core.Strings.format;
29-
3028
/**
3129
* {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as
3230
* retrying policy.
@@ -123,9 +121,9 @@ void processScheduledTasks() {
123121
Instant processingFinished = clock.instant();
124122
long tookMs = Duration.between(processingStarted, processingFinished).toMillis();
125123
if (taskWasProcessed) {
126-
logger.trace(format("Processing one scheduled task finished, took %dms", tookMs));
124+
logger.trace("Processing one scheduled task finished, took [{}] ms", tookMs);
127125
} else {
128-
logger.trace(format("Looking for scheduled tasks to process finished, took %dms", tookMs));
126+
logger.trace("Looking for scheduled tasks to process finished, took [{}] ms", tookMs);
129127
}
130128
}
131129
if (taskWasProcessed == false) {
@@ -161,12 +159,10 @@ private boolean processScheduledTasksInternal() {
161159
scheduledTasks.update(scheduledTask.getTransformId(), task -> {
162160
if (task.equals(scheduledTask) == false) {
163161
logger.debug(
164-
() -> format(
165-
"[%s] task object got modified while processing. Expected: %s, was: %s",
166-
scheduledTask.getTransformId(),
167-
scheduledTask,
168-
task
169-
)
162+
"[{}] task object got modified while processing. Expected: {}, was: {}",
163+
scheduledTask.getTransformId(),
164+
scheduledTask,
165+
task
170166
);
171167
}
172168
return new TransformScheduledTask(
@@ -200,7 +196,7 @@ public void stop() {
200196
*/
201197
public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) {
202198
String transformId = transformTaskParams.getId();
203-
logger.trace(() -> format("[%s] register the transform", transformId));
199+
logger.trace("[{}] register the transform", transformId);
204200
long currentTimeMillis = clock.millis();
205201
TransformScheduledTask transformScheduledTask = new TransformScheduledTask(
206202
transformId,
@@ -223,7 +219,7 @@ public void registerTransform(TransformTaskParams transformTaskParams, Listener
223219
* @param failureCount new value of transform task's failure count
224220
*/
225221
public void handleTransformFailureCountChanged(String transformId, int failureCount) {
226-
logger.trace(() -> format("[%s] handle transform failure count change to %d", transformId, failureCount));
222+
logger.trace("[{}] handle transform failure count change to [{}]", transformId, failureCount);
227223
// Update the task's failure count (next_scheduled_time gets automatically re-calculated)
228224
scheduledTasks.update(
229225
transformId,
@@ -237,14 +233,30 @@ public void handleTransformFailureCountChanged(String transformId, int failureCo
237233
);
238234
}
239235

236+
public void updateFrequency(String transformId, TimeValue frequency) {
237+
logger.trace("[{}] handle transform frequency change to [{}]", transformId, frequency);
238+
scheduledTasks.update(transformId, task -> {
239+
if (task.getFrequency().equals(frequency)) {
240+
return task;
241+
}
242+
return new TransformScheduledTask(
243+
task.getTransformId(),
244+
getFrequency(frequency),
245+
task.getLastTriggeredTimeMillis(),
246+
task.getFailureCount(),
247+
task.getListener()
248+
);
249+
});
250+
}
251+
240252
/**
241253
* Updates the transform task's next_scheduled_time so that it is set to now.
242254
* Doing so may result in the task being processed earlier that it would normally (i.e.: according to its frequency) be.
243255
*
244256
* @param transformId id of the transform to schedule now
245257
*/
246258
public void scheduleNow(String transformId) {
247-
logger.trace(() -> format("[%s] schedule_now transform", transformId));
259+
logger.trace("[{}] schedule_now transform", transformId);
248260
long currentTimeMillis = clock.millis();
249261
// Update the task's next_scheduled_time
250262
scheduledTasks.update(
@@ -268,7 +280,7 @@ public void scheduleNow(String transformId) {
268280
*/
269281
public void deregisterTransform(String transformId) {
270282
Objects.requireNonNull(transformId);
271-
logger.trace(() -> format("[%s] de-register the transform", transformId));
283+
logger.trace("[{}] de-register the transform", transformId);
272284
scheduledTasks.remove(transformId);
273285
}
274286

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,68 @@ public void testScheduleNow() {
240240
transformScheduler.stop();
241241
}
242242

243+
public void testUpdateFrequencyWithChanges() throws Exception {
244+
var transformId = "test-update-frequency-with-changes";
245+
var startingFrequency = TimeValue.timeValueHours(1);
246+
var updatingFrequency = TimeValue.timeValueSeconds(1);
247+
248+
var transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, startingFrequency, false);
249+
var clock = new FakeClock(Instant.ofEpochMilli(0));
250+
var events = new CopyOnWriteArrayList<TransformScheduler.Event>();
251+
TransformScheduler.Listener listener = events::add;
252+
253+
var transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
254+
transformScheduler.registerTransform(transformTaskParams, listener);
255+
assertThat(events, hasSize(1));
256+
257+
// Update the frequency
258+
transformScheduler.updateFrequency(transformId, updatingFrequency);
259+
clock.advanceTimeBy(Duration.ofMinutes(2));
260+
transformScheduler.processScheduledTasks();
261+
262+
assertBusy(() -> assertThat(events, hasSize(greaterThanOrEqualTo(2))), 10, TimeUnit.SECONDS);
263+
264+
var actualTask = transformScheduler.getTransformScheduledTasks()
265+
.stream()
266+
.filter(task -> task.getTransformId().equals(transformId))
267+
.findAny()
268+
.orElseThrow();
269+
assertThat(actualTask.getFrequency(), equalTo(updatingFrequency));
270+
271+
transformScheduler.deregisterTransform(transformId);
272+
transformScheduler.stop();
273+
}
274+
275+
public void testUpdateFrequencyWithNoChanges() throws Exception {
276+
var transformId = "test-update-frequency-with-changes";
277+
var startingFrequency = TimeValue.timeValueHours(1);
278+
279+
var transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, startingFrequency, false);
280+
var clock = new FakeClock(Instant.ofEpochMilli(0));
281+
var events = new CopyOnWriteArrayList<TransformScheduler.Event>();
282+
TransformScheduler.Listener listener = events::add;
283+
284+
var transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
285+
transformScheduler.registerTransform(transformTaskParams, listener);
286+
assertThat(events, hasSize(1));
287+
288+
// Update the frequency
289+
transformScheduler.updateFrequency(transformId, startingFrequency);
290+
clock.advanceTimeBy(Duration.ofMinutes(2));
291+
transformScheduler.processScheduledTasks();
292+
assertThat(events, hasSize(1));
293+
294+
var actualTask = transformScheduler.getTransformScheduledTasks()
295+
.stream()
296+
.filter(task -> task.getTransformId().equals(transformId))
297+
.findAny()
298+
.orElseThrow();
299+
assertThat(actualTask.getFrequency(), equalTo(startingFrequency));
300+
301+
transformScheduler.deregisterTransform(transformId);
302+
transformScheduler.stop();
303+
}
304+
243305
public void testConcurrentProcessing() throws Exception {
244306
String transformId = "test-with-fake-clock-concurrent";
245307
int frequencySeconds = 5;

0 commit comments

Comments
 (0)