@@ -62,7 +62,7 @@ def init_patch(self, *args, **kwargs):
6262 error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service" .format (client_id )
6363 # Skip optimization but keep stats reporting
6464 opt_cfg = {}
65- elif not metadata .active :
65+ elif not metadata .get ( " active" , True ) :
6666 error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it."
6767 logger .error (error_msg )
6868 # Skip optimization but keep stats reporting
@@ -73,13 +73,12 @@ def init_patch(self, *args, **kwargs):
7373
7474 # Apply optimized configuration
7575 for k , v in opt_cfg .items ():
76- snake = k .replace ("." , "_" )
77- if kwargs .get (snake ) != v :
78- logger .debug ("Overriding configuration: {} -> {}" , snake , v )
79- kwargs [snake ] = v
76+ if kwargs .get (k ) != v :
77+ logger .debug ("Overriding configuration: {} -> {}" , k , v )
78+ kwargs [k ] = v
8079
8180 # Set up reporting interval
82- report_interval = metadata .report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS
81+ report_interval = metadata .get ( " report_interval_ms" ) if metadata else _DEFAULT_REPORT_INTERVAL_MS
8382
8483 # Create and register producer tracker
8584 tr = ProducerTracker (
@@ -181,7 +180,7 @@ def init_patch(self, *args, **kwargs):
181180 logger .error (error_msg )
182181 # Skip optimization but keep stats reporting
183182 opt_cfg = {}
184- elif not metadata .active :
183+ elif not metadata .get ( " active" , True ) :
185184 error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it."
186185 logger .error (error_msg )
187186 # Skip optimization but keep stats reporting
@@ -193,7 +192,7 @@ def init_patch(self, *args, **kwargs):
193192 if kwargs .get (k ) != v :
194193 logger .debug ("Overriding configuration: {} -> {}" , k , v )
195194 kwargs [k ] = v
196- report_interval = metadata .report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS
195+ report_interval = metadata .get ( " report_interval_ms" ) if metadata else _DEFAULT_REPORT_INTERVAL_MS
197196 tr = ProducerTracker (
198197 lib = "aiokafka" ,
199198 producer = self ,
@@ -280,7 +279,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
280279 logger .error (error_msg )
281280 # Skip optimization but keep stats reporting
282281 opt_cfg = {}
283- elif not metadata .active :
282+ elif not metadata .get ( " active" , True ) :
284283 error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it."
285284 logger .error (error_msg )
286285 # Skip optimization but keep stats reporting
@@ -292,7 +291,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
292291 if conf .get (k ) != v :
293292 logger .debug ("Overriding configuration: {} -> {}" , k , v )
294293 conf [k ] = v
295- report_interval = metadata .report_interval_ms if metadata else _DEFAULT_REPORT_INTERVAL_MS
294+ report_interval = metadata .get ( " report_interval_ms" ) if metadata else _DEFAULT_REPORT_INTERVAL_MS
296295 tr = ProducerTracker (
297296 lib = "confluent" ,
298297 producer = self ,
0 commit comments