1+ from time import sleep
2+ import random
13from testflows .core import *
24from helpers .common import getuid
35from s3 .tests .export_part .steps import *
@@ -264,10 +266,30 @@ def packet_rate_limit(self, rate_mbit):
264266 )
265267
266268
267- @TestScenario
268- @Requirements (RQ_ClickHouse_ExportPart_NetworkResilience_DestinationInterruption ("1.0" ))
269- def minio_network_interruption (self , number_of_values = 3 , signal = "KILL" ):
270- """Check that restarting MinIO while exporting parts inbetween works correctly."""
def get_minio_interruption_strategies():
    """Return (strategy, signal) example pairs for MinIO interruption tests.

    The examples are the full cross product of interruption timings and
    kill signals, in timing-major order (all signals for "before", then
    "during", etc.), matching the order expected by the @Examples header
    "strategy, signal". Expressed as a comprehension so adding a new
    timing or signal is a one-element change instead of re-writing the
    tuple literal by hand.

    Returns:
        list[tuple[str, str]]: 12 (strategy, signal) pairs.
    """
    strategies = ["before", "during", "after", "random"]
    signals = ["KILL", "TERM", "SEGV"]
    return [(strategy, signal) for strategy in strategies for signal in signals]
284+
285+
286+ @TestOutline (Scenario )
287+ @Examples (
288+ "strategy, signal" ,
289+ get_minio_interruption_strategies (),
290+ )
291+ def minio_interruption (self , strategy , signal ):
292+ """Check that MinIO interruptions at different times during exports work correctly."""
271293
272294 with Given ("I create a populated source table and empty S3 table" ):
273295 source_table = "source_" + getuid ()
@@ -276,42 +298,71 @@ def minio_network_interruption(self, number_of_values=3, signal="KILL"):
276298 table_name = source_table ,
277299 partition_by = "p" ,
278300 columns = default_columns (),
279- number_of_values = number_of_values ,
280301 stop_merges = True ,
281302 )
282303 s3_table_name = create_s3_table (table_name = "s3" , create_new_bucket = True )
283304
284- with And ("I stop MinIO " ):
285- kill_minio ( signal = signal )
305+ with And ("I slow the network to make export take longer " ):
306+ network_packet_rate_limit ( node = self . context . node , rate_mbit = 0.05 )
286307
287- with When ("I read export events" ):
288- initial_events = get_export_events (node = self .context .node )
308+ if strategy == "before" :
309+ with When ("I kill MinIO before export" ):
310+ kill_minio (signal = signal )
311+ export_parts (
312+ source_table = source_table ,
313+ destination_table = s3_table_name ,
314+ node = self .context .node ,
315+ )
289316
290- with And ("I export data" ):
291- export_parts (
292- source_table = source_table ,
293- destination_table = s3_table_name ,
294- node = self .context .node ,
295- )
317+ elif strategy == "during" :
318+ with When ("I kill MinIO during export" ):
319+ export_parts (
320+ source_table = source_table ,
321+ destination_table = s3_table_name ,
322+ node = self .context .node ,
323+ )
324+ kill_minio (signal = signal )
296325
297- with And ("I start MinIO" ):
298- start_minio ()
326+ elif strategy == "after" :
327+ with When ("I export data" ):
328+ export_parts (
329+ source_table = source_table ,
330+ destination_table = s3_table_name ,
331+ node = self .context .node ,
332+ )
299333
300- with Then ("Destination data should be a subset of source data" ):
301- source_data = select_all_ordered (
302- table_name = source_table , node = self .context .node
303- )
304- destination_data = select_all_ordered (
305- table_name = s3_table_name , node = self .context .node
306- )
307- assert set (source_data ) >= set (destination_data ), error ()
334+ with And ("I kill MinIO after export" ):
335+ sleep (5 )
336+ kill_minio (signal = signal )
308337
309- with And ("Failed exports should be logged in the system.events table" ):
310- final_events = get_export_events (node = self .context .node )
311- assert (
312- final_events ["PartsExportFailures" ] - initial_events ["PartsExportFailures" ]
313- == (len (source_data ) - len (destination_data )) / number_of_values
314- ), error ()
338+ elif strategy == "random" :
339+ with When ("I kill MinIO at a random time after export" ):
340+ export_parts (
341+ source_table = source_table ,
342+ destination_table = s3_table_name ,
343+ node = self .context .node ,
344+ )
345+ sleep (random .uniform (0 , 3 ))
346+ kill_minio (signal = signal )
347+
348+ with Then ("I start MinIO" ):
349+ sleep (5 )
350+ start_minio ()
351+
352+ with And ("Check destination matches source, or is a subset if export was interrupted" ):
353+ if strategy == "after" :
354+ source_matches_destination (
355+ source_table = source_table ,
356+ destination_table = s3_table_name ,
357+ )
358+ else :
359+ source_data = select_all_ordered (
360+ table_name = source_table , node = self .context .node
361+ )
362+ destination_data = select_all_ordered (
363+ table_name = s3_table_name , node = self .context .node
364+ )
365+ assert set (source_data ) >= set (destination_data ), error ()
315366
316367
317368@TestScenario
@@ -377,7 +428,6 @@ def feature(self):
377428 Scenario (test = packet_duplication )(percent_duplicated = 50 )
378429 Scenario (test = packet_reordering )(delay_ms = 100 , percent_reordered = 90 )
379430 Scenario (test = packet_rate_limit )(rate_mbit = 0.05 )
380- Scenario (test = minio_network_interruption )(signal = "TERM" )
381- Scenario (test = minio_network_interruption )(signal = "KILL" )
431+ Scenario (run = minio_interruption )
382432 Scenario (test = clickhouse_network_interruption )(safe = True )
383433 Scenario (test = clickhouse_network_interruption )(safe = False )
0 commit comments