Skip to content

Commit c094f1c

Browse files
fixes
1 parent 872fb78 commit c094f1c

File tree

10 files changed

+114
-47
lines changed

10 files changed

+114
-47
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@ venv/
22
__pycache__/
33
*.py[cod]
44
*$py.class
5-
superclient.egg-info
5+
superclient.egg-info
6+
build/
7+
dist/

MANIFEST.in

Lines changed: 0 additions & 2 deletions
This file was deleted.

README.md

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
</div>
77

8-
# Superstream Client For Python
8+
# Superclient Python
99

1010
A Python library for automatically optimizing Kafka producer configurations based on topic-specific recommendations.
1111

@@ -82,26 +82,35 @@ The Superstream library needs to modify your producer's configuration to apply o
8282
### Step 1: Install Superclient
8383

8484
```bash
85+
# Step 1: Install the package
8586
pip install superclient
86-
```
8787

88-
### Step 2: Run
88+
# Step 2: One-time setup (enables automatic loading)
89+
python -m superclient install_pth
90+
```
8991

90-
The package ships with a `sitecustomize.py` entry-point, therefore Python imports the agent automatically before your application's code starts. This is the recommended and default way to use Superclient.
92+
That's it! Superclient will now automatically load and optimize all Kafka producers in your Python environment.
9193

92-
#### Manual Initialization (Only if needed)
94+
## Usage
9395

94-
If `sitecustomize` is disabled in your environment (e.g., when using `python -S` or when `PYTHONNOUSERSITE` is set), you can initialize manually by adding this import at the very beginning of your application's main entry point (e.g., `main.py`, `app.py`, or `__init__.py`):
96+
After installation, superclient works automatically. Just use your Kafka clients as usual:
9597

9698
```python
97-
import superclient # side-effects automatically enable the agent
98-
99-
# Your application code follows
99+
# kafka-python
100100
from kafka import KafkaProducer
101-
# ... rest of your imports and code
102-
```
101+
producer = KafkaProducer(bootstrap_servers='localhost:9092')
102+
# Automatically optimized!
103103

104-
Note: The manual import must be placed before any Kafka-related imports to ensure proper interception of producer creation.
104+
# confluent-kafka
105+
from confluent_kafka import Producer
106+
producer = Producer({'bootstrap.servers': 'localhost:9092'})
107+
# Automatically optimized!
108+
109+
# aiokafka
110+
from aiokafka import AIOKafkaProducer
111+
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
112+
# Automatically optimized!
113+
```
105114

106115
### Docker Integration
107116

@@ -113,6 +122,9 @@ FROM python:3.8-slim
113122
# Install superclient
114123
RUN pip install superclient
115124

