Skip to content

Commit 59570e1

Browse files
authored
Do failover only if target items were running (#2134)
* Fix failover triggered too sensitive * Handle legacy crashed running items * Persist job instance id into running node * Do failover in legacy crashed listener * Complete FailoverListenerManagerTest * Complete ShardingServiceTest * Complete ExecutionServiceTest * Handle failovering items at first * Complete FailoverListenerManagerTest * Complete FailoverServiceTest * Complete ExecutionServiceTest * Complete FailoverListenerManagerTest
1 parent 8b64a8c commit 59570e1

File tree

8 files changed

+229
-10
lines changed

8 files changed

+229
-10
lines changed

elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManager.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,28 @@
1717

1818
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
1919

20+
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
2021
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
2122
import org.apache.shardingsphere.elasticjob.infra.yaml.YamlEngine;
2223
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationNode;
2324
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
2425
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
26+
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
2527
import org.apache.shardingsphere.elasticjob.lite.internal.listener.AbstractListenerManager;
2628
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
29+
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
2730
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
2831
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
2932
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
3033
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent.Type;
3134
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
3235

36+
import java.util.Collections;
37+
import java.util.HashSet;
3338
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Map.Entry;
41+
import java.util.Set;
3442

3543
/**
3644
* Failover listener manager.
@@ -45,6 +53,10 @@ public final class FailoverListenerManager extends AbstractListenerManager {
4553

4654
private final FailoverService failoverService;
4755

56+
private final ExecutionService executionService;
57+
58+
private final InstanceService instanceService;
59+
4860
private final ConfigurationNode configNode;
4961

5062
private final InstanceNode instanceNode;
@@ -55,6 +67,8 @@ public FailoverListenerManager(final CoordinatorRegistryCenter regCenter, final
5567
configService = new ConfigurationService(regCenter, jobName);
5668
shardingService = new ShardingService(regCenter, jobName);
5769
failoverService = new FailoverService(regCenter, jobName);
70+
executionService = new ExecutionService(regCenter, jobName);
71+
instanceService = new InstanceService(regCenter, jobName);
5872
configNode = new ConfigurationNode(jobName);
5973
instanceNode = new InstanceNode(jobName);
6074
}
@@ -63,6 +77,7 @@ public FailoverListenerManager(final CoordinatorRegistryCenter regCenter, final
6377
public void start() {
6478
addDataListener(new JobCrashedJobListener());
6579
addDataListener(new FailoverSettingsChangedJobListener());
80+
addDataListener(new LegacyCrashedRunningItemListener());
6681
}
6782

6883
private boolean isFailoverEnabled() {
@@ -103,4 +118,47 @@ public void onChange(final DataChangedEvent event) {
103118
}
104119
}
105120
}
121+
122+
class LegacyCrashedRunningItemListener implements DataChangedEventListener {
123+
124+
@Override
125+
public void onChange(final DataChangedEvent event) {
126+
if (!isCurrentInstanceOnline(event) || !isFailoverEnabled()) {
127+
return;
128+
}
129+
Set<JobInstance> availableJobInstances = new HashSet<>(instanceService.getAvailableJobInstances());
130+
if (!isTheOnlyInstance(availableJobInstances)) {
131+
return;
132+
}
133+
Map<Integer, JobInstance> allRunningItems = executionService.getAllRunningItems();
134+
Map<Integer, JobInstance> allFailoveringItems = failoverService.getAllFailoveringItems();
135+
if (allRunningItems.isEmpty() && allFailoveringItems.isEmpty()) {
136+
return;
137+
}
138+
for (Entry<Integer, JobInstance> entry : allFailoveringItems.entrySet()) {
139+
if (!availableJobInstances.contains(entry.getValue())) {
140+
int item = entry.getKey();
141+
failoverService.setCrashedFailoverFlagDirectly(item);
142+
failoverService.clearFailoveringItem(item);
143+
executionService.clearRunningInfo(Collections.singletonList(item));
144+
allRunningItems.remove(item);
145+
}
146+
}
147+
for (Entry<Integer, JobInstance> entry : allRunningItems.entrySet()) {
148+
if (!availableJobInstances.contains(entry.getValue())) {
149+
failoverService.setCrashedFailoverFlag(entry.getKey());
150+
executionService.clearRunningInfo(Collections.singletonList(entry.getKey()));
151+
}
152+
}
153+
failoverService.failoverIfNecessary();
154+
}
155+
156+
private boolean isCurrentInstanceOnline(final DataChangedEvent event) {
157+
return Type.ADDED == event.getType() && event.getKey().endsWith(instanceNode.getLocalInstancePath());
158+
}
159+
160+
private boolean isTheOnlyInstance(final Set<JobInstance> availableJobInstances) {
161+
return Collections.singleton(JobRegistry.getInstance().getJobInstance(jobName)).equals(availableJobInstances);
162+
}
163+
}
106164
}

elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverService.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,24 @@
1717

1818
package org.apache.shardingsphere.elasticjob.lite.internal.failover;
1919

20+
import com.google.common.base.Strings;
2021
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
23+
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
2124
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
2225
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
2326
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingNode;
2427
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
2528
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
26-
import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
2729
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
30+
import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
2831

2932
import java.util.ArrayList;
3033
import java.util.Collection;
3134
import java.util.Collections;
35+
import java.util.LinkedHashMap;
3236
import java.util.List;
37+
import java.util.Map;
3338

3439
/**
3540
* Failover service.
@@ -43,10 +48,13 @@ public final class FailoverService {
4348

4449
private final ShardingService shardingService;
4550

51+
private final ConfigurationService configService;
52+
4653
public FailoverService(final CoordinatorRegistryCenter regCenter, final String jobName) {
4754
this.jobName = jobName;
4855
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
4956
shardingService = new ShardingService(regCenter, jobName);
57+
configService = new ConfigurationService(regCenter, jobName);
5058
}
5159

5260
/**
@@ -57,6 +65,7 @@ public FailoverService(final CoordinatorRegistryCenter regCenter, final String j
5765
public void setCrashedFailoverFlag(final int item) {
5866
if (!isFailoverAssigned(item)) {
5967
jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
68+
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(item));
6069
}
6170
}
6271

@@ -167,6 +176,32 @@ public List<Integer> getLocalTakeOffItems() {
167176
return result;
168177
}
169178

179+
/**
180+
* Get all failovering items.
181+
*
182+
* @return all failovering items
183+
*/
184+
public Map<Integer, JobInstance> getAllFailoveringItems() {
185+
int shardingTotalCount = configService.load(true).getShardingTotalCount();
186+
Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
187+
for (int i = 0; i < shardingTotalCount; i++) {
188+
String data = jobNodeStorage.getJobNodeData(FailoverNode.getExecutingFailoverNode(i));
189+
if (!Strings.isNullOrEmpty(data)) {
190+
result.put(i, new JobInstance(data));
191+
}
192+
}
193+
return result;
194+
}
195+
196+
/**
197+
* Clear failovering item.
198+
*
199+
* @param item item
200+
*/
201+
public void clearFailoveringItem(final int item) {
202+
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getExecutingFailoverNode(item));
203+
}
204+
170205
/**
171206
* Remove failover info.
172207
*/

elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ExecutionService.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
1919

20+
import com.google.common.base.Strings;
2021
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
22+
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
2123
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
2224
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
2325
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
@@ -26,7 +28,9 @@
2628

2729
import java.util.ArrayList;
2830
import java.util.Collection;
31+
import java.util.LinkedHashMap;
2932
import java.util.List;
33+
import java.util.Map;
3034

3135
/**
3236
* Execution service.
@@ -52,11 +56,17 @@ public ExecutionService(final CoordinatorRegistryCenter regCenter, final String
5256
*/
5357
public void registerJobBegin(final ShardingContexts shardingContexts) {
5458
JobRegistry.getInstance().setJobRunning(jobName, true);
55-
if (!configService.load(true).isMonitorExecution()) {
59+
JobConfiguration jobConfiguration = configService.load(true);
60+
if (!jobConfiguration.isMonitorExecution()) {
5661
return;
5762
}
63+
String jobInstanceId = JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId();
5864
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
59-
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
65+
if (jobConfiguration.isFailover()) {
66+
jobNodeStorage.fillJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
67+
} else {
68+
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
69+
}
6070
}
6171
}
6272

@@ -130,6 +140,23 @@ private List<Integer> getAllItems() {
130140
return result;
131141
}
132142

143+
/**
144+
* Get all running items with instance.
145+
*
146+
* @return running items with instance.
147+
*/
148+
public Map<Integer, JobInstance> getAllRunningItems() {
149+
int shardingTotalCount = configService.load(true).getShardingTotalCount();
150+
Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
151+
for (int i = 0; i < shardingTotalCount; i++) {
152+
String data = jobNodeStorage.getJobNodeData(ShardingNode.getRunningNode(i));
153+
if (!Strings.isNullOrEmpty(data)) {
154+
result.put(i, new JobInstance(data));
155+
}
156+
}
157+
return result;
158+
}
159+
133160
/**
134161
* Set misfire flag if sharding items still running.
135162
*

elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/sharding/ShardingService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,17 @@ public List<Integer> getCrashedShardingItems(final String jobInstanceId) {
202202
List<Integer> result = new LinkedList<>();
203203
int shardingTotalCount = configService.load(true).getShardingTotalCount();
204204
for (int i = 0; i < shardingTotalCount; i++) {
205-
if (jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
205+
if (isRunningItem(i) && jobInstanceId.equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
206206
result.add(i);
207207
}
208208
}
209209
return result;
210210
}
211211

212+
private boolean isRunningItem(final int item) {
213+
return jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(item));
214+
}
215+
212216
/**
213217
* Get sharding items from localhost job server.
214218
*

elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/failover/FailoverListenerManagerTest.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
2222
import org.apache.shardingsphere.elasticjob.lite.fixture.LiteYamlConstants;
2323
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
24+
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
25+
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
2426
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
2527
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController;
28+
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService;
2629
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService;
2730
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
2831
import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
@@ -38,6 +41,8 @@
3841

3942
import java.util.Arrays;
4043
import java.util.Collections;
44+
import java.util.LinkedHashMap;
45+
import java.util.Map;
4146

4247
import static org.mockito.Mockito.times;
4348
import static org.mockito.Mockito.verify;
@@ -61,6 +66,15 @@ public final class FailoverListenerManagerTest {
6166
@Mock
6267
private FailoverService failoverService;
6368

69+
@Mock
70+
private InstanceService instanceService;
71+
72+
@Mock
73+
private ExecutionService executionService;
74+
75+
@Mock
76+
private InstanceNode instanceNode;
77+
6478
private final FailoverListenerManager failoverListenerManager = new FailoverListenerManager(null, "test_job");
6579

6680
@Before
@@ -69,12 +83,15 @@ public void setUp() {
6983
ReflectionUtils.setFieldValue(failoverListenerManager, "configService", configService);
7084
ReflectionUtils.setFieldValue(failoverListenerManager, "shardingService", shardingService);
7185
ReflectionUtils.setFieldValue(failoverListenerManager, "failoverService", failoverService);
86+
ReflectionUtils.setFieldValue(failoverListenerManager, "instanceService", instanceService);
87+
ReflectionUtils.setFieldValue(failoverListenerManager, "executionService", executionService);
88+
ReflectionUtils.setFieldValue(failoverListenerManager, "instanceNode", instanceNode);
7289
}
7390

7491
@Test
7592
public void assertStart() {
7693
failoverListenerManager.start();
77-
verify(jobNodeStorage, times(2)).addDataListener(ArgumentMatchers.any(DataChangedEventListener.class));
94+
verify(jobNodeStorage, times(3)).addDataListener(ArgumentMatchers.any(DataChangedEventListener.class));
7895
}
7996

8097
@Test
@@ -119,6 +136,8 @@ public void assertJobCrashedJobListenerWhenIsOtherInstanceCrashed() {
119136
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
120137
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
121138
when(shardingService.getCrashedShardingItems("127.0.0.1@-@1")).thenReturn(Arrays.asList(0, 2));
139+
when(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@1")).thenReturn(true);
140+
when(instanceNode.getInstanceFullPath()).thenReturn("/test_job/instances");
122141
failoverListenerManager.new JobCrashedJobListener().onChange(new DataChangedEvent(Type.DELETED, "/test_job/instances/127.0.0.1@-@1", ""));
123142
verify(failoverService).setCrashedFailoverFlag(0);
124143
verify(failoverService).setCrashedFailoverFlag(2);
@@ -132,6 +151,8 @@ public void assertJobCrashedJobListenerWhenIsOtherFailoverInstanceCrashed() {
132151
JobRegistry.getInstance().registerJob("test_job", jobScheduleController);
133152
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
134153
when(failoverService.getFailoveringItems("127.0.0.1@-@1")).thenReturn(Collections.singletonList(1));
154+
when(instanceNode.isInstancePath("/test_job/instances/127.0.0.1@-@1")).thenReturn(true);
155+
when(instanceNode.getInstanceFullPath()).thenReturn("/test_job/instances");
135156
failoverListenerManager.new JobCrashedJobListener().onChange(new DataChangedEvent(Type.DELETED, "/test_job/instances/127.0.0.1@-@1", ""));
136157
verify(failoverService).setCrashedFailoverFlagDirectly(1);
137158
verify(failoverService).failoverIfNecessary();
@@ -161,4 +182,25 @@ public void assertFailoverSettingsChangedJobListenerWhenIsFailoverPathAndUpdateB
161182
failoverListenerManager.new FailoverSettingsChangedJobListener().onChange(new DataChangedEvent(Type.UPDATED, "/test_job/config", LiteYamlConstants.getJobYamlWithFailover(false)));
162183
verify(failoverService).removeFailoverInfo();
163184
}
185+
186+
@Test
187+
public void assertLegacyCrashedRunningItemListenerWhenRunningItemsArePresent() {
188+
JobInstance jobInstance = new JobInstance("127.0.0.1@-@1");
189+
JobRegistry.getInstance().addJobInstance("test_job", jobInstance);
190+
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).build());
191+
when(instanceNode.getLocalInstancePath()).thenReturn("instances/127.0.0.1@-@1");
192+
when(instanceService.getAvailableJobInstances()).thenReturn(Collections.singletonList(jobInstance));
193+
Map<Integer, JobInstance> allRunningItems = new LinkedHashMap<>(2, 1);
194+
allRunningItems.put(0, new JobInstance("127.0.0.1@-@2"));
195+
allRunningItems.put(1, new JobInstance("127.0.0.1@-@2"));
196+
when(executionService.getAllRunningItems()).thenReturn(allRunningItems);
197+
when(failoverService.getAllFailoveringItems()).thenReturn(Collections.singletonMap(1, new JobInstance("127.0.0.1@-@2")));
198+
failoverListenerManager.new LegacyCrashedRunningItemListener().onChange(new DataChangedEvent(Type.ADDED, "/test_job/instances/127.0.0.1@-@1", ""));
199+
verify(failoverService).setCrashedFailoverFlagDirectly(1);
200+
verify(failoverService).clearFailoveringItem(1);
201+
verify(executionService).clearRunningInfo(Collections.singletonList(1));
202+
verify(failoverService).setCrashedFailoverFlag(0);
203+
verify(executionService).clearRunningInfo(Collections.singletonList(0));
204+
verify(failoverService).failoverIfNecessary();
205+
}
164206
}

0 commit comments

Comments
 (0)