|
17 | 17 |
|
18 | 18 | package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock; |
19 | 19 |
|
| 20 | +import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor; |
20 | 21 | import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; |
21 | 22 | import org.apache.shardingsphere.distsql.statement.type.ral.updatable.LockClusterStatement; |
| 23 | +import org.apache.shardingsphere.infra.algorithm.core.exception.MissingRequiredAlgorithmException; |
22 | 24 | import org.apache.shardingsphere.infra.spi.exception.ServiceProviderNotFoundException; |
| 25 | +import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; |
| 26 | +import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorEngine; |
| 27 | +import org.apache.shardingsphere.mode.exclusive.callback.ExclusiveOperationVoidCallback; |
23 | 28 | import org.apache.shardingsphere.mode.manager.ContextManager; |
24 | 29 | import org.apache.shardingsphere.mode.manager.cluster.lock.exception.LockedClusterException; |
25 | 30 | import org.apache.shardingsphere.mode.state.ShardingSphereState; |
26 | | -import org.junit.jupiter.api.Test; |
| 31 | +import org.apache.shardingsphere.mode.state.StatePersistService; |
| 32 | +import org.apache.shardingsphere.proxy.backend.context.ProxyContext; |
| 33 | +import org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension; |
| 34 | +import org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings; |
27 | 35 | import org.junit.jupiter.api.extension.ExtendWith; |
28 | | -import org.mockito.junit.jupiter.MockitoExtension; |
| 36 | +import org.junit.jupiter.params.ParameterizedTest; |
| 37 | +import org.junit.jupiter.params.provider.Arguments; |
| 38 | +import org.junit.jupiter.params.provider.MethodSource; |
29 | 39 |
|
| 40 | +import java.sql.SQLException; |
30 | 41 | import java.util.Properties; |
| 42 | +import java.util.stream.Stream; |
31 | 43 |
|
32 | 44 | import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
33 | 45 | import static org.junit.jupiter.api.Assertions.assertThrows; |
34 | 46 | import static org.mockito.Mockito.RETURNS_DEEP_STUBS; |
| 47 | +import static org.mockito.Mockito.any; |
| 48 | +import static org.mockito.Mockito.anyLong; |
| 49 | +import static org.mockito.Mockito.doAnswer; |
| 50 | +import static org.mockito.Mockito.eq; |
35 | 51 | import static org.mockito.Mockito.mock; |
| 52 | +import static org.mockito.Mockito.never; |
| 53 | +import static org.mockito.Mockito.verify; |
36 | 54 | import static org.mockito.Mockito.when; |
37 | 55 |
|
38 | | -@ExtendWith(MockitoExtension.class) |
| 56 | +@ExtendWith(AutoMockExtension.class) |
| 57 | +@StaticMockSettings(ProxyContext.class) |
39 | 58 | class LockClusterExecutorTest { |
40 | 59 |
|
41 | | - private final LockClusterExecutor executor = new LockClusterExecutor(); |
| 60 | + private final LockClusterExecutor executor = (LockClusterExecutor) TypedSPILoader.getService(DistSQLUpdateExecutor.class, LockClusterStatement.class); |
42 | 61 |
|
43 | | - @SuppressWarnings("resource") |
44 | | - @Test |
45 | | - void assertExecuteUpdateWithLockedCluster() { |
| 62 | + @ParameterizedTest(name = "{0}") |
| 63 | + @MethodSource("provideFailureScenarios") |
| 64 | + void assertExecuteUpdateWithFailureScenarios(final String name, final LockClusterStatement sqlStatement, |
| 65 | + final ShardingSphereState state, final Class<? extends Throwable> expectedException) throws SQLException { |
46 | 66 | ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); |
47 | | - when(contextManager.getStateContext().getState()).thenReturn(ShardingSphereState.UNAVAILABLE); |
48 | | - assertThrows(LockedClusterException.class, () -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new Properties()), null), contextManager)); |
| 67 | + when(contextManager.getStateContext().getState()).thenReturn(state); |
| 68 | + assertThrows(expectedException, () -> executor.executeUpdate(sqlStatement, contextManager)); |
| 69 | + verify(contextManager.getExclusiveOperatorEngine(), never()).operate(any(), anyLong(), any()); |
49 | 70 | } |
50 | 71 |
|
51 | | - @SuppressWarnings("resource") |
52 | | - @Test |
53 | | - void assertExecuteUpdateWithWrongAlgorithm() { |
| 72 | + @ParameterizedTest(name = "{0}") |
| 73 | + @MethodSource("provideOperationScenarios") |
| 74 | + void assertExecuteUpdateWithOperationScenarios(final String name, final LockClusterStatement sqlStatement, |
| 75 | + final long expectedTimeoutMillis, final ShardingSphereState callbackState, |
| 76 | + final Class<? extends Throwable> expectedException, final boolean expectStateUpdated) throws SQLException { |
54 | 77 | ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); |
55 | | - when(contextManager.getStateContext().getState()).thenReturn(ShardingSphereState.OK); |
56 | | - assertThrows(ServiceProviderNotFoundException.class, () -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new Properties()), null), contextManager)); |
| 78 | + ExclusiveOperatorEngine exclusiveOperatorEngine = mock(ExclusiveOperatorEngine.class); |
| 79 | + when(contextManager.getExclusiveOperatorEngine()).thenReturn(exclusiveOperatorEngine); |
| 80 | + when(contextManager.getStateContext().getState()).thenReturn(ShardingSphereState.OK, callbackState); |
| 81 | + StatePersistService stateService = null; |
| 82 | + if (expectStateUpdated) { |
| 83 | + stateService = mock(StatePersistService.class); |
| 84 | + mockStateService(stateService); |
| 85 | + } |
| 86 | + doAnswer(invocation -> { |
| 87 | + ExclusiveOperationVoidCallback callback = invocation.getArgument(2); |
| 88 | + callback.execute(); |
| 89 | + return null; |
| 90 | + }).when(exclusiveOperatorEngine).operate(any(), anyLong(), any(ExclusiveOperationVoidCallback.class)); |
| 91 | + if (null == expectedException) { |
| 92 | + assertDoesNotThrow(() -> executor.executeUpdate(sqlStatement, contextManager)); |
| 93 | + } else { |
| 94 | + assertThrows(expectedException, () -> executor.executeUpdate(sqlStatement, contextManager)); |
| 95 | + } |
| 96 | + verify(exclusiveOperatorEngine).operate(any(), eq(expectedTimeoutMillis), any(ExclusiveOperationVoidCallback.class)); |
| 97 | + if (expectStateUpdated) { |
| 98 | + verify(stateService).update(ShardingSphereState.READ_ONLY); |
| 99 | + } |
57 | 100 | } |
58 | 101 |
|
59 | | - @SuppressWarnings("resource") |
60 | | - @Test |
61 | | - void assertExecuteUpdateWithUsingTimeout() { |
| 102 | + private void mockStateService(final StatePersistService stateService) { |
| 103 | + ProxyContext proxyContext = mock(ProxyContext.class); |
62 | 104 | ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); |
63 | | - when(contextManager.getStateContext().getState()).thenReturn(ShardingSphereState.OK); |
64 | | - assertDoesNotThrow(() -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("WRITE", new Properties()), 2000L), contextManager)); |
| 105 | + when(proxyContext.getContextManager()).thenReturn(contextManager); |
| 106 | + when(contextManager.getPersistServiceFacade().getStateService()).thenReturn(stateService); |
| 107 | + when(ProxyContext.getInstance()).thenReturn(proxyContext); |
| 108 | + } |
| 109 | + |
| 110 | + private static Stream<Arguments> provideFailureScenarios() { |
| 111 | + return Stream.of( |
| 112 | + Arguments.of("cluster state is unavailable", new LockClusterStatement(new AlgorithmSegment("WRITE", new Properties()), 2000L), |
| 113 | + ShardingSphereState.UNAVAILABLE, LockedClusterException.class), |
| 114 | + Arguments.of("lock strategy is required", new LockClusterStatement(null, 2000L), ShardingSphereState.OK, MissingRequiredAlgorithmException.class), |
| 115 | + Arguments.of("lock strategy is unsupported", new LockClusterStatement(new AlgorithmSegment("FOO", new Properties()), 2000L), |
| 116 | + ShardingSphereState.OK, ServiceProviderNotFoundException.class)); |
| 117 | + } |
| 118 | + |
| 119 | + private static Stream<Arguments> provideOperationScenarios() { |
| 120 | + return Stream.of( |
| 121 | + Arguments.of("use explicit timeout", new LockClusterStatement(new AlgorithmSegment("WRITE", new Properties()), 2000L), |
| 122 | + 2000L, ShardingSphereState.OK, null, true), |
| 123 | + Arguments.of("use default timeout when absent", new LockClusterStatement(new AlgorithmSegment("WRITE", new Properties())), |
| 124 | + 3000L, ShardingSphereState.OK, null, true), |
| 125 | + Arguments.of("throw exception when callback state changes", new LockClusterStatement(new AlgorithmSegment("WRITE", new Properties()), 2000L), |
| 126 | + 2000L, ShardingSphereState.UNAVAILABLE, LockedClusterException.class, false)); |
65 | 127 | } |
66 | 128 | } |
0 commit comments