11import datetime
2+ import time
23import ydb
34from .base import BaseWorkload
45from jobs .topic_jobs import TopicJobManager
@@ -10,47 +11,59 @@ def name(self) -> str:
1011 return "topic"
1112
1213 def create (self ):
13- self . logger . info ( "Creating topic: %s" , self . args . topic_path )
14-
15- try :
16- self . driver . topic_client . create_topic (
17- path = self . args . topic_path ,
18- min_active_partitions = self .args . topic_min_partitions ,
19- max_active_partitions = self .args .topic_max_partitions ,
20- retention_period = datetime . timedelta ( hours = self .args .topic_retention_hours ) ,
21- consumers = [ self .args .topic_consumer ] ,
22- )
23- self . logger . info ( "Topic created successfully: %s" , self .args .topic_path )
24- self . logger . info ( "Consumer created: %s" , self . args . topic_consumer )
14+ retry_no = 0
15+ while retry_no < 3 :
16+ self . logger . info ( "Creating topic: %s (retry no: %d)" , self . args . topic_path , retry_no )
17+
18+ try :
19+ self .driver . topic_client . create_topic (
20+ path = self .args .topic_path ,
21+ min_active_partitions = self .args .topic_min_partitions ,
22+ max_active_partitions = self .args .topic_max_partitions ,
23+ retention_period = datetime . timedelta ( hours = self . args . topic_retention_hours ),
24+ consumers = [ self .args .topic_consumer ],
25+ )
2526
26- except ydb .Error as e :
27- error_msg = str (e ).lower ()
28- if "already exists" in error_msg :
29- self .logger .info ("Topic already exists: %s" , self .args .topic_path )
30-
31- try :
32- description = self .driver .topic_client .describe_topic (self .args .topic_path )
33- consumer_exists = any (c .name == self .args .topic_consumer for c in description .consumers )
34-
35- if not consumer_exists :
36- self .logger .info ("Adding consumer %s to existing topic" , self .args .topic_consumer )
37- self .driver .topic_client .alter_topic (
38- path = self .args .topic_path , add_consumers = [self .args .topic_consumer ]
39- )
40- self .logger .info ("Consumer added successfully: %s" , self .args .topic_consumer )
41- else :
42- self .logger .info ("Consumer already exists: %s" , self .args .topic_consumer )
43-
44- except Exception as alter_err :
45- self .logger .warning ("Failed to add consumer: %s" , alter_err )
27+ self .logger .info ("Topic created successfully: %s" , self .args .topic_path )
28+ self .logger .info ("Consumer created: %s" , self .args .topic_consumer )
29+ return
30+
31+ except ydb .Error as e :
32+ error_msg = str (e ).lower ()
33+ if "already exists" in error_msg :
34+ self .logger .info ("Topic already exists: %s" , self .args .topic_path )
35+
36+ try :
37+ description = self .driver .topic_client .describe_topic (self .args .topic_path )
38+ consumer_exists = any (c .name == self .args .topic_consumer for c in description .consumers )
39+
40+ if not consumer_exists :
41+ self .logger .info ("Adding consumer %s to existing topic" , self .args .topic_consumer )
42+ self .driver .topic_client .alter_topic (
43+ path = self .args .topic_path , add_consumers = [self .args .topic_consumer ]
44+ )
45+ self .logger .info ("Consumer added successfully: %s" , self .args .topic_consumer )
46+ return
47+ else :
48+ self .logger .info ("Consumer already exists: %s" , self .args .topic_consumer )
49+ return
50+
51+ except Exception as alter_err :
52+ self .logger .warning ("Failed to add consumer: %s" , alter_err )
53+ raise
54+ elif "storage pool" in error_msg or "pq" in error_msg :
55+ self .logger .error ("YDB instance does not support topics (PersistentQueues): %s" , e )
56+ self .logger .error ("Please use YDB instance with topic support" )
4657 raise
47- elif "storage pool" in error_msg or "pq" in error_msg :
48- self .logger .error ("YDB instance does not support topics (PersistentQueues): %s" , e )
49- self .logger .error ("Please use YDB instance with topic support" )
50- raise
51- else :
52- self .logger .error ("Failed to create topic: %s" , e )
53- raise
58+ elif isinstance (e , ydb .Unavailable ):
59+ self .logger .info ("YDB instance is not ready, retrying in 5 seconds..." )
60+ time .sleep (5 )
61+ retry_no += 1
62+ else :
63+ self .logger .error ("Failed to create topic: %s" , e )
64+ raise
65+
66+ raise RuntimeError ("Failed to create topic" )
5467
5568 def run_slo (self , metrics ):
5669 self .logger .info ("Starting topic SLO tests" )
0 commit comments