Skip to content

Commit 2fa8f7e

Browse files
Fix task creation being skipped bug due to timeouts (#996)
* Fix task creation being skipped bug due to timeouts * Add test cases * Fixed TCs
1 parent 11a7ca6 commit 2fa8f7e

File tree

2 files changed

+148
-2
lines changed

2 files changed

+148
-2
lines changed

datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package com.linkedin.datastream.server;
77

88
import java.io.IOException;
9+
import java.lang.reflect.Field;
910
import java.lang.reflect.Method;
1011
import java.time.Duration;
1112
import java.time.Instant;
@@ -4505,4 +4506,142 @@ public TestSetup(EmbeddedDatastreamCluster datastreamKafkaCluster, Coordinator c
45054506
_connector = connector;
45064507
}
45074508
}
4509+
4510+
/**
4511+
* Sets the private static final ASSIGNMENT_TIMEOUT field on Coordinator via reflection.
4512+
* Returns the original value so it can be restored.
4513+
*/
4514+
private Duration setAssignmentTimeout(Duration newTimeout) throws Exception {
4515+
Field field = Coordinator.class.getDeclaredField("ASSIGNMENT_TIMEOUT");
4516+
field.setAccessible(true);
4517+
4518+
// Remove the 'final' modifier so we can write to the field
4519+
Field modifiersField = Field.class.getDeclaredField("modifiers");
4520+
modifiersField.setAccessible(true);
4521+
modifiersField.setInt(field, field.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
4522+
4523+
Duration original = (Duration) field.get(null);
4524+
field.set(null, newTimeout);
4525+
return original;
4526+
}
4527+
4528+
@Test
4529+
public void testHandleAssignmentChangeClearsStateOnTimeout() throws Exception {
4530+
String testCluster = "testHandleAssignmentChangeClearsStateOnTimeout";
4531+
String testConnectorType = "testConnectorType";
4532+
String datastreamName1 = "datastream1";
4533+
4534+
// Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds
4535+
Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1));
4536+
4537+
try {
4538+
Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);
4539+
4540+
// A connector that blocks on onAssignmentChange long enough to trigger the timeout
4541+
CountDownLatch blockingLatch = new CountDownLatch(1);
4542+
TestHookConnector slowConnector = new TestHookConnector("slowConnector", testConnectorType) {
4543+
@Override
4544+
public void onAssignmentChange(List<DatastreamTask> tasks) {
4545+
try {
4546+
// Block for longer than ASSIGNMENT_TIMEOUT (1 second)
4547+
blockingLatch.await(5, TimeUnit.SECONDS);
4548+
} catch (InterruptedException e) {
4549+
Thread.currentThread().interrupt();
4550+
}
4551+
super.onAssignmentChange(tasks);
4552+
}
4553+
};
4554+
4555+
instance1.addConnector(testConnectorType, slowConnector, new BroadcastStrategy(Optional.empty()), false,
4556+
new SourceBasedDeduper(), null);
4557+
instance1.start();
4558+
4559+
ZkClient zkClient = new ZkClient(_zkConnectionString);
4560+
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1);
4561+
4562+
// Wait for the timeout to occur and state to be cleared
4563+
Assert.assertTrue(PollUtils.poll(() -> instance1.getDatastreamTasks().isEmpty(), 200, 10000),
4564+
"Expected _assignedDatastreamTasks to be cleared after assignment timeout");
4565+
4566+
// Unblock the connector so it can process retried assignments
4567+
blockingLatch.countDown();
4568+
4569+
// After unblocking, the retried assignment should succeed
4570+
assertConnectorAssignment(slowConnector, 10000, datastreamName1);
4571+
4572+
// Verify the coordinator has the task tracked after successful retry
4573+
Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(),
4574+
"Expected _assignedDatastreamTasks to be populated after successful retry");
4575+
4576+
instance1.stop();
4577+
instance1.getDatastreamCache().getZkclient().close();
4578+
zkClient.close();
4579+
} finally {
4580+
setAssignmentTimeout(originalTimeout);
4581+
}
4582+
}
4583+
4584+
/**
4585+
* Verifies that after a timeout clears _assignedDatastreamTasks, a re-assignment
4586+
* does NOT skip the task (i.e., the task is not incorrectly treated as "already running").
4587+
* This is the core bug that the clear() fix addresses.
4588+
*/
4589+
@Test
4590+
public void testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment() throws Exception {
4591+
String testCluster = "testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment";
4592+
String testConnectorType = "testConnectorType";
4593+
String datastreamName1 = "datastream1";
4594+
4595+
// Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds
4596+
Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1));
4597+
4598+
try {
4599+
java.util.concurrent.atomic.AtomicInteger assignmentChangeCount = new java.util.concurrent.atomic.AtomicInteger(0);
4600+
// Block only the first onAssignmentChange; let subsequent ones through immediately
4601+
CountDownLatch blockFirstAssignment = new CountDownLatch(1);
4602+
4603+
Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);
4604+
4605+
TestHookConnector connector = new TestHookConnector("connector1", testConnectorType) {
4606+
@Override
4607+
public void onAssignmentChange(List<DatastreamTask> tasks) {
4608+
int count = assignmentChangeCount.incrementAndGet();
4609+
if (count == 1) {
4610+
// Block the first assignment to trigger timeout. Use a long wait so the
4611+
// coordinator's 1-second timeout fires while we're still blocked.
4612+
try {
4613+
blockFirstAssignment.await(30, TimeUnit.SECONDS);
4614+
} catch (InterruptedException e) {
4615+
Thread.currentThread().interrupt();
4616+
}
4617+
}
4618+
super.onAssignmentChange(tasks);
4619+
}
4620+
};
4621+
4622+
instance1.addConnector(testConnectorType, connector, new BroadcastStrategy(Optional.empty()), false,
4623+
new SourceBasedDeduper(), null);
4624+
instance1.start();
4625+
4626+
ZkClient zkClient = new ZkClient(_zkConnectionString);
4627+
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1);
4628+
4629+
// Unblock the first (timed-out) assignment after giving coordinator time to timeout and retry
4630+
Thread.sleep(3000);
4631+
blockFirstAssignment.countDown();
4632+
4633+
// The retry should successfully assign the task even though the first attempt timed out
4634+
assertConnectorAssignment(connector, 15000, datastreamName1);
4635+
4636+
// Verify the coordinator has the task tracked properly after successful retry
4637+
Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(),
4638+
"Expected _assignedDatastreamTasks to be populated after successful retry");
4639+
4640+
instance1.stop();
4641+
instance1.getDatastreamCache().getZkclient().close();
4642+
zkClient.close();
4643+
} finally {
4644+
setAssignmentTimeout(originalTimeout);
4645+
}
4646+
}
45084647
}

datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -865,8 +865,15 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
865865
try {
866866
getAssignmentsFuture(assignmentChangeFutures, start, isDatastreamUpdate);
867867
} catch (TimeoutException e) {
868-
// if it's timeout then we will retry
869-
_log.warn("Timeout when doing the assignment", e);
868+
// Clear the current assignment state to force full reconciliation on the next assignment change.
869+
// Without this, _assignedDatastreamTasks retains stale state from the previous successful assignment,
870+
// causing the next handleAssignmentChange() to compute an incorrect diff. For example, a task that was
871+
// unassigned and stopped by the connector but not removed from _assignedDatastreamTasks (due to this
872+
// timeout) would be incorrectly treated as "already running" if re-assigned, resulting in no
873+
// ConnectorTask being created. Clearing follows the same pattern as onSessionExpired().
874+
_log.warn("Timeout when doing the assignment. Clearing current assignment state to force full "
875+
+ "reconciliation on next assignment change.", e);
876+
_assignedDatastreamTasks.clear();
870877
retryHandleAssignmentChange(isDatastreamUpdate);
871878
return;
872879
} catch (InterruptedException e) {

0 commit comments

Comments
 (0)