11import datetime
2+ import time
23import ydb
34from .base import BaseWorkload
45from jobs .topic_jobs import TopicJobManager
@@ -10,47 +11,56 @@ def name(self) -> str:
1011 return "topic"
1112
1213 def create (self ):
13- self .logger .info ("Creating topic: %s" , self .args .topic_path )
14-
15- try :
16- self .driver .topic_client .create_topic (
17- path = self .args .topic_path ,
18- min_active_partitions = self .args .topic_min_partitions ,
19- max_active_partitions = self .args .topic_max_partitions ,
20- retention_period = datetime .timedelta (hours = self .args .topic_retention_hours ),
21- consumers = [self .args .topic_consumer ],
22- )
23- self .logger .info ("Topic created successfully: %s" , self .args .topic_path )
24- self .logger .info ("Consumer created: %s" , self .args .topic_consumer )
25-
26- except ydb .Error as e :
27- error_msg = str (e ).lower ()
28- if "already exists" in error_msg :
29- self .logger .info ("Topic already exists: %s" , self .args .topic_path )
30-
31- try :
32- description = self .driver .topic_client .describe_topic (self .args .topic_path )
33- consumer_exists = any (c .name == self .args .topic_consumer for c in description .consumers )
34-
35- if not consumer_exists :
36- self .logger .info ("Adding consumer %s to existing topic" , self .args .topic_consumer )
37- self .driver .topic_client .alter_topic (
38- path = self .args .topic_path , add_consumers = [self .args .topic_consumer ]
39- )
40- self .logger .info ("Consumer added successfully: %s" , self .args .topic_consumer )
41- else :
42- self .logger .info ("Consumer already exists: %s" , self .args .topic_consumer )
43-
44- except Exception as alter_err :
45- self .logger .warning ("Failed to add consumer: %s" , alter_err )
14+ retry_no = 0
15+ while retry_no < 3 :
16+ self .logger .info ("Creating topic: %s (retry no: %d)" , self .args .topic_path , retry_no )
17+
18+ try :
19+ self .driver .topic_client .create_topic (
20+ path = self .args .topic_path ,
21+ min_active_partitions = self .args .topic_min_partitions ,
22+ max_active_partitions = self .args .topic_max_partitions ,
23+ retention_period = datetime .timedelta (hours = self .args .topic_retention_hours ),
24+ consumers = [self .args .topic_consumer ],
25+ )
26+ self .logger .info ("Topic created successfully: %s" , self .args .topic_path )
27+ self .logger .info ("Consumer created: %s" , self .args .topic_consumer )
28+ return
29+
30+ except ydb .Error as e :
31+ error_msg = str (e ).lower ()
32+ if "already exists" in error_msg :
33+ self .logger .info ("Topic already exists: %s" , self .args .topic_path )
34+
35+ try :
36+ description = self .driver .topic_client .describe_topic (self .args .topic_path )
37+ consumer_exists = any (c .name == self .args .topic_consumer for c in description .consumers )
38+
39+ if not consumer_exists :
40+ self .logger .info ("Adding consumer %s to existing topic" , self .args .topic_consumer )
41+ self .driver .topic_client .alter_topic (
42+ path = self .args .topic_path , add_consumers = [self .args .topic_consumer ]
43+ )
44+ self .logger .info ("Consumer added successfully: %s" , self .args .topic_consumer )
45+ return
46+ else :
47+ self .logger .info ("Consumer already exists: %s" , self .args .topic_consumer )
48+ return
49+
50+ except Exception as alter_err :
51+ self .logger .warning ("Failed to add consumer: %s" , alter_err )
52+ raise
53+ elif "storage pool" in error_msg or "pq" in error_msg :
54+ self .logger .error ("YDB instance does not support topics (PersistentQueues): %s" , e )
55+ self .logger .error ("Please use YDB instance with topic support" )
56+ raise
57+ elif "Schemeshard not available" in error_msg :
58+ self .logger .info ("YDB instance is not ready (Schemeshard not available), retrying in 5 seconds..." )
59+ time .sleep (5 )
60+ retry_no += 1
61+ else :
62+ self .logger .error ("Failed to create topic: %s" , e )
4663 raise
47- elif "storage pool" in error_msg or "pq" in error_msg :
48- self .logger .error ("YDB instance does not support topics (PersistentQueues): %s" , e )
49- self .logger .error ("Please use YDB instance with topic support" )
50- raise
51- else :
52- self .logger .error ("Failed to create topic: %s" , e )
53- raise
5464
5565 def run_slo (self , metrics ):
5666 self .logger .info ("Starting topic SLO tests" )
0 commit comments