33import os
44from typing import Any , Dict
55
6- from ..logger import get_logger
6+ from ..util . logger import get_logger
77from ..util .config import get_topics_list , is_disabled
8- from .. core . manager import fetch_metadata , optimal_cfg
8+ from .metadata import fetch_metadata , optimal_cfg
99from ..core .reporter import send_clients_msg
1010from .tracker import ProducerTracker , Heartbeat
1111
@@ -30,12 +30,17 @@ def init_patch(self, *args, **kwargs):
3030 bootstrap = orig_cfg .get ("bootstrap_servers" ) or (args [0 ] if args else None )
3131 if not bootstrap :
3232 return orig_init (self , * args , ** kwargs )
33+ bootstrap = normalize_bootstrap (bootstrap )
3334 client_id = orig_cfg .get ("client_id" , "" )
3435 if client_id .startswith (_SUPERLIB_PREFIX ):
3536 return orig_init (self , * args , ** kwargs )
3637 topics_env = get_topics_list ()
37- metadata = fetch_metadata (bootstrap , orig_cfg )
38- opt_cfg = optimal_cfg (metadata , topics_env , orig_cfg )
38+ metadata = fetch_metadata (bootstrap , orig_cfg , "kafka-python" )
39+ error_msg = ""
40+ if metadata and not metadata .active :
41+ error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it."
42+ logger .error (error_msg )
43+ opt_cfg = optimal_cfg (metadata , topics_env , orig_cfg ) if metadata and metadata .active else {}
3944 for k , v in opt_cfg .items ():
4045 snake = k .replace ("." , "_" )
4146 if kwargs .get (snake ) != v :
@@ -64,13 +69,15 @@ def send_patch(inner, topic, *a, **kw):
6469 orig_close = self .close
6570
6671 def close_patch (inner , * a , ** kw ):
67- tr .close ()
68- Heartbeat .unregister_tracker (tr .uuid )
72+ if not hasattr (self , "_superstream_closed" ):
73+ self ._superstream_closed = True
74+ tr .close ()
75+ Heartbeat .unregister_tracker (tr .uuid )
6976 return orig_close (* a , ** kw )
7077
7178 self .close = close_patch
7279 self ._superstream_patch = True
73- send_clients_msg (tr )
80+ send_clients_msg (tr , error_msg )
7481 logger .info ("Successfully optimized producer configuration for {}" , client_id )
7582
7683 Producer .__init__ = init_patch
@@ -92,12 +99,17 @@ def init_patch(self, *args, **kwargs):
9299 bootstrap = args [0 ]
93100 if not bootstrap :
94101 return orig_init (self , * args , ** kwargs )
102+ bootstrap = normalize_bootstrap (bootstrap )
95103 client_id = orig_cfg .get ("client_id" , "" )
96104 if client_id .startswith (_SUPERLIB_PREFIX ):
97105 return orig_init (self , * args , ** kwargs )
98106 topics_env = get_topics_list ()
99- metadata = fetch_metadata (bootstrap , orig_cfg )
100- opt_cfg = optimal_cfg (metadata , topics_env , orig_cfg )
107+ metadata = fetch_metadata (bootstrap , orig_cfg , "aiokafka" )
108+ error_msg = ""
109+ if metadata and not metadata .active :
110+ error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it."
111+ logger .error (error_msg )
112+ opt_cfg = optimal_cfg (metadata , topics_env , orig_cfg ) if metadata and metadata .active else {}
101113 for k , v in opt_cfg .items ():
102114 if kwargs .get (k ) != v :
103115 logger .debug ("Overriding configuration: {} -> {}" , k , v )
@@ -125,13 +137,15 @@ async def send_patch(inner, topic, *a, **kw):
125137 original_stop = self .stop
126138
127139 async def stop_patch (inner , * a , ** kw ):
140+ if not hasattr (self , "_superstream_closed" ):
141+ self ._superstream_closed = True
142+ tr .close ()
143+ Heartbeat .unregister_tracker (tr .uuid )
128144 await original_stop (* a , ** kw )
129- tr .close ()
130- Heartbeat .unregister_tracker (tr .uuid )
131145
132146 self .stop = stop_patch
133147 self ._superstream_patch = True
134- send_clients_msg (tr )
148+ send_clients_msg (tr , error_msg )
135149 logger .info ("Successfully optimized producer configuration for {}" , client_id )
136150
137151 Producer .__init__ = init_patch
@@ -149,12 +163,19 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
149163 return orig_init (self , conf , * args , ** kwargs )
150164 conf = dict (conf )
151165 bootstrap = conf .get ("bootstrap.servers" )
166+ if not bootstrap :
167+ return orig_init (self , conf , * args , ** kwargs )
168+ bootstrap = normalize_bootstrap (bootstrap )
152169 client_id = conf .get ("client.id" , "" )
153170 if client_id .startswith (_SUPERLIB_PREFIX ):
154171 return orig_init (self , conf , * args , ** kwargs )
155172 topics_env = get_topics_list ()
156- metadata = fetch_metadata (bootstrap , conf )
157- opt_cfg = optimal_cfg (metadata , topics_env , conf )
173+ metadata = fetch_metadata (bootstrap , conf , "confluent" )
174+ error_msg = ""
175+ if metadata and not metadata .active :
176+ error_msg = "[ERR-301] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it."
177+ logger .error (error_msg )
178+ opt_cfg = optimal_cfg (metadata , topics_env , conf ) if metadata and metadata .active else {}
158179 for k , v in opt_cfg .items ():
159180 if conf .get (k ) != v :
160181 logger .debug ("Overriding configuration: {} -> {}" , k , v )
@@ -179,25 +200,18 @@ def produce_patch(inner, topic, *a, **kw):
179200 return original_produce (topic , * a , ** kw )
180201
181202 self .produce = produce_patch
182- orig_flush = self .flush
183-
184- def flush_patch (inner , * a , ** kw ):
185- tr .close ()
186- Heartbeat .unregister_tracker (tr .uuid )
187- return orig_flush (* a , ** kw )
188-
189- self .flush = flush_patch
190- if hasattr (self , "close" ):
191- orig_close = self .close
203+ orig_close = self .close
192204
193- def close_patch (inner , * a , ** kw ):
205+ def close_patch (inner , * a , ** kw ):
206+ if not hasattr (self , "_superstream_closed" ):
207+ self ._superstream_closed = True
194208 tr .close ()
195209 Heartbeat .unregister_tracker (tr .uuid )
196- return orig_close (* a , ** kw )
210+ return orig_close (* a , ** kw )
197211
198- self .close = close_patch
212+ self .close = close_patch
199213 self ._superstream_patch = True
200- send_clients_msg (tr )
214+ send_clients_msg (tr , error_msg )
201215 logger .info ("Successfully optimized producer configuration for {}" , client_id )
202216
203217 Producer .__init__ = init_patch
0 commit comments