41
41
import org .junit .jupiter .api .AfterEach ;
42
42
import org .junit .jupiter .api .BeforeEach ;
43
43
import org .junit .jupiter .api .Test ;
44
+ import org .junit .jupiter .api .Timeout ;
44
45
import org .junit .jupiter .params .ParameterizedTest ;
45
46
import org .junit .jupiter .params .provider .EnumSource ;
46
47
60
61
import java .util .Queue ;
61
62
import java .util .concurrent .CompletableFuture ;
62
63
import java .util .concurrent .ConcurrentLinkedDeque ;
64
+ import java .util .concurrent .TimeUnit ;
63
65
import java .util .function .Supplier ;
64
66
65
67
import static org .apache .flink .runtime .jobgraph .tasks .CheckpointCoordinatorConfiguration .MINIMAL_CHECKPOINT_TIME ;
@@ -131,6 +133,7 @@ void clearData() {
131
133
132
134
@ ParameterizedTest
133
135
@ EnumSource (DeliveryGuarantee .class )
136
+ @ Timeout (value = 30000 , unit = TimeUnit .MILLISECONDS )
134
137
void testForNormalCaseWithoutFailure (
135
138
DeliveryGuarantee guarantee , @ InjectClusterClient ClusterClient <?> client )
136
139
throws Exception {
@@ -149,6 +152,7 @@ void testForNormalCaseWithoutFailure(
149
152
}
150
153
151
154
@ Test
155
+ @ Timeout (value = 30000 , unit = TimeUnit .MILLISECONDS )
152
156
void testExactlyOnceWithFailure (@ InjectClusterClient ClusterClient <?> client ) throws Exception {
153
157
// Test continuous + unbounded splits
154
158
StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism ();
@@ -167,6 +171,7 @@ void testExactlyOnceWithFailure(@InjectClusterClient ClusterClient<?> client) th
167
171
}
168
172
169
173
@ Test
174
+ @ Timeout (value = 30000 , unit = TimeUnit .MILLISECONDS )
170
175
void testAtLeastOnceWithFailure (@ InjectClusterClient ClusterClient <?> client ) throws Exception {
171
176
// Test continuous + unbounded splits
172
177
StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism ();
@@ -186,6 +191,7 @@ void testAtLeastOnceWithFailure(@InjectClusterClient ClusterClient<?> client) th
186
191
}
187
192
188
193
@ Test
194
+ @ Timeout (value = 30000 , unit = TimeUnit .MILLISECONDS )
189
195
void testAtMostOnceWithFailure (@ InjectClusterClient ClusterClient <?> client ) throws Exception {
190
196
// Test continuous + unbounded splits
191
197
StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism ();
0 commit comments