125+
# Run the one-time setup to enable automatic loading
126+
RUN python -m superclient install_pth
127+
116128
# Your application code
117129
COPY . /app
118130
WORKDIR /app
@@ -137,17 +149,6 @@ export SUPERSTREAM_TOPICS_LIST=orders,payments,user-events
137149
export SUPERSTREAM_LATENCY_SENSITIVE=true
138150
```
139151

140-
### SUPERSTREAM_LATENCY_SENSITIVE Explained
141-
142-
The linger.ms parameter follows these rules:
143-
144-
1. If SUPERSTREAM_LATENCY_SENSITIVE is set to true:
145-
- Linger value will never be modified, regardless of other settings
146-
147-
2. If SUPERSTREAM_LATENCY_SENSITIVE is set to false or not set:
148-
- If no explicit linger exists in original configuration: Use Superstream's optimized value
149-
- If explicit linger exists: Use the maximum of original value and Superstream's optimized value
150-
151152
## Prerequisites
152153

153154
- Python 3.8 or higher
@@ -156,4 +157,4 @@ The linger.ms parameter follows these rules:
156157

157158
## License
158159

159-
This project is licensed under the Apache License 2.0.
160+
Apache License 2.0

pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,9 @@ classifiers = [
2121
[project.urls]
2222
Homepage = "https://github.com/superstreamlabs/superclient-python"
2323

24+
[tool.setuptools]
25+
packages = ["superclient", "superclient.agent", "superclient.core", "superclient.model", "superclient.util"]
26+
include-package-data = true
27+
2428
[tool.setuptools.package-data]
25-
"superclient" = ["py.typed"]
29+
"superclient" = ["py.typed", "*.pth"]

sitecustomize.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

superclient/__main__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
"""Allow running superclient module utilities"""
2+
import sys
3+
4+
if len(sys.argv) > 1 and sys.argv[1] == "install_pth":
5+
from .install_pth import install_pth
6+
install_pth()
7+
else:
8+
print("Usage: python -m superclient install_pth")

superclient/agent/__init__.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,20 @@ def _patch_module(module_name: str) -> None:
3434
"""Patch a specific module if it exists in sys.modules."""
3535
try:
3636
if module_name == "kafka" and "kafka" in sys.modules:
37-
patch_kafka_python(sys.modules["kafka"])
37+
# Check if KafkaProducer exists before patching
38+
kafka_module = sys.modules["kafka"]
39+
if hasattr(kafka_module, "KafkaProducer"):
40+
patch_kafka_python(kafka_module)
3841
elif module_name == "aiokafka" and "aiokafka" in sys.modules:
39-
patch_aiokafka(sys.modules["aiokafka"])
42+
# Check if AIOKafkaProducer exists before patching
43+
aiokafka_module = sys.modules["aiokafka"]
44+
if hasattr(aiokafka_module, "AIOKafkaProducer"):
45+
patch_aiokafka(aiokafka_module)
4046
elif module_name == "confluent_kafka" and "confluent_kafka" in sys.modules:
41-
patch_confluent(sys.modules["confluent_kafka"])
47+
# Check if Producer exists before patching
48+
confluent_module = sys.modules["confluent_kafka"]
49+
if hasattr(confluent_module, "Producer"):
50+
patch_confluent(confluent_module)
4251
except Exception as exc:
4352
logger.error("[ERR-001] Failed to patch {}: {}", module_name, exc)
4453

@@ -81,7 +90,7 @@ def initialize():
8190
8291
This function:
8392
1. Installs the import hook to catch future imports
84-
2. Patches any pre-imported modules
93+
2. Schedules patching of any pre-imported modules
8594
3. Starts the heartbeat thread
8695
"""
8796
if is_disabled():
@@ -91,9 +100,17 @@ def initialize():
91100
if builtins.__import__ is not _import_hook:
92101
builtins.__import__ = _import_hook
93102

94-
# Patch any pre-imported modules
95-
for module in ("kafka", "aiokafka", "confluent_kafka"):
96-
_patch_module(module)
103+
# Schedule patching of pre-imported modules using a deferred approach
104+
# This avoids circular import issues by running after the current import completes
105+
import threading
106+
def patch_preimported():
107+
"""Patch any modules that were imported before superclient"""
108+
for module in ("kafka", "aiokafka", "confluent_kafka"):
109+
if module in sys.modules:
110+
_patch_module(module)
111+
112+
# Use a timer with 0 delay to run after current import stack completes
113+
threading.Timer(0, patch_preimported).start()
97114

98115
# Start heartbeat
99116
Heartbeat.ensure()

superclient/agent/interceptor.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ def init_patch(self, *args, **kwargs):
103103
if not hasattr(self, "_superstream_patch"):
104104
original_send = self.send
105105

106-
def send_patch(inner, topic, *a, **kw):
106+
def send_patch(topic, *a, **kw):
107107
"""Track topic usage when sending messages."""
108108
tr.record_topic(topic)
109109
return original_send(topic, *a, **kw)
110110

111111
self.send = send_patch
112112
orig_close = self.close
113113

114-
def close_patch(inner, *a, **kw):
114+
def close_patch(*a, **kw):
115115
"""Clean up Superstream resources when closing the producer."""
116116
if not hasattr(self, "_superstream_closed"):
117117
self._superstream_closed = True
@@ -211,14 +211,14 @@ def init_patch(self, *args, **kwargs):
211211
if not hasattr(self, "_superstream_patch"):
212212
original_send = self.send
213213

214-
async def send_patch(inner, topic, *a, **kw):
214+
async def send_patch(topic, *a, **kw):
215215
tr.record_topic(topic)
216216
return await original_send(topic, *a, **kw)
217217

218218
self.send = send_patch
219219
original_stop = self.stop
220220

221-
async def stop_patch(inner, *a, **kw):
221+
async def stop_patch(*a, **kw):
222222
if not hasattr(self, "_superstream_closed"):
223223
self._superstream_closed = True
224224
tr.close()
@@ -310,14 +310,14 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
310310
if not hasattr(self, "_superstream_patch"):
311311
original_produce = self.produce
312312

313-
def produce_patch(inner, topic, *a, **kw):
313+
def produce_patch(topic, *a, **kw):
314314
tr.record_topic(topic)
315315
return original_produce(topic, *a, **kw)
316316

317317
self.produce = produce_patch
318318
orig_close = self.close
319319

320-
def close_patch(inner, *a, **kw):
320+
def close_patch(*a, **kw):
321321
if not hasattr(self, "_superstream_closed"):
322322
self._superstream_closed = True
323323
tr.close()

superclient/install_pth.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env python3
2+
"""Install the superclient-init.pth file to enable automatic loading"""
3+
import os
4+
import sys
5+
import shutil
6+
import site
7+
8+
def install_pth():
9+
"""Install the .pth file to the appropriate site-packages directory"""
10+
# Get the path to the .pth file from package resources
11+
import superclient
12+
package_dir = os.path.dirname(superclient.__file__)
13+
source = os.path.join(package_dir, "superclient-init.pth")
14+
15+
if not os.path.exists(source):
16+
print(f"Error: {source} not found")
17+
return False
18+
19+
# Find site-packages directory
20+
site_packages = None
21+
for path in sys.path:
22+
if path.endswith('site-packages') and os.path.isdir(path):
23+
site_packages = path
24+
break
25+
26+
if not site_packages:
27+
site_packages = site.getsitepackages()[0]
28+
29+
dest = os.path.join(site_packages, "superclient-init.pth")
30+
31+
try:
32+
shutil.copy2(source, dest)
33+
print(f"Successfully installed {dest}")
34+
print("Superclient will now load automatically!")
35+
return True
36+
except Exception as e:
37+
print(f"Error installing .pth file: {e}")
38+
print(f"You may need to run this script with sudo or as administrator")
39+
return False
40+
41+
if __name__ == "__main__":
42+
install_pth()

superclient/superclient-init.pth

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
import superclient

0 commit comments

Comments
 (0